- Timestamp:
- Jun 19, 2013, 11:05:49 AM (11 years ago)
- Location:
- source
- Files:
-
- 18 added
- 11 deleted
- 73 edited
Legend:
- Unmodified
- Added
- Removed
-
source/ariba/CMakeLists.txt
r11885 r12060 46 46 ### Increment this whenever the interface changes! ### 47 47 ###################################################### 48 set(ariba_SOVERSION 1)48 set(ariba_SOVERSION 2) 49 49 ###################################################### 50 50 … … 184 184 add_headers( 185 185 ariba.h 186 AribaModule.h186 # AribaModule.h 187 187 CommunicationListener.h 188 188 DataMessage.h … … 199 199 200 200 add_sources( 201 AribaModule.cpp201 # AribaModule.cpp 202 202 CommunicationListener.cpp 203 203 DataMessage.cpp -
source/ariba/CommunicationListener.cpp
r7468 r12060 59 59 60 60 void CommunicationListener::onLinkFail(const LinkID& l, const NodeID& r) { 61 onLinkDown(l, r); 61 62 } 62 63 … … 69 70 } 70 71 72 // this implementation provides backward compatibility 73 // overwrite it to use the new interface 74 void CommunicationListener::onMessage(reboost::shared_buffer_t message, 75 const NodeID& remote, 76 const LinkID& lnk, 77 const SequenceNumber& seqnum, 78 const ariba::overlay::OverlayMsg* overlay_msg) 79 { 80 // copy data 81 Data data; 82 data.setLength(message.size() * 8); 83 memcpy(data.getBuffer(), message.data(), message.size()); 84 85 // and prepare old-style overlay message 86 Message legacy_msg; 87 legacy_msg.setPayload(data); 88 89 90 // * call legacy handler * 91 this->onMessage(legacy_msg, remote, lnk); 92 } 93 71 94 void CommunicationListener::onKeyValue( const Data& key, const vector<Data>& value ) { 72 95 } 73 96 97 bool CommunicationListener::onPing(const NodeID& remote) 98 { 99 return true; 100 } 101 102 void CommunicationListener::onPingLost(const NodeID& remote) 103 { 104 } 105 106 void CommunicationListener::onPong(const NodeID& remote) 107 { 108 } 109 74 110 } // namespace ariba -
source/ariba/CommunicationListener.h
r9684 r12060 44 44 #include "LinkProperties.h" 45 45 #include "DataMessage.h" 46 #include "ariba/overlay/SequenceNumber.h" 46 47 47 48 namespace ariba { 49 50 typedef ariba::overlay::SequenceNumber SequenceNumber; 48 51 49 52 // forward decl 50 53 namespace overlay { 51 54 class BaseOverlay; 55 class OverlayMsg; 52 56 } 53 57 … … 116 120 117 121 /** 122 * @DEPRECATED 123 * 118 124 * Called when a message is incoming 119 125 * @param msg The data message that is received … … 123 129 virtual void onMessage(const DataMessage& msg, const NodeID& remote, 124 130 const LinkID& lnk = LinkID::UNSPECIFIED); 131 132 133 /** 134 * NOTE: This interface is still unstable and may change in future releases. 135 * ---> Especially the parameter «overlay_msg» may be replaced or removed. 136 * 137 * Called when a message is incoming 138 * @param msg A shared buffer with the (serialized) payload 139 * @param remote The remote node that sent the message 140 * @param lnk The link id of the link where the message is received 141 * @param overlay_msg A pointer to the ariba internal Overlay Message 142 */ 143 virtual void onMessage(reboost::shared_buffer_t message, 144 const NodeID& remote, 145 const LinkID& lnk, 146 const SequenceNumber& seqnum, 147 const ariba::overlay::OverlayMsg* overlay_msg); 125 148 126 149 // --- dht functionality --- … … 133 156 virtual void onKeyValue( const Data& key, const vector<Data>& value ); 134 157 158 // --- ping pong (routed) --- 159 /** 160 * Called when a ping message was received. 161 * 162 * @param remote Node which sent the ping message 163 * @return bool Allows to send a pong message (default: true) 164 */ 165 virtual bool onPing(const NodeID& remote); 166 167 /** 168 * Called when a ping message is lost. 169 * 170 * NOTE: A ping message could also get lost without notice. 171 * 172 * @param remote Node to which the ping message was sent 173 */ 174 virtual void onPingLost(const NodeID& remote); 175 176 /** 177 * Called when a pong message was received. 178 * 179 * @param remote Node which sent the pong message 180 */ 181 virtual void onPong(const NodeID& remote); 135 182 }; 136 183 -
source/ariba/DataMessage.h
r9684 r12060 9 9 // use message utility 10 10 #ifdef USE_MESSAGE_UTILITY 11 #include "ariba/utility/messages.h" 11 // #include "ariba/utility/messages.h" 12 #include "ariba/utility/messages/Message.h" 12 13 namespace ariba { 13 14 typedef utility::Message Message; -
source/ariba/Message.h
r9684 r12060 41 41 42 42 #include <inttypes.h> 43 #include "ariba/utility/messages.h" 43 44 // legacy messages 45 #include "ariba/utility/messages/Message.h" 46 // reboost messages 47 #include "ariba/utility/transport/messages/message.hpp" 44 48 45 49 /** \addtogroup public -
source/ariba/Node.cpp
r10653 r12060 39 39 #include "Node.h" 40 40 41 #include <boost/foreach.hpp> 42 41 43 #include "ariba/overlay/BaseOverlay.h" 44 #include "ariba/communication/BaseCommunication.h" 45 42 46 #include "ariba/utility/types/OverlayParameterSet.h" 43 47 #include "ariba/communication/EndpointDescriptor.h" 44 48 49 #include <boost/property_tree/exceptions.hpp> 50 51 using namespace std; 45 52 using ariba::communication::EndpointDescriptor; 53 using boost::property_tree::ptree; 46 54 47 55 namespace ariba { 48 56 49 Node::Node(AribaModule& ariba_mod, const Name& node_name) : 50 name(node_name), ariba_mod(ariba_mod) { 51 base_overlay = new BaseOverlay(); 57 //Node::Node(AribaModule& ariba_mod, const Name& node_name) : 58 // name(node_name), ariba_mod(ariba_mod) { 59 // base_overlay = new BaseOverlay(); 60 //} 61 62 Node::Node() : 63 name(Name::UNSPECIFIED), 64 base_communication(NULL), 65 base_overlay(NULL) 66 { 67 base_communication = new BaseCommunication(); 68 base_overlay = new BaseOverlay(); 52 69 } 53 70 … … 55 72 delete base_overlay; 56 73 base_overlay = NULL; 57 } 58 59 void Node::join(const Name& vnetname) { 60 spovnetId = vnetname.toSpoVNetId(); 61 nodeId = generateNodeId(name); 62 63 // start base comm if not started 64 if( !ariba_mod.base_comm->isStarted() ) 65 ariba_mod.base_comm->start(); 66 67 // start base overlay if not started 68 // join against ourselfs 69 if( !base_overlay->isStarted() ) 70 base_overlay->start( *ariba_mod.base_comm, nodeId ); 71 base_overlay->joinSpoVNet( spovnetId ); 72 73 // join against static bootstrap points and 74 // start automatic bootstrapping modules 75 vector<AribaModule::BootstrapMechanism> mechanisms 76 = ariba_mod.getBootstrapMechanisms(vnetname); 77 78 vector<pair<BootstrapManager::BootstrapType,string> > internalmodules; 79 80 BOOST_FOREACH(AribaModule::BootstrapMechanism m, mechanisms){ 81 switch(m){ 82 case AribaModule::BootstrapMechanismStatic: 83 { 84 const communication::EndpointDescriptor* ep = 85 ariba_mod.getBootstrapNode(vnetname, m); 86 if( ep != NULL && ep->isUnspecified() == false ) 87 base_overlay->joinSpoVNet( spovnetId, *ep); 88 break; 89 } 90 case AribaModule::BootstrapMechanismBroadcast: 91 internalmodules.push_back(make_pair( 92 BootstrapManager::BootstrapTypePeriodicBroadcast, 93 ariba_mod.getBootstrapInfo(vnetname, m))); 94 break; 95 case AribaModule::BootstrapMechanismMulticastDNS: 96 internalmodules.push_back(make_pair( 97 BootstrapManager::BootstrapTypeMulticastDns, 98 ariba_mod.getBootstrapInfo(vnetname, m))); 99 break; 100 case AribaModule::BootstrapMechanismSDP: 101 internalmodules.push_back(make_pair( 102 BootstrapManager::BootstrapTypeBluetoothSdp, 103 ariba_mod.getBootstrapInfo(vnetname, m))); 104 break; 105 default: 106 break; 107 } 108 } 109 110 // start automatic overlay bootstrapping modules 111 base_overlay->startBootstrapModules(internalmodules); 112 113 // done 114 } 115 116 void Node::initiate(const Name& vnetname, const SpoVNetProperties& parm) { 117 utility::OverlayParameterSet ovrpset; 118 ovrpset.setOverlayStructure( 119 (utility::OverlayParameterSet::_OverlayStructure) 120 parm.getBaseOverlayType() 121 ); 122 123 spovnetId = vnetname.toSpoVNetId(); 124 nodeId = generateNodeId(name); 125 126 // start base comm if not started 127 if( !ariba_mod.base_comm->isStarted() ) 128 ariba_mod.base_comm->start(); 129 130 // start base overlay if not started 131 if( !base_overlay->isStarted() ) 132 base_overlay->start( *ariba_mod.base_comm, nodeId ); 133 134 base_overlay->createSpoVNet( spovnetId, ovrpset ); 135 } 74 75 delete base_communication; 76 base_communication = NULL; 77 } 78 79 void Node::connect(const ptree& config) 80 { 81 using namespace boost::property_tree; 82 using namespace addressing2; 83 84 assert( ! base_communication->isStarted() ); 85 assert( ! base_overlay->isStarted() ); 86 87 // XXX needed since »empty_ptree<ptree>()« is not working 88 // ---> see: http://stackoverflow.com/questions/5003549/where-is-boost-property-treeempty-ptree 89 static const ptree empty_pt; 90 91 // SpovNet ID 92 Name spovnet_name(config.get<string>("spovnet_name")); 93 spovnetId = spovnet_name.toSpoVNetId(); 94 95 // Node ID 96 try 97 { 98 name = config.get<string>("node_name"); 99 } 100 catch ( ptree_bad_path& e ) 101 { 102 name = Name::UNSPECIFIED; 103 } 104 nodeId = generateNodeId(name); 105 106 107 108 /* Base Communication */ 109 EndpointSetPtr listen_on; 110 try 111 { 112 listen_on = endpoint_set::create_EndpointSet( 113 config.get_child("listen_on")); 114 } 115 catch ( ptree_bad_path& e ) 116 { 117 /* no endpoints specified, using default: »[::]:41322+« */ 118 119 ptree default_listen_on; 120 default_listen_on.put("endp.category", "TCPIP"); 121 default_listen_on.put("endp.addr", "::"); 122 default_listen_on.put("endp.port", 0); // defaults to 41322 (or higher) 123 124 listen_on = endpoint_set::create_EndpointSet(default_listen_on); 125 // logging_warn("No endpoints specified in config. ---> Using default."); 126 cout << "No endpoints specified in config. ---> Using default." << endl; 127 } 128 base_communication->start(listen_on); 129 130 // TODO maybe notify the upper layer whether we have any active endpoints 131 132 133 /* Base Overlay */ 134 base_overlay->start( base_communication, nodeId ); 135 136 base_overlay->createSpoVNet( spovnetId ); 137 base_overlay->joinSpoVNet( spovnetId ); 138 139 140 141 /* Bootstrap */ 142 const ptree& bootstrap_pt = config.get_child("bootstrap", empty_pt); 143 144 // Static Bootstrap 145 try 146 { 147 // read endpoint_set from config 148 EndpointSetPtr ep_set = endpoint_set::create_EndpointSet( 149 bootstrap_pt.get_child("direct")); 150 151 EndpointDescriptor ep = EndpointDescriptor::UNSPECIFIED(); 152 ep.replace_endpoint_set(ep_set); 153 154 // try to connect 155 base_overlay->joinSpoVNet( spovnetId, ep); 156 } 157 catch ( ptree_bad_path& e ) 158 { 159 // logging_info("No direct bootstrap info in config."); 160 cout << "No direct bootstrap info in config." << endl; 161 } 162 163 164 /* Bootstrap modules */ 165 vector<pair<BootstrapManager::BootstrapType,string> > internalmodules; 166 167 // Bootstrap: Broadcast 168 if ( bootstrap_pt.get("broadcast", false) ) 169 { 170 internalmodules.push_back(make_pair( 171 BootstrapManager::BootstrapTypePeriodicBroadcast,"")); 172 } 173 174 // Bootstrap: MDNS 175 if ( bootstrap_pt.get("mdns", false) ) 176 { 177 internalmodules.push_back(make_pair( 178 BootstrapManager::BootstrapTypeMulticastDns,"")); 179 } 180 181 // Bootstrap: SDP 182 if ( bootstrap_pt.get("sdp", false) ) 183 { 184 internalmodules.push_back(make_pair( 185 BootstrapManager::BootstrapTypeBluetoothSdp,"")); 186 } 187 188 // start automatic overlay bootstrapping modules 189 base_overlay->startBootstrapModules(internalmodules); 190 } 191 192 //void Node::join(const Name& vnetname) { 193 // spovnetId = vnetname.toSpoVNetId(); 194 // nodeId = generateNodeId(name); 195 // 196 // // start base comm if not started 197 // if( !ariba_mod.base_comm->isStarted() ) 198 // ariba_mod.base_comm->start(); 199 // 200 // // start base overlay if not started 201 // // join against ourselfs 202 // if( !base_overlay->isStarted() ) 203 // base_overlay->start( *ariba_mod.base_comm, nodeId ); 204 // base_overlay->joinSpoVNet( spovnetId ); 205 // 206 // // join against static bootstrap points and 207 // // start automatic bootstrapping modules 208 // vector<AribaModule::BootstrapMechanism> mechanisms 209 // = ariba_mod.getBootstrapMechanisms(vnetname); 210 // 211 // vector<pair<BootstrapManager::BootstrapType,string> > internalmodules; 212 // 213 // BOOST_FOREACH(AribaModule::BootstrapMechanism m, mechanisms){ 214 // switch(m){ 215 // case AribaModule::BootstrapMechanismStatic: 216 // { 217 // const communication::EndpointDescriptor* ep = 218 // ariba_mod.getBootstrapNode(vnetname, m); 219 // if( ep != NULL && ep->isUnspecified() == false ) 220 // base_overlay->joinSpoVNet( spovnetId, *ep); 221 // break; 222 // } 223 // case AribaModule::BootstrapMechanismBroadcast: 224 // internalmodules.push_back(make_pair( 225 // BootstrapManager::BootstrapTypePeriodicBroadcast, 226 // ariba_mod.getBootstrapInfo(vnetname, m))); 227 // break; 228 // case AribaModule::BootstrapMechanismMulticastDNS: 229 // internalmodules.push_back(make_pair( 230 // BootstrapManager::BootstrapTypeMulticastDns, 231 // ariba_mod.getBootstrapInfo(vnetname, m))); 232 // break; 233 // case AribaModule::BootstrapMechanismSDP: 234 // internalmodules.push_back(make_pair( 235 // BootstrapManager::BootstrapTypeBluetoothSdp, 236 // ariba_mod.getBootstrapInfo(vnetname, m))); 237 // break; 238 // default: 239 // break; 240 // } 241 // } 242 // 243 // // start automatic overlay bootstrapping modules 244 // base_overlay->startBootstrapModules(internalmodules); 245 // 246 // // done 247 //} 248 // 249 //void Node::initiate(const Name& vnetname, const SpoVNetProperties& parm) { 250 // utility::OverlayParameterSet ovrpset; 251 // ovrpset.setOverlayStructure( 252 // (utility::OverlayParameterSet::_OverlayStructure) 253 // parm.getBaseOverlayType() 254 // ); 255 // 256 // spovnetId = vnetname.toSpoVNetId(); 257 // nodeId = generateNodeId(name); 258 // 259 // // start base comm if not started 260 // if( !ariba_mod.base_comm->isStarted() ) 261 // ariba_mod.base_comm->start(); 262 // 263 // // start base overlay if not started 264 // if( !base_overlay->isStarted() ) 265 // base_overlay->start( *ariba_mod.base_comm, nodeId ); 266 // 267 // base_overlay->createSpoVNet( spovnetId, ovrpset ); 268 //} 136 269 137 270 void Node::leave() { 138 271 base_overlay->stopBootstrapModules(); 139 272 base_overlay->leaveSpoVNet(); 140 ariba_mod.base_comm->stop();273 base_communication->stop(); // XXX before »base_overlay->stop()« ?? 141 274 base_overlay->stop(); 142 275 } … … 172 305 } 173 306 307 bool Node::isLinkDirect(const ariba::LinkID& lnk) const 308 { 309 return base_overlay->isLinkDirect(lnk); 310 } 311 312 int Node::getHopCount(const ariba::LinkID& lnk) const 313 { 314 return base_overlay->getHopCount(lnk); 315 } 316 317 318 319 320 /* +++++ Message sending +++++ */ 321 void Node::check_send_priority(uint8_t priority) 322 { 323 if ( priority < send_priority::HIGHEST || priority > send_priority::LOWEST ) 324 throw std::invalid_argument("Illegal priority"); 325 } 326 327 328 // +++ new interface +++ 329 const SequenceNumber& Node::sendMessage(reboost::message_t msg, const LinkID& lnk, uint8_t priority) 330 { 331 // check priority 332 check_send_priority(priority); 333 334 // * call base overlay * 335 return base_overlay->sendMessage(msg, lnk, priority); 336 } 337 338 // +++ legacy interface +++ 339 seqnum_t Node::sendMessage(const DataMessage& msg, const LinkID& lnk) 340 { 341 reboost::message_t message = ((Message*) msg)->wrap_up_for_sending(); 342 343 try 344 { 345 base_overlay->sendMessage(message, lnk, send_priority::NORMAL); 346 return 0; 347 } 348 catch ( ariba::overlay::message_not_sent& e ) 349 { 350 logging_warn("Message could not be sent. Dropped."); 351 return -1; 352 } 353 } 354 355 356 // +++ new interface +++ 357 const SequenceNumber& Node::sendMessage(reboost::message_t msg, const NodeID& nid, 358 const ServiceID& sid, uint8_t priority, const LinkProperties& req) { 359 360 // check priority 361 check_send_priority(priority); 362 363 // * call base overlay * 364 return base_overlay->sendMessage(msg, nid, priority, sid); 365 } 366 367 // +++ legacy interface +++ 174 368 seqnum_t Node::sendMessage(const DataMessage& msg, const NodeID& nid, 175 369 const ServiceID& sid, const LinkProperties& req) { 176 return base_overlay->sendMessage((Message*) msg, nid, sid); 177 } 178 370 371 // reboost::message_t message = ((Message*) msg)->wrap_up_for_sending(); 372 reboost::message_t message; 373 message.push_back( ((Message*) msg)->serialize_into_shared_buffer() ); 374 375 try 376 { 377 sendMessage(message, nid, sid, send_priority::NORMAL, req); 378 return 0; 379 } 380 catch ( ariba::overlay::message_not_sent& e ) 381 { 382 logging_warn("Message could not be sent. Dropped."); 383 return -1; 384 } 385 } 386 387 388 // +++ new interface +++ 389 NodeID Node::sendMessageCloserToNodeID(reboost::message_t msg, const NodeID& nid, const ServiceID& sid, 390 uint8_t priority, const LinkProperties& req) { 391 392 // check priority 393 check_send_priority(priority); 394 395 // * call base overlay * 396 return base_overlay->sendMessageCloserToNodeID(msg, nid, priority, sid); 397 } 398 399 // +++ legacy interface +++ 179 400 NodeID Node::sendMessageCloserToNodeID(const DataMessage& msg, const NodeID& nid, const ServiceID& sid, 180 401 const LinkProperties& req) { 181 402 182 return base_overlay->sendMessageCloserToNodeID((Message*) msg, nid, sid); 183 } 184 185 186 seqnum_t Node::sendMessage(const DataMessage& msg, const LinkID& lnk) { 187 return base_overlay->sendMessage((Message*) msg, lnk); 188 } 189 403 reboost::message_t message = ((Message*) msg)->wrap_up_for_sending(); 404 405 return sendMessageCloserToNodeID(message, nid, sid, send_priority::NORMAL, req); 406 } 407 408 409 // +++ new interface +++ 410 void Node::sendBroadcastMessage(reboost::message_t msg, const ServiceID& sid, uint8_t priority) { 411 412 // check priority 413 check_send_priority(priority); 414 415 // * call base overlay * 416 return base_overlay->broadcastMessage(msg, sid, priority); 417 } 418 419 // +++ legacy interface +++ 190 420 void Node::sendBroadcastMessage(const DataMessage& msg, const ServiceID& sid) { 191 return base_overlay->broadcastMessage((Message*)msg, sid); 192 } 421 reboost::message_t message = ((Message*) msg)->wrap_up_for_sending(); 422 423 return sendBroadcastMessage(message, sid); 424 } 425 426 427 /* +++++ [Message sending] +++++ */ 428 429 430 193 431 194 432 bool Node::bind(NodeListener* listener) { … … 204 442 bool ret = base_overlay->bind(listener, sid); 205 443 206 // now that we have a listener, we can ask if sniffing is ok207 if( ariba_mod.sideport_sniffer != NULL ){208 base_overlay->registerSidePort(ariba_mod.sideport_sniffer);209 }444 // // now that we have a listener, we can ask if sniffing is ok 445 // if( ariba_mod.sideport_sniffer != NULL ){ 446 // base_overlay->registerSidePort(ariba_mod.sideport_sniffer); 447 // } 210 448 211 449 return ret; -
source/ariba/Node.h
r10653 r12060 43 43 namespace ariba { 44 44 class Node; 45 namespace overlay { 46 class BaseOverlay; 47 } 45 namespace communication { 46 class BaseCommunication; 47 } 48 namespace overlay { 49 class BaseOverlay; 50 } 48 51 } 49 52 50 53 #include <vector> 51 54 #include <iostream> 52 #include <boost/foreach.hpp> 55 #include <exception> 56 53 57 #include "Module.h" 54 58 #include "Identifiers.h" … … 56 60 #include "NodeListener.h" 57 61 #include "Name.h" 58 #include "AribaModule.h"62 //#include "AribaModule.h" 59 63 #include "CommunicationListener.h" 60 64 #include "DataMessage.h" 61 65 #include "SideportListener.h" 66 #include "ariba/overlay/SequenceNumber.h" 67 68 // reboost messages 69 #include "ariba/utility/transport/messages/message.hpp" 70 71 #include <boost/property_tree/ptree.hpp> 62 72 63 73 using std::vector; … … 65 75 66 76 namespace ariba { 77 78 // typedef ariba::overlay::SequenceNumber SequenceNumber; // XXX see CommunicationListener 79 80 using boost::property_tree::ptree; 81 82 // sendMessage-Priorities 83 struct send_priority 84 { 85 enum SEND_PRIORITY 86 { 87 HIGHEST = 2, 88 HIGHER = 3, 89 NORMAL = 4, 90 LOWER = 5, 91 LOWEST = 6 92 }; 93 }; 94 95 67 96 68 97 /** … … 75 104 * @author Christoph Mayer <mayer@tm.uka.de> 76 105 */ 106 // TODO do we really want to inherit from Module.. ? 77 107 class Node: public Module { 78 108 public: 109 79 110 /** 80 111 * Constructs a new node using a given ariba module … … 86 117 * is a zero-terminated char-string. 87 118 */ 88 Node(AribaModule& ariba_mod, const Name& node_name = Name::UNSPECIFIED); 119 // Node(AribaModule& ariba_mod, const Name& node_name = Name::UNSPECIFIED); 120 121 // XXX EXPERIMENTAL 122 Node(); 89 123 90 124 /** … … 100 134 //--- node control --- 101 135 136 /** 137 * XXX EXPERIMENTAL 138 * 139 * Replaces initialte & join 140 */ 141 void connect(const ptree& config); 142 143 // XXX DEPRECATED 102 144 /** 103 145 * This method instructs the node to join a particular spovnet. … … 107 149 * @param vnetId The SpoVNet name 108 150 */ 109 void join(const Name& name);151 // void join(const Name& name); 110 152 111 153 /** … … 116 158 * @param param The SpoVNet properties 117 159 */ 118 void initiate(const Name& name, const SpoVNetProperties& parm =119 SpoVNetProperties::DEFAULT);160 // void initiate(const Name& name, const SpoVNetProperties& parm = 161 // SpoVNetProperties::DEFAULT); 120 162 121 163 /** … … 221 263 void dropLink(const LinkID& lnk); 222 264 223 // message sending 224 225 /** 226 * Sends a one-shot message to a service. If link properties are specified, 227 * the node tries to fulfill those requirements. This may cause the node 228 * to first establish a temporary link, second sending the message and last 229 * dropping the link. This would result in a small amount of extra latency 230 * until the message is delivered. If reliable transport was selected, 231 * the method returns a sequence number and a communication event is 232 * triggered on message delivery or loss. 233 * 234 * @param msg The message to be sent 235 * @param nid The remote node identifier 236 * @param sid The remote service identifier 237 * @param req The requirements associated with the message 238 * @return A sequence number 239 */ 265 /** 266 * Returns whether a link is direct or relayed over other nodes 267 * @param lnk LinkID of the link 268 * @return true if link is direct; false otherwise 269 */ 270 bool isLinkDirect(const ariba::LinkID& lnk) const; 271 272 /** 273 * Returns the latest measured hop count on this link. 274 * NOTE: This is not guaranteed to be up to date. 275 * 276 * @param lnk LinkID of the link 277 * @return overlay hop count on this link 278 */ 279 int getHopCount(const ariba::LinkID& lnk) const; 280 281 282 /* +++++ Message sending +++++ */ 283 284 285 /** 286 * Sends a message via an established link. If reliable transport was 287 * selected, the method returns a sequence number and a communication event 288 * is triggered on message delivery or loss. 289 * 290 * +++ New interface, using efficient zero-copy reboost messages. +++ 291 * 292 * @param msg The message to be sent 293 * @param lnk The link to be used for sending the message 294 */ 295 const SequenceNumber& sendMessage(reboost::message_t msg, const LinkID& lnk, uint8_t priority=send_priority::NORMAL); 296 297 /** 298 * +++ Legacy interface, converts old ariba messages into new reboost messages. +++ 299 */ 300 seqnum_t sendMessage(const DataMessage& msg, const LinkID& lnk); 301 302 303 /** 304 * Sends a one-shot message to a service. If link properties are specified, 305 * the node tries to fulfill those requirements. This may cause the node 306 * to first establish a temporary link, second sending the message and last 307 * dropping the link. This would result in a small amount of extra latency 308 * until the message is delivered. If reliable transport was selected, 309 * the method returns a sequence number and a communication event is 310 * triggered on message delivery or loss. 311 * 312 * +++ New interface, using efficient zero-copy reboost messages. +++ 313 * 314 * @param msg The message to be sent 315 * @param nid The remote node identifier 316 * @param sid The remote service identifier 317 * @param req The requirements associated with the message 318 * @return A sequence number 319 */ 320 const SequenceNumber& sendMessage(reboost::message_t msg, const NodeID& nid, const ServiceID& sid, 321 uint8_t priority=send_priority::NORMAL, const LinkProperties& req = LinkProperties::DEFAULT); 322 323 /** 324 * +++ Legacy interface, converts old ariba messages into new reboost messages. +++ 325 */ 240 326 seqnum_t sendMessage(const DataMessage& msg, const NodeID& nid, const ServiceID& sid, 241 327 const LinkProperties& req = LinkProperties::DEFAULT); 242 328 243 /** 244 * like the above function, but sends the message to the closest directly known node 245 * to the specified address 246 */ 329 330 /** 331 * like the above function, but sends the message to the closest directly known node 332 * to the specified address 333 * 334 * +++ New interface, using efficient zero-copy reboost messages. +++ 335 * 336 */ 337 NodeID sendMessageCloserToNodeID(reboost::message_t msg, const NodeID& nid, const ServiceID& sid, 338 uint8_t priority=send_priority::NORMAL, const LinkProperties& req = LinkProperties::DEFAULT); 339 340 /** 341 * +++ Legacy interface, converts old ariba messages into new reboost messages. +++ 342 */ 247 343 NodeID sendMessageCloserToNodeID(const DataMessage& msg, const NodeID& nid, const ServiceID& sid, 248 344 const LinkProperties& req = LinkProperties::DEFAULT); 249 345 250 /** 251 * Sends a message via an established link. If reliable transport was 252 * selected, the method returns a sequence number and a communication event 253 * is triggered on message delivery or loss. 254 * 255 * @param msg The message to be sent 256 * @param lnk The link to be used for sending the message 257 */ 258 seqnum_t sendMessage(const DataMessage& msg, const LinkID& lnk); 259 260 /** 261 * Sends a message to all known hosts in the overlay structure 262 * the nodes that are reached here depend on the overlay structure. 263 * 264 * @param msg The message to be send 265 * @param sid The id of the service that should receive the message 266 * @see getNeighborNodes 267 */ 346 347 /** 348 * Sends a message to all known hosts in the overlay structure 349 * the nodes that are reached here depend on the overlay structure. 350 * 351 * +++ New interface, using efficient zero-copy reboost messages. +++ 352 * 353 * @param msg The message to be send 354 * @param sid The id of the service that should receive the message 355 * @see getNeighborNodes 356 */ 357 void sendBroadcastMessage(reboost::message_t msg, const ServiceID& sid, uint8_t priority=send_priority::NORMAL); 358 359 /** 360 * +++ Legacy interface, converts old ariba messages into new reboost messages. +++ 361 */ 268 362 void sendBroadcastMessage(const DataMessage& msg, const ServiceID& sid); 269 363 364 365 /* +++++ [Message sending] +++++ */ 366 367 368 270 369 // --- communication listeners --- 271 370 … … 313 412 /** @see Module.h */ 314 413 string getName() const; 315 414 415 416 private: 417 inline void check_send_priority(uint8_t priority); 418 419 316 420 protected: 317 421 // friends … … 320 424 // member variables 321 425 Name name; //< node name 322 AribaModule&ariba_mod; //< ariba module426 // AribaModule* ariba_mod; //< ariba module 323 427 SpoVNetID spovnetId; //< current spovnet id 324 428 NodeID nodeId; //< current node id 429 communication::BaseCommunication* base_communication; 325 430 overlay::BaseOverlay* base_overlay; //< the base overlay 326 431 -
source/ariba/SideportListener.cpp
r7468 r12060 41 41 #include "ariba/overlay/BaseOverlay.h" 42 42 #include "ariba/overlay/LinkDescriptor.h" 43 #include "ariba/utility/addressing /endpoint_set.hpp"43 #include "ariba/utility/addressing2/tcpip_endpoint.hpp" 44 44 45 45 using ariba::overlay::LinkDescriptor; … … 130 130 if( overlay == NULL ) return (Protocol)ret; 131 131 132 using namespace ariba::addressing;133 134 132 LinkDescriptor* link = NULL; 135 133 BOOST_FOREACH( LinkDescriptor* lnk, overlay->links ){ … … 147 145 if(bclink.isUnspecified() || bclink.remoteLocator == NULL) return (Protocol)ret; 148 146 149 const address_v*locator = bclink.remoteLocator;147 addressing2::EndpointPtr locator = bclink.remoteLocator; 150 148 151 if( locator->instanceof<tcpip_endpoint>() ){ 152 tcpip_endpoint tcpip = *locator; 153 154 if( tcpip.address().is_v4() || tcpip.address().is_v4_mapped() ){ 149 if( locator->get_category() == addressing2::endpoint_category::TCPIP ) 150 { 151 boost::shared_ptr<addressing2::tcpip_endpoint> endp = 152 boost::dynamic_pointer_cast<addressing2::tcpip_endpoint>(locator); 153 154 if( endp->to_asio().address().is_v4() ) 155 { 155 156 ret = SideportListener::ipv4; 156 }else if( tcpip.address().is_v6() ){157 }else { 157 158 ret = SideportListener::ipv6; 158 159 } 159 160 160 }else if( locator-> instanceof<rfcomm_endpoint>()){161 }else if( locator->get_category() == addressing2::endpoint_category::BLUETOOTH ){ 161 162 ret = SideportListener::rfcomm; 162 163 } -
source/ariba/ariba.h
r3374 r12060 44 44 */ 45 45 46 #include "AribaModule.h"47 46 #include "CommunicationListener.h" 48 47 #include "DataMessage.h" -
source/ariba/communication/BaseCommunication.cpp
r10767 r12060 57 57 namespace communication { 58 58 59 using namespace ariba::addressing2; 60 59 61 using ariba::utility::PeerID; 60 62 using ariba::utility::SystemQueue; 61 63 62 64 use_logging_cpp(BaseCommunication); 63 64 /// adds an endpoint to the list65 void BaseCommunication::add_endpoint( const address_v* endpoint ) {66 if (endpoint==NULL) return;67 BOOST_FOREACH( endpoint_reference& ref, remote_endpoints ) {68 if (ref.endpoint->type_id() == endpoint->type_id() && *ref.endpoint == *endpoint) {69 ref.count++;70 return;71 }72 }73 endpoint_reference ref;74 ref.endpoint = endpoint->clone();75 ref.count = 1;76 remote_endpoints.push_back(ref);77 }78 79 /// removes an endpoint from the list80 void BaseCommunication::remove_endpoint( const address_v* endpoint ) {81 if (endpoint==NULL) return;82 for (vector<endpoint_reference>::iterator i = remote_endpoints.begin();83 i != remote_endpoints.end(); i++) {84 if ((*i->endpoint).type_id() == endpoint->type_id() && (*i->endpoint) == *endpoint) {85 i->count--;86 if (i->count==0) {87 logging_info("No more links to " << i->endpoint->to_string() << ": terminating transports!");88 transport->terminate(i->endpoint);89 delete i->endpoint;90 remote_endpoints.erase(i);91 }92 return;93 }94 }95 }96 65 97 66 … … 100 69 transport( NULL ), 101 70 messageReceiver( NULL ), 102 started( false ) 71 started( false ), 72 listenOn_endpoints(new addressing2::endpoint_set()) 103 73 { 104 74 } … … 109 79 110 80 111 void BaseCommunication::start() { 81 void BaseCommunication::start(EndpointSetPtr listen_on) { 82 assert ( ! started ); 83 84 listenOn_endpoints = listen_on; 85 logging_info("Setting local end-points: " << listenOn_endpoints->to_string()); 86 112 87 logging_info( "Starting up ..." ); 113 88 currentSeqnum = 0; 114 89 115 // set local peer id116 localDescriptor.getPeerId() = PeerID::random();117 logging_info( "Using PeerID: " << localDescriptor.getPeerId() );118 119 90 // creating transports 91 // ---> transport_peer holds the set of the active endpoints we're listening on 120 92 logging_info( "Creating transports ..." ); 121 122 #ifdef UNDERLAY_OMNET 123 AribaOmnetModule* module = StartupWrapper::getCurrentModule(); 124 module->setServerPort( listenport ); 125 126 transport = module; 127 network = new OmnetNetworkProtocol( module ); 128 #else 129 transport = new transport_peer( localDescriptor.getEndpoints() ); 130 #endif 93 transport = new transport_peer(); 94 active_listenOn_endpoints = transport->add_listenOn_endpoints(listenOn_endpoints); 95 logging_info( "XXX. Active endpoints = " << active_listenOn_endpoints->to_string() ); // XXX 131 96 132 97 logging_info( "Searching for local locators ..." ); 133 /** 134 * DONT DO THAT: if(localDescriptor.getEndpoints().to_string().length() == 0) 135 * since addresses are used to initialize transport addresses 136 */ 137 AddressDiscovery::discover_endpoints( localDescriptor.getEndpoints() ); 138 logging_info( "Done. Local endpoints = " << localDescriptor.toString() ); 139 98 local_endpoints = AddressDiscovery::discover_endpoints(active_listenOn_endpoints); 99 if ( local_endpoints->count() > 0 ) 100 { 101 logging_info( "Done. Discovered local endpoints: " << local_endpoints->to_string() ); 102 } 103 else 104 { 105 logging_warn("WARING!! No local endpoints found, NO COMMUNICATION POSSIBLE!!"); 106 107 // TODO notify application, so that it may react properly. throw exception..? 108 assert( false ); 109 } 110 111 112 // create local EndpointDescriptor 113 // ---> localDescriptor hold the set endpoints that can be used to reach us 114 localDescriptor.getPeerId() = PeerID::random(); 115 localDescriptor.replace_endpoint_set(local_endpoints); 116 logging_info( "Using PeerID: " << localDescriptor.getPeerId() ); 117 118 // start transport_peer 140 119 transport->register_listener( this ); 141 120 transport->start(); 142 121 143 #ifndef UNDERLAY_OMNET144 122 // bind to the network change detection 145 123 networkMonitor.registerNotification( this ); 146 #endif147 124 148 125 // base comm startup done … … 163 140 bool BaseCommunication::isStarted(){ 164 141 return started; 165 }166 167 /// Sets the endpoints168 void BaseCommunication::setEndpoints( string& _endpoints ) {169 localDescriptor.getEndpoints().assign(_endpoints);170 logging_info("Setting local end-points: "171 << localDescriptor.getEndpoints().to_string());172 142 } 173 143 … … 193 163 addLink( ld ); 194 164 195 // send a message to request new link to remote 196 logging_debug( "Send messages with request to open link to " << descriptor.toString() ); 197 AribaBaseMsg baseMsg( AribaBaseMsg::typeLinkRequest, linkid ); 198 baseMsg.getLocalDescriptor() = localDescriptor; 199 baseMsg.getRemoteDescriptor().getPeerId() = descriptor.getPeerId(); 200 201 // serialize and send message 202 send( &baseMsg, descriptor ); 165 166 /* send a message to request new link to remote */ 167 logging_debug( "Send messages with request to open link to " << descriptor.toString() ); 168 169 /* 170 * Create Link-Request Message: 171 * NOTE: - Their PeerID (in parent message) 172 * - Our LinkID 173 * - Our PeerID 174 * - Our EndpointDescriptor 175 */ 176 reboost::message_t linkmsg; 177 linkmsg.push_back(linkid.serialize()); 178 linkmsg.push_back(localDescriptor.getPeerId().serialize()); 179 linkmsg.push_back(localDescriptor.endpoints->serialize()); 180 181 // // XXX AKTUELL BUG FINDING... 182 // reboost::shared_buffer_t xxx = localDescriptor.endpoints->serialize(); 183 // EndpointSetPtr xxx_set = endpoint_set::create_EndpointSet(); 184 // xxx_set->deserialize(xxx); 185 // cout << "/// MARIO VORHER: " << localDescriptor.endpoints->to_string() << endl; 186 // cout << "/// MARIO NACHHER: " << xxx_set->to_string() << endl; 187 188 // send message 189 // TODO move enum to BaseComm 190 send_to_peer(AribaBaseMsg::typeLinkRequest, descriptor.getPeerId(), linkmsg, 191 descriptor, system_priority::OVERLAY); 203 192 204 193 return linkid; … … 217 206 218 207 // tell the registered listeners 219 BOOST_FOREACH( CommunicationEvents* i, eventListener ) {208 foreach( CommunicationEvents* i, eventListener ) { 220 209 i->onLinkDown( link, ld.localLocator, ld.remoteLocator ); 221 210 } 222 211 223 // create message to drop the link 212 213 // * send message to drop the link * 224 214 logging_debug( "Sending out link close request. for us, the link is closed now" ); 225 AribaBaseMsg msg( AribaBaseMsg::typeLinkClose, ld.localLink, ld.remoteLink ); 226 227 // send message to drop the link 228 send( &msg, ld ); 215 reboost::message_t empty_message; 216 send_over_link( AribaBaseMsg::typeLinkClose, empty_message, ld, system_priority::OVERLAY ); 229 217 230 218 // remove from map … … 232 220 } 233 221 234 seqnum_t BaseCommunication::sendMessage( const LinkID lid, const Message* message) { 235 236 logging_debug( "Sending out message to link " << lid.toString() ); 237 222 223 seqnum_t BaseCommunication::sendMessage( const LinkID& lid, 224 reboost::message_t message, 225 uint8_t priority, 226 bool bypass_overlay) throw(communication_message_not_sent) 227 { 228 // message type: direct data or (normal) data 229 AribaBaseMsg::type_ type; 230 if ( bypass_overlay ) 231 { 232 type = AribaBaseMsg::typeDirectData; 233 logging_debug( "Sending out direct-message to link " << lid.toString() ); 234 } 235 else 236 { 237 type = AribaBaseMsg::typeData; 238 logging_debug( "Sending out message to link " << lid.toString() ); 239 } 240 241 238 242 // query local link info 239 243 LinkDescriptor& ld = queryLocalLink(lid); 240 if( ld.isUnspecified() ){ 241 logging_error( "Don't know the link with id " << lid.toString() ); 242 return -1; 244 if( ld.isUnspecified() ) 245 { 246 throw communication_message_not_sent("Don't know the link with id " 247 + lid.toString()); 243 248 } 244 249 245 250 // link not up-> error 246 if( !ld.up ) {247 logging_error("Can not send on link " << lid.toString() << ": link not up");248 return -1;249 }250 251 // create message 252 AribaBaseMsg msg( AribaBaseMsg::typeData, ld.localLink, ld.remoteLink ); 253 254 // encapsulate the payload message255 msg.encapsulate( const_cast<Message*>(message) ); 256 257 // send message258 send( &msg, ld);259 260 // return sequence number251 if( !ld.up ) 252 { 253 throw communication_message_not_sent("Can not send on link " 254 + lid.toString() + ": link not up"); 255 } 256 257 258 // * send message * 259 bool okay = send_over_link( type, message, ld, priority ); 260 261 if ( ! okay ) 262 { 263 throw communication_message_not_sent("send_over_link failed!"); 264 } 265 261 266 return ++currentSeqnum; 262 267 } … … 268 273 LinkDescriptor& linkDesc = queryLocalLink(link); 269 274 if (linkDesc.isUnspecified()) return EndpointDescriptor::UNSPECIFIED(); 270 return linkDesc.remote Endpoint;275 return linkDesc.remoteDescriptor; 271 276 } 272 277 } … … 283 288 } 284 289 285 SystemEventType TransportEvent("Transport"); 286 SystemEventType MessageDispatchEvent("MessageDispatchEvent", TransportEvent ); 287 288 /// called when a system event is emitted by system queue 289 void BaseCommunication::handleSystemEvent(const SystemEvent& event) { 290 291 // dispatch received messages 292 if ( event.getType() == MessageDispatchEvent ){ 293 logging_debug( "Forwarding message receiver" ); 294 boost::function0<void>* handler = event.getData< boost::function0<void> >(); 295 (*handler)(); 296 delete handler; 297 } 298 } 299 300 /** 301 * called within the ASIO thread 302 * when a message is received from underlay transport 303 */ 290 291 292 /*------------------------------ 293 | ASIO thread --> SystemQueue | 294 ------------------------------*/ 295 296 /// ASIO thread 304 297 void BaseCommunication::receive_message(transport_connection::sptr connection, 305 reboost:: message_t msg) {298 reboost::shared_buffer_t msg) { 306 299 307 300 logging_debug( "Dispatching message" ); 308 301 309 boost::function0<void>* handler = new boost::function0<void>(302 SystemQueue::instance().scheduleCall( 310 303 boost::bind( 311 304 &BaseCommunication::receiveMessage, … … 313 306 connection, 314 307 msg) 315 ); 316 317 SystemQueue::instance().scheduleEvent( 318 SystemEvent(this, MessageDispatchEvent, handler) 319 ); 320 } 321 322 /** 323 * called within the ARIBA thread (System Queue) 324 * when a message is received from underlay transport 325 */ 308 ); 309 } 310 311 /// ASIO thread 312 void BaseCommunication::connection_terminated(transport_connection::sptr connection) 313 { 314 SystemQueue::instance().scheduleCall( 315 boost::bind( 316 &BaseCommunication::connectionTerminated, 317 this, 318 connection) 319 ); 320 } 321 322 /*-------------------------------- 323 | [ASIO thread --> SystemQueue] | 324 -------------------------------*/ 325 326 /// ARIBA thread (System Queue) 327 void BaseCommunication::connectionTerminated(transport_connection::sptr connection) 328 { 329 vector<LinkID*> links = connection->get_communication_links(); 330 331 logging_debug("[BaseCommunication] Connection terminated: " 332 << connection->getLocalEndpoint()->to_string() 333 << " <--> " << connection->getRemoteEndpoint()->to_string() 334 << " (" << links.size() << " links)"); 335 336 // remove all links that used the terminated connection 337 for ( vector<LinkID*>::iterator it = links.begin(); it != links.end(); ++it ) 338 { 339 LinkID& link_id = **it; 340 341 logging_debug(" ---> Removing link: " << link_id.toString()); 342 343 // searching for link, not found-> warn 344 LinkDescriptor& linkDesc = queryLocalLink( link_id ); 345 if (linkDesc.isUnspecified()) { 346 logging_warn("Failed to find local link " << link_id.toString()); 347 continue; 348 } 349 350 // inform listeners 351 foreach( CommunicationEvents* i, eventListener ){ 352 i->onLinkFail( linkDesc.localLink, 353 linkDesc.localLocator, linkDesc.remoteLocator ); 354 } 355 356 // remove the link descriptor 357 removeLink( link_id ); 358 } 359 } 360 361 /// ARIBA thread (System Queue) 326 362 void BaseCommunication::receiveMessage(transport_connection::sptr connection, 327 reboost:: message_t message)363 reboost::shared_buffer_t message) 328 364 { 329 330 //// Adapt to old message system //// 331 // Copy data 332 size_t bytes_len = message.size(); 333 uint8_t* bytes = new uint8_t[bytes_len]; 334 message.read(bytes, 0, bytes_len); 335 336 Data data(bytes, bytes_len * 8); 337 338 Message legacy_message; 339 legacy_message.setPayload(data); 340 341 342 343 /// decapsulate message 344 AribaBaseMsg* msg = legacy_message.decapsulate<AribaBaseMsg>(); 345 logging_debug( "Receiving message of type " << msg->getTypeString() ); 346 365 // XXX 366 logging_debug("/// [receiveMessage] buffersize: " << message.size()); 367 368 // get type 369 uint8_t type = message.data()[0]; 370 reboost::shared_buffer_t sub_buff = message(1); 371 372 // get link id 373 LinkID link_id; 374 if ( type != AribaBaseMsg::typeLinkRequest) 375 { 376 sub_buff = link_id.deserialize(sub_buff); 377 } 378 347 379 // handle message 348 switch ( msg->getType()) {349 380 switch ( type ) 381 { 350 382 // --------------------------------------------------------------------- 351 383 // data message 352 384 // --------------------------------------------------------------------- 353 case AribaBaseMsg::typeData: { 354 logging_debug( "Received data message, forwarding to overlay" ); 355 if( messageReceiver != NULL ) { 385 case AribaBaseMsg::typeData: 386 { 387 logging_debug( "Received data message, forwarding to overlay." ); 388 if( messageReceiver != NULL ) 389 { 356 390 messageReceiver->receiveMessage( 357 msg, msg->getRemoteLink(), NodeID::UNSPECIFIED391 sub_buff, link_id, NodeID::UNSPECIFIED, false 358 392 ); 359 393 } 394 360 395 break; 361 396 } 362 397 398 // --------------------------------------------------------------------- 399 // direct data message (bypass overlay-layer) 400 // --------------------------------------------------------------------- 401 case AribaBaseMsg::typeDirectData: 402 { 403 logging_debug( "Received direct data message, forwarding to application." ); 404 405 if( messageReceiver != NULL ) 406 { 407 messageReceiver->receiveMessage( 408 sub_buff, link_id, NodeID::UNSPECIFIED, true 409 ); 410 } 411 412 break; 413 } 414 415 416 363 417 // --------------------------------------------------------------------- 364 418 // handle link request from remote 365 419 // --------------------------------------------------------------------- 366 case AribaBaseMsg::typeLinkRequest: { 367 logging_debug( "Received link open request" ); 368 369 /// not the correct peer id-> skip request 370 if (!msg->getRemoteDescriptor().getPeerId().isUnspecified() 371 && msg->getRemoteDescriptor().getPeerId() != localDescriptor.getPeerId()) { 372 logging_info("Received link request for " 373 << msg->getRemoteDescriptor().getPeerId().toString() 374 << "but i'm " 375 << localDescriptor.getPeerId() 376 << ": Ignoring!"); 377 break; 378 } 379 420 case AribaBaseMsg::typeLinkRequest: 421 { 422 logging_debug( "Received link open request on " 423 << connection->getLocalEndpoint()->to_string() ); 424 425 /* 426 * Deserialize Peer Message 427 * - Our PeerID 428 */ 429 PeerID our_peer_id; 430 sub_buff = our_peer_id.deserialize(sub_buff); 431 432 /// not the correct peer id-> skip request 433 if ( our_peer_id != localDescriptor.getPeerId() && 434 ! our_peer_id.isUnspecified() /* overlay bootstrap */ ) 435 { 436 logging_info("Received link request for " 437 << our_peer_id.toString() 438 << "but i'm " 439 << localDescriptor.getPeerId() 440 << ": Ignoring!"); 441 442 // TODO terminate connection? 443 444 break; 445 } 446 447 448 /* 449 * Deserialize Link-Request Message: 450 * - Their LinkID 451 * - Their PeerID 452 * - Their EndpointDescriptor 453 */ 454 LinkID their_link_id; 455 PeerID their_peer_id; 456 EndpointSetPtr their_endpoints = endpoint_set::create_EndpointSet(); 457 sub_buff = their_link_id.deserialize(sub_buff); 458 sub_buff = their_peer_id.deserialize(sub_buff); 459 sub_buff = their_endpoints->deserialize(sub_buff); 460 /* [ Deserialize Link-Request Message ] */ 461 462 380 463 /// only answer the first request 381 if (!queryRemoteLink(msg->getLocalLink()).isUnspecified()) { 464 if (!queryRemoteLink(their_link_id).isUnspecified()) 465 { 466 467 // TODO aktuell: When will these connections be closed? 468 // ---> Close it now (if it services no links) ? 469 // (see also ! allowlink below) 470 471 // XXX AKTUELL TESTING !! This will cause race conditions. So this is test-code only! 472 if ( connection->get_communication_links().size() == 0 ) 473 { 474 connection->terminate(); 475 } 476 382 477 logging_debug("Link request already received. Ignore!"); 383 478 break; … … 386 481 /// create link ids 387 482 LinkID localLink = LinkID::create(); 388 LinkID remoteLink = msg->getLocalLink();483 LinkID remoteLink = their_link_id; // XXX intermediate variable is unnecessary 389 484 logging_debug( 390 485 "local=" << connection->getLocalEndpoint()->to_string() … … 394 489 // check if link creation is allowed by ALL listeners 395 490 bool allowlink = true; 396 BOOST_FOREACH( CommunicationEvents* i, eventListener ){491 foreach( CommunicationEvents* i, eventListener ){ 397 492 allowlink &= i->onLinkRequest( localLink, 398 493 connection->getLocalEndpoint(), … … 403 498 if( !allowlink ){ 404 499 logging_warn( "Overlay denied creation of link" ); 405 delete msg;406 500 return; 407 501 } … … 411 505 ld->localLink = localLink; 412 506 ld->remoteLink = remoteLink; 413 ld->localLocator = connection->getLocalEndpoint()->clone(); 414 ld->remoteLocator = connection->getRemoteEndpoint()->clone(); 415 ld->connection = connection; 416 ld->remoteEndpoint = msg->getLocalDescriptor(); 417 add_endpoint(ld->remoteLocator); 418 419 // add layer 1-3 addresses 420 ld->remoteEndpoint.getEndpoints().add( 421 ld->remoteLocator, endpoint_set::Layer1_3 | endpoint_set::NoLoopback); 422 localDescriptor.getEndpoints().add( 423 connection->getLocalEndpoint(), 424 endpoint_set::Layer1_3 | endpoint_set::NoLoopback); 507 ld->localLocator = connection->getLocalEndpoint(); 508 ld->remoteLocator = connection->getRemoteEndpoint(); 509 ld->remoteDescriptor = EndpointDescriptor(their_peer_id, their_endpoints); 510 ld->set_connection(connection); 511 512 513 // update endpoints (should only have any effect in case of NAT) 514 ld->remoteDescriptor.endpoints->add_endpoint(connection->getRemoteEndpoint()); 515 // localDescriptor.endpoints->add_endpoint(connection->getLocalEndpoint()); // XXX 0.0.0.0:0 425 516 426 517 // link is now up-> add it … … 428 519 addLink(ld); 429 520 430 // link is up! 431 logging_debug( "Link (initiated from remote) is up with " 432 << "local(id=" << ld->localLink.toString() << "," 433 << "locator=" << ld->localLocator->to_string() << ") " 434 << "remote(id=" << ld->remoteLink.toString() << ", " 435 << "locator=" << ld->remoteLocator->to_string() << ")" 436 ); 437 438 // sending link request reply 439 logging_debug( "Sending link request reply with ids " 440 << "local=" << localLink.toString() << ", " 441 << "remote=" << remoteLink.toString() ); 442 AribaBaseMsg reply( AribaBaseMsg::typeLinkReply, localLink, remoteLink ); 443 reply.getLocalDescriptor() = localDescriptor; 444 reply.getRemoteDescriptor() = ld->remoteEndpoint; 445 446 send( &reply, *ld ); 521 522 523 /* sending link reply */ 524 logging_debug( "Sending link reply with ids " 525 << "local=" << localLink.toString() << ", " 526 << "remote=" << remoteLink.toString() ); 527 528 /* 529 * Create Link-Reply Message: 530 * - Our LinkID 531 * - Our Endpoint_Set (as update) 532 * - Their EndpointDescriptor (maybe they learn something about NAT) 533 */ 534 reboost::message_t linkmsg; 535 linkmsg.push_back(localLink.serialize()); 536 linkmsg.push_back(localDescriptor.endpoints->serialize()); 537 linkmsg.push_back(ld->remoteDescriptor.endpoints->serialize()); 538 539 // XXX 540 cout << "/// MARIO: " << ld->get_connection()->getRemoteEndpoint()->to_string() << endl; 541 542 // send message 543 bool sent = send_over_link( AribaBaseMsg::typeLinkReply, linkmsg, *ld, system_priority::OVERLAY ); 544 545 if ( ! sent ) 546 { 547 logging_error("ERROR: Could not send LinkReply to: " << ld->remoteLocator->to_string()); 548 549 // TODO remove link, close link, ..? 550 551 break; 552 } 553 554 555 // link is up! 556 logging_debug( "Link (initiated from remote) is up with " 557 << "local(id=" << ld->localLink.toString() << "," 558 << "locator=" << ld->localLocator->to_string() << ") " 559 << "remote(id=" << ld->remoteLink.toString() << ", " 560 << "locator=" << ld->remoteLocator->to_string() << ")" 561 ); 447 562 448 563 // inform listeners about new open link 449 BOOST_FOREACH( CommunicationEvents* i, eventListener ) {564 foreach( CommunicationEvents* i, eventListener ) { 450 565 i->onLinkUp( localLink, ld->localLocator, ld->remoteLocator); 451 566 } … … 458 573 // handle link request reply 459 574 // --------------------------------------------------------------------- 460 case AribaBaseMsg::typeLinkReply: { 575 case AribaBaseMsg::typeLinkReply: 576 { 461 577 logging_debug( "Received link open reply for a link we initiated" ); 462 578 579 /* 580 * Deserialize Link-Reply Message: 581 * - Their LinkID 582 * - Their Endpoint_Set (as update) 583 * - Our EndpointDescriptor (maybe we can learn something about NAT) 584 */ 585 LinkID their_link_id; 586 EndpointSetPtr their_endpoints = endpoint_set::create_EndpointSet(); 587 EndpointSetPtr our_endpoints = endpoint_set::create_EndpointSet(); 588 sub_buff = their_link_id.deserialize(sub_buff); 589 sub_buff = their_endpoints->deserialize(sub_buff); 590 sub_buff = our_endpoints->deserialize(sub_buff); 591 592 463 593 // this is a reply to a link open request, so we have already 464 594 // a link mapping and can now set the remote link to valid 465 LinkDescriptor& ld = queryLocalLink( msg->getRemoteLink());595 LinkDescriptor& ld = queryLocalLink( link_id ); 466 596 467 597 // no link found-> warn! 468 598 if (ld.isUnspecified()) { 469 logging_warn("Failed to find local link " << msg->getRemoteLink().toString()); 470 delete msg; 599 logging_warn("Failed to find local link " << link_id.toString()); 471 600 return; 472 601 } 602 603 if ( ld.up ) 604 { 605 logging_warn("Got link replay for already open link. Ignore. LinkID: " << link_id.toString()); 606 607 // TODO send LinkClose ? 608 return; 609 } 473 610 474 611 // store the connection 475 ld. connection = connection;612 ld.set_connection(connection); 476 613 477 614 // set remote locator and link id 478 ld.remoteLink = msg->getLocalLink(); 479 ld.remoteLocator = connection->getRemoteEndpoint()->clone(); 480 ld.remoteEndpoint.getEndpoints().add( 481 msg->getLocalDescriptor().getEndpoints(), 482 endpoint_set::Layer1_4 483 ); 484 485 localDescriptor.getEndpoints().add( 486 msg->getRemoteDescriptor().getEndpoints(), 487 endpoint_set::Layer1_3 488 ); 615 ld.remoteLink = their_link_id; 616 ld.remoteLocator = connection->getRemoteEndpoint(); 617 618 619 /* Update endpoints */ 620 // NOTE: we might loose some information here, but it's our only chance to get rid of outdated information. 621 ld.remoteDescriptor.replace_endpoint_set(their_endpoints); 622 623 // add actual remote endpoint to this set (should only have any effect in case of NAT) 624 ld.remoteDescriptor.endpoints->add_endpoint(connection->getRemoteEndpoint()); 625 626 // TODO In case of NAT, we could learn something about our external IP. 627 // ---> But we must trust the remote peer about this information!! 628 // localDescriptor.endpoints->add_endpoints(our_endpoints); 629 630 631 632 489 633 ld.up = true; 490 add_endpoint(ld.remoteLocator);491 634 492 635 logging_debug( "Link is now up with local id " … … 496 639 497 640 // inform lisneters about link up event 498 BOOST_FOREACH( CommunicationEvents* i, eventListener ){641 foreach( CommunicationEvents* i, eventListener ){ 499 642 i->onLinkUp( ld.localLink, ld.localLocator, ld.remoteLocator ); 500 643 } … … 509 652 case AribaBaseMsg::typeLinkClose: { 510 653 // get remote link 511 const LinkID& localLink = msg->getRemoteLink();512 logging_debug( "Received link close request for link " << l ocalLink.toString() );654 // const LinkID& localLink = msg.getRemoteLink(); 655 logging_debug( "Received link close request for link " << link_id.toString() ); 513 656 514 657 // searching for link, not found-> warn 515 LinkDescriptor& linkDesc = queryLocalLink( l ocalLink);658 LinkDescriptor& linkDesc = queryLocalLink( link_id ); 516 659 if (linkDesc.isUnspecified()) { 517 logging_warn("Failed to find local link " << localLink.toString()); 518 delete msg; 660 logging_warn("Failed to find local link " << link_id.toString()); 519 661 return; 520 662 } 521 663 522 664 // inform listeners 523 BOOST_FOREACH( CommunicationEvents* i, eventListener ){665 foreach( CommunicationEvents* i, eventListener ){ 524 666 i->onLinkDown( linkDesc.localLink, 525 667 linkDesc.localLocator, linkDesc.remoteLocator ); … … 527 669 528 670 // remove the link descriptor 529 removeLink( l ocalLink);671 removeLink( link_id ); 530 672 531 673 // done … … 534 676 535 677 // --------------------------------------------------------------------- 536 // handle link locator changes 678 // handle link locator changes -- TODO is this ever called..? 537 679 // --------------------------------------------------------------------- 538 case AribaBaseMsg::typeLinkUpdate: { 539 const LinkID& localLink = msg->getRemoteLink(); 540 logging_debug( "Received link update for link " 541 << localLink.toString() ); 542 543 // find the link description 544 LinkDescriptor& linkDesc = queryLocalLink( localLink ); 545 if (linkDesc.isUnspecified()) { 546 logging_warn("Failed to update local link " 547 << localLink.toString()); 548 delete msg; 549 return; 550 } 551 552 // update the remote locator 553 const address_v* oldremote = linkDesc.remoteLocator; 554 linkDesc.remoteLocator = connection->getRemoteEndpoint()->clone(); 555 556 // inform the listeners (local link has _not_ changed!) 557 BOOST_FOREACH( CommunicationEvents* i, eventListener ){ 558 i->onLinkChanged( 559 linkDesc.localLink, // linkid 560 linkDesc.localLocator, // old local 561 linkDesc.localLocator, // new local 562 oldremote, // old remote 563 linkDesc.remoteLocator // new remote 564 ); 565 } 566 567 // done 568 break; 569 } 570 } 571 572 delete msg; 680 // case AribaBaseMsg::typeLinkUpdate: { 681 // const LinkID& localLink = msg.getRemoteLink(); 682 // logging_debug( "Received link update for link " 683 // << localLink.toString() ); 684 // 685 // // find the link description 686 // LinkDescriptor& linkDesc = queryLocalLink( localLink ); 687 // if (linkDesc.isUnspecified()) { 688 // logging_warn("Failed to update local link " 689 // << localLink.toString()); 690 // return; 691 // } 692 // 693 // // update the remote locator 694 // addressing2::EndpointPtr oldremote = linkDesc.remoteLocator; 695 // linkDesc.remoteLocator = connection->getRemoteEndpoint(); 696 // 697 // // TODO update linkDesc.connection ? 698 // 699 // // inform the listeners (local link has _not_ changed!) 700 // foreach( CommunicationEvents* i, eventListener ){ 701 // i->onLinkChanged( 702 // linkDesc.localLink, // linkid 703 // linkDesc.localLocator, // old local 704 // linkDesc.localLocator, // new local 705 // oldremote, // old remote 706 // linkDesc.remoteLocator // new remote 707 // ); 708 // } 709 // 710 // // done 711 // break; 712 // } 713 714 715 default: { 716 logging_warn( "Received unknown message type!" ); 717 break; 718 } 719 720 } 573 721 } 574 722 … … 582 730 for(LinkSet::iterator i=linkSet.begin(); i != linkSet.end(); i++){ 583 731 if( (*i)->localLink != localLink) continue; 584 remove_endpoint((*i)->remoteLocator); 732 // remove_endpoint((*i)->remoteLocator); // XXX 585 733 delete *i; 586 734 linkSet.erase( i ); … … 605 753 } 606 754 607 LinkIDs BaseCommunication::getLocalLinks( const address_v* addr ) const {608 LinkIDs ids;609 for (size_t i=0; i<linkSet.size(); i++){610 if( addr == NULL ){611 ids.push_back( linkSet[i]->localLink );612 } else {613 if ( *linkSet[i]->remoteLocator == *addr )614 ids.push_back( linkSet[i]->localLink );615 }616 }617 return ids;618 }755 //LinkIDs BaseCommunication::getLocalLinks( const address_v* addr ) const { 756 // LinkIDs ids; 757 // for (size_t i=0; i<linkSet.size(); i++){ 758 // if( addr == NULL ){ 759 // ids.push_back( linkSet[i]->localLink ); 760 // } else { 761 // if ( *linkSet[i]->remoteLocator == *addr ) 762 // ids.push_back( linkSet[i]->localLink ); 763 // } 764 // } 765 // return ids; 766 //} 619 767 620 768 void BaseCommunication::onNetworkChange(const NetworkChangeInterface::NetworkChangeInfo& info){ 621 622 #ifdef UNDERLAY_OMNET623 624 // we have no mobility support for simulations625 return626 627 #endif // UNDERLAY_OMNET628 769 629 770 /*- disabled! … … 762 903 } 763 904 764 /// sends a message to all end-points in the end-point descriptor 765 void BaseCommunication::send(Message* legacy_message, const EndpointDescriptor& endpoint) { 766 Data data = data_serialize(legacy_message, DEFAULT_V); 767 768 //// Adapt to new message system //// 769 // transfer data buffer ownership to the shared_buffer 770 reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8); 771 772 reboost::message_t message; 773 message.push_back(buf); 774 775 transport->send(endpoint.getEndpoints(), message); 776 } 777 778 /// sends a message to the remote locator inside the link descriptor 779 void BaseCommunication::send(Message* legacy_message, const LinkDescriptor& desc) { 780 if (desc.remoteLocator==NULL) return; 781 782 Data data = data_serialize(legacy_message, DEFAULT_V); 783 784 //// Adapt to new message system //// 785 // transfer data buffer ownership to the shared_buffer 786 reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8); 787 788 reboost::message_t message; 789 message.push_back(buf); 790 791 desc.connection->send(message); 792 } 905 906 addressing2::EndpointPtr BaseCommunication::get_local_endpoint_of_link( 907 const LinkID& linkid) 908 { 909 LinkDescriptor& ld = queryLocalLink(linkid); 910 911 return ld.get_connection()->getLocalEndpoint(); 912 } 913 914 addressing2::EndpointPtr BaseCommunication::get_remote_endpoint_of_link( 915 const LinkID& linkid) 916 { 917 LinkDescriptor& ld = queryLocalLink(linkid); 918 919 return ld.get_connection()->getRemoteEndpoint(); 920 } 921 922 923 924 bool BaseCommunication::send_over_link( 925 const uint8_t type, 926 reboost::message_t message, 927 const LinkDescriptor& desc, 928 const uint8_t priority) 929 { 930 /* 931 * Create Link Message: 932 * - Type 933 * - Their LinkID 934 */ 935 // link id 936 message.push_front(desc.remoteLink.serialize()); 937 // type 938 memcpy(message.push_front(sizeof(uint8_t)).mutable_data(), &type, sizeof(uint8_t)); 939 /* [ Create Link Message ] */ 940 941 942 /* send message */ 943 transport_connection::sptr conn = desc.get_connection(); 944 if ( ! conn ) 945 { 946 cout << "/// MARIO: No connection!!" << endl; // XXX debug 947 return false; 948 } 949 950 // * send over connection * 951 return conn->send(message, priority); 952 } 953 954 void BaseCommunication::send_to_peer( 955 const uint8_t type, 956 const PeerID& peer_id, 957 reboost::message_t message, 958 const EndpointDescriptor& endpoint, 959 const uint8_t priority ) 960 { 961 /* 962 * Create Peer Message: 963 * - Type 964 * - Their PeerID 965 */ 966 // peer id 967 message.push_front(peer_id.serialize()); 968 // type 969 memcpy(message.push_front(sizeof(uint8_t)).mutable_data(), &type, sizeof(uint8_t)); 970 971 972 /* send message */ 973 transport->send(endpoint.getEndpoints(), message, priority); 974 } 975 976 977 793 978 794 979 }} // namespace ariba, communication -
source/ariba/communication/BaseCommunication.h
r10653 r12060 50 50 #include <boost/foreach.hpp> 51 51 52 #ifdef ECLIPSE_PARSER 53 #define foreach(a, b) for(a : b) 54 #else 55 #define foreach(a, b) BOOST_FOREACH(a, b) 56 #endif 57 52 58 // utilities 53 59 #include "ariba/utility/types.h" 54 #include "ariba/utility/messages .h"60 #include "ariba/utility/messages/MessageReceiver.h" 55 61 #include "ariba/utility/logging/Logging.h" 56 62 #include "ariba/utility/misc/Demultiplexer.hpp" … … 58 64 59 65 // new transport and addressing 60 #include "ariba/utility/addressing/addressing.hpp" 61 #include "ariba/utility/transport/transport.hpp" 62 #include "ariba/utility/transport/transport_connection.hpp" 66 #include "ariba/utility/transport/transport_peer.hpp" 67 #include "ariba/utility/transport/interfaces/transport_connection.hpp" 68 #include "ariba/utility/transport/interfaces/transport_listener.hpp" 69 #include "ariba/utility/addressing2/endpoint.hpp" 63 70 64 71 // communication … … 72 79 #include "ariba/communication/networkinfo/NetworkInformation.h" 73 80 74 // disabled75 //#ifndef UNDERLAY_OMNET76 // #include "ariba/communication/modules/transport/tcp/TCPTransport.h"77 // #include "ariba/communication/modules/network/ip/IPv4NetworkProtocol.h"78 // using ariba::communication::IPv4NetworkProtocol;79 // using ariba::communication::TCPTransport;80 //#endif81 82 81 namespace ariba { 83 class SideportListener;82 class SideportListener; 84 83 } 85 84 … … 87 86 namespace communication { 88 87 88 89 class communication_message_not_sent: public std::runtime_error 90 { 91 public: 92 /** Takes a character string describing the error. */ 93 explicit communication_message_not_sent(const string& __arg) : 94 std::runtime_error(__arg) 95 { 96 } 97 98 virtual ~communication_message_not_sent() throw() {} 99 }; 100 101 102 89 103 using namespace std; 90 using namespace ariba::addressing;91 104 using namespace ariba::transport; 92 105 using namespace ariba::utility; … … 102 115 * protocols and addressing schemes. 103 116 * 104 * @author Sebastian Mies, Christoph Mayer 117 * @author Sebastian Mies, Christoph Mayer, Mario Hock 105 118 */ 106 119 class BaseCommunication: 107 120 public NetworkChangeInterface, 108 public SystemEventListener,109 121 public transport_listener { 110 122 … … 120 132 121 133 /// Startup the base communication, start modules etc. 122 void start( );134 void start(addressing2::EndpointSetPtr listen_on); 123 135 124 136 /// Stops the base communication, stop modules etc. 125 137 void stop(); 126 127 /// Sets the endpoints128 void setEndpoints( string& endpoints );129 138 130 139 /// Check whether the base communication has been started up … … 147 156 * @return A sequence number for this message 148 157 */ 149 seqnum_t sendMessage(const LinkID lid, const Message* message); 158 seqnum_t sendMessage(const LinkID& lid, 159 reboost::message_t message, 160 uint8_t priority, 161 bool bypass_overlay = false) throw(communication_message_not_sent); 150 162 151 163 /** … … 164 176 * @return List of LinkID 165 177 */ 166 LinkIDs getLocalLinks(const address_v* addr) const; 178 // LinkIDs getLocalLinks(const address_v* addr) const; // XXX aktuell 167 179 168 180 /** … … 187 199 188 200 void unregisterEventListener(CommunicationEvents* _events); 189 190 /// called when a system event is emitted by system queue191 virtual void handleSystemEvent(const SystemEvent& event);192 201 193 202 /** … … 196 205 */ 197 206 virtual void receive_message(transport_connection::sptr connection, 198 reboost::message_t msg); 199 207 reboost::shared_buffer_t msg); 208 209 /** 210 * called within the ASIO thread 211 * when a connection is terminated (e.g. TCP close) 212 */ 213 virtual void connection_terminated(transport_connection::sptr connection); 214 215 addressing2::EndpointPtr get_local_endpoint_of_link(const LinkID& linkid); 216 addressing2::EndpointPtr get_remote_endpoint_of_link(const LinkID& linkid); 217 218 200 219 protected: 201 220 … … 205 224 */ 206 225 void receiveMessage(transport_connection::sptr connection, 207 reboost::message_t msg); 226 reboost::shared_buffer_t message); 227 228 /** 229 * called within the ARIBA thread (System Queue) 230 * when a connection is terminated (e.g. TCP close) 231 */ 232 void connectionTerminated(transport_connection::sptr connection); 233 208 234 209 235 /// called when a network interface change happens … … 221 247 /// default constructor 222 248 LinkDescriptor() : 223 localLink(LinkID::UNSPECIFIED), localLocator(NULL),224 remoteLink(LinkID::UNSPECIFIED), remoteLocator(NULL),249 localLink(LinkID::UNSPECIFIED), 250 remoteLink(LinkID::UNSPECIFIED), 225 251 up(false) { 226 252 } 227 253 228 ~LinkDescriptor() { 229 if (localLocator!=NULL) delete localLocator; 230 if (remoteLocator!=NULL) delete remoteLocator; 254 ~LinkDescriptor() 255 { 256 if ( connection ) 257 { 258 connection->unregister_communication_link(&localLink); 259 } 231 260 } 232 261 … … 240 269 return *unspec; 241 270 } 271 272 273 transport_connection::sptr get_connection() const 274 { 275 return connection; 276 } 277 278 void set_connection(const transport_connection::sptr& conn) 279 { 280 // unregister from old connection, 281 // if any (but normally there shouldn't..) 282 if ( connection ) 283 { 284 connection->unregister_communication_link(&localLink); 285 } 286 287 // * set_connection * 288 connection = conn; 289 290 // register this link with the connection 291 conn->register_communication_link(&localLink); 292 } 242 293 243 294 bool unspecified; … … 245 296 /// link identifiers 246 297 LinkID localLink; 247 const address_v*localLocator;298 addressing2::EndpointPtr localLocator; 248 299 249 300 /// used underlay addresses for the link 250 301 LinkID remoteLink; 251 const address_v*remoteLocator;302 addressing2::EndpointPtr remoteLocator; 252 303 253 304 /// the remote end-point descriptor 254 EndpointDescriptor remote Endpoint;305 EndpointDescriptor remoteDescriptor; 255 306 256 307 /// flag, whether this link is up 257 308 bool up; 258 309 310 311 private: 259 312 /// connection if link is up 260 313 transport_connection::sptr connection; … … 281 334 /// The local end-point descriptor 282 335 EndpointDescriptor localDescriptor; 283 284 #ifndef UNDERLAY_OMNET 336 337 /** 338 * endpoint_set holding the addresses of the "server"-sockets, 339 * ---> that should be opened 340 * 341 * (e.g. 0.0.0.0:41322) 342 */ 343 addressing2::EndpointSetPtr listenOn_endpoints; 344 345 /** 346 * endpoint_set holding the addresses of the "server"-sockets, 347 * ---> that are actually open 348 * 349 * (e.g. 0.0.0.0:41322) 350 * 351 * XXX should only be in transport_peer 352 */ 353 addressing2::EndpointSetPtr active_listenOn_endpoints; 354 355 /** 356 * endpoint_set holding the addresses of the "server"-sockets, 357 * ---> here the discovered "addressable" addresses are stored 358 * 359 * (e.g. 192.168.0.5:41322) 360 * 361 * XXX should only be in localDescriptor 362 */ 363 addressing2::EndpointSetPtr local_endpoints; 364 285 365 /// network change detector 286 366 NetworkChangeDetection networkMonitor; 287 #endif288 367 289 368 /// list of all remote addresses of links to end-points 290 class endpoint_reference { 291 public: 292 int count; ///< the number of open links to this end-point 293 const address_v* endpoint; ///< the end-point itself 294 }; 295 vector<endpoint_reference> remote_endpoints; 296 297 /// adds an end-point to the list 298 void add_endpoint( const address_v* endpoint ); 299 300 /// removes an end-point from the list 301 void remove_endpoint( const address_v* endpoint ); 369 // XXX DEPRECATED 370 // class endpoint_reference { 371 // public: 372 // int count; ///< the number of open links to this end-point 373 // const address_v* endpoint; ///< the end-point itself 374 // }; 375 // vector<endpoint_reference> remote_endpoints; 376 377 // XXX DEPRECATED 378 // /// adds an end-point to the list 379 // void add_endpoint( const address_v* endpoint ); 380 // 381 // /// removes an end-point from the list 382 // void remove_endpoint( const address_v* endpoint ); 302 383 303 384 /// event listener … … 314 395 MessageReceiver* messageReceiver; 315 396 316 /// convenience: send message to peer 317 void send( Message* message, const EndpointDescriptor& endpoint ); 318 void send( Message* message, const LinkDescriptor& descriptor ); 397 398 /* 399 * Sends a message over an existing link. 400 * ---> Adds »Link Message« Header 401 */ 402 bool send_over_link( 403 const uint8_t type, 404 reboost::message_t message, 405 const LinkDescriptor& desc, 406 const uint8_t priority); 407 408 /* 409 * Sends a message to a known peer. (To all known endpoints.) 410 * ---> Adds »Peer Message« Header 411 */ 412 void send_to_peer( 413 const uint8_t type, 414 const PeerID& peer_id, 415 reboost::message_t message, 416 const EndpointDescriptor& endpoint, 417 const uint8_t priority ); 418 319 419 320 420 /// state of the base communication -
source/ariba/communication/CommunicationEvents.cpp
r5284 r12060 49 49 50 50 bool CommunicationEvents::onLinkRequest(const LinkID& id, 51 const address_v* local, const address_v* remote) { 51 const addressing2::EndpointPtr local, 52 const addressing2::EndpointPtr remote) 53 { 52 54 return true; 53 55 } 54 56 55 void CommunicationEvents::onLinkUp(const LinkID& id, const address_v* local, 56 const address_v* remote) { 57 void CommunicationEvents::onLinkUp(const LinkID& id, 58 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote) 59 { 57 60 } 58 61 59 void CommunicationEvents::onLinkDown(const LinkID& id, const address_v* local, 60 const address_v* remote) { 62 void CommunicationEvents::onLinkDown(const LinkID& id, 63 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote) 64 { 61 65 } 62 66 63 67 void CommunicationEvents::onLinkChanged(const LinkID& id, 64 const address_v* oldlocal, const address_v* newlocal, 65 const address_v* oldremote, const address_v* newremote) { 68 const addressing2::EndpointPtr oldlocal, const addressing2::EndpointPtr newlocal, 69 const addressing2::EndpointPtr oldremote, const addressing2::EndpointPtr newremote) 70 { 66 71 } 67 72 68 void CommunicationEvents::onLinkFail(const LinkID& id, const address_v* local, 69 const address_v* remote) { 73 void CommunicationEvents::onLinkFail(const LinkID& id, 74 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote) 75 { 70 76 } 71 77 72 78 void CommunicationEvents::onLinkQoSChanged(const LinkID& id, 73 const address_v* local, const address_v* remote, const QoSParameterSet& qos) { 79 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote, 80 const QoSParameterSet& qos) 81 { 74 82 } 75 83 -
source/ariba/communication/CommunicationEvents.h
r5284 r12060 42 42 #include "ariba/utility/types/LinkID.h" 43 43 #include "ariba/utility/types/QoSParameterSet.h" 44 #include "ariba/utility/addressing /addressing.hpp"44 #include "ariba/utility/addressing2/endpoint.hpp" 45 45 46 46 namespace ariba { … … 49 49 using ariba::utility::LinkID; 50 50 using ariba::utility::QoSParameterSet; 51 using namespace ariba::addressing;52 51 53 52 class CommunicationEvents { … … 68 67 * @return True, if the link should be established 69 68 */ 70 virtual bool onLinkRequest(const LinkID& id, const address_v* local, 71 const address_v* remote); 69 virtual bool onLinkRequest(const LinkID& id, 70 const addressing2::EndpointPtr local, 71 const addressing2::EndpointPtr remote); 72 72 73 73 /** … … 77 77 * @param id The link id of the established link 78 78 */ 79 virtual void onLinkUp(const LinkID& id, const address_v* local,80 const address_v*remote);79 virtual void onLinkUp(const LinkID& id, 80 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote); 81 81 82 82 /** … … 85 85 * @param id The link identifier of the dropped link 86 86 */ 87 virtual void onLinkDown(const LinkID& id, const address_v* local,88 const address_v*remote);87 virtual void onLinkDown(const LinkID& id, 88 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote); 89 89 90 90 /** … … 97 97 */ 98 98 virtual void onLinkChanged(const LinkID& id, 99 const address_v* oldlocal, const address_v* newlocal, 100 const address_v* oldremote, const address_v* newremote 101 ); 99 const addressing2::EndpointPtr oldlocal, const addressing2::EndpointPtr newlocal, 100 const addressing2::EndpointPtr oldremote, const addressing2::EndpointPtr newremote); 102 101 103 virtual void onLinkFail(const LinkID& id, const address_v* local,104 const address_v*remote);102 virtual void onLinkFail(const LinkID& id, 103 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote); 105 104 106 virtual void onLinkQoSChanged(const LinkID& id, const address_v* local, 107 const address_v* remote, const QoSParameterSet& qos); 105 virtual void onLinkQoSChanged(const LinkID& id, 106 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote, 107 const QoSParameterSet& qos); 108 108 }; 109 109 -
source/ariba/communication/EndpointDescriptor.cpp
r5624 r12060 42 42 namespace communication { 43 43 44 vsznDefault(EndpointDescriptor);44 //vsznDefault(EndpointDescriptor); 45 45 46 EndpointDescriptor::EndpointDescriptor(){ 46 EndpointDescriptor::EndpointDescriptor() : 47 endpoints(new addressing2::endpoint_set()) 48 { 47 49 } 48 50 … … 55 57 } 56 58 57 EndpointDescriptor::EndpointDescriptor(const endpoint_set& endpoints ) : 58 endpoints(endpoints){ 59 EndpointDescriptor::EndpointDescriptor( 60 const PeerID& peer_id, 61 addressing2::EndpointSetPtr endpoints ) : 62 peerId(peer_id), 63 endpoints(endpoints) 64 { 59 65 } 60 66 61 EndpointDescriptor::EndpointDescriptor(const string& str) : endpoints(str){ 67 EndpointDescriptor::EndpointDescriptor(const string& str) : 68 endpoints(new addressing2::endpoint_set()) 69 { 70 cout << "ERROR: Construction of EndpointDescriptor from String is not functional!!" << endl; 71 assert( false ); 62 72 } 63 73 74 75 reboost::message_t EndpointDescriptor::serialize() const 76 { 77 reboost::message_t msg; 78 msg.push_back(peerId.serialize()); 79 msg.push_back(endpoints->serialize()); 80 81 return msg; 82 } 83 84 reboost::shared_buffer_t EndpointDescriptor::deserialize(reboost::shared_buffer_t buff) 85 { 86 buff = peerId.deserialize(buff); 87 buff = endpoints->deserialize(buff); 88 89 return buff; 90 } 91 92 64 93 }} // namespace ariba, communication -
source/ariba/communication/EndpointDescriptor.h
r7744 r12060 42 42 #include <string> 43 43 #include <set> 44 #include "ariba/utility/serialization.h"44 //#include "ariba/utility/serialization.h" 45 45 #include "ariba/utility/types/PeerID.h" 46 #include "ariba/utility/addressing/endpoint_set.hpp" 46 47 #include "ariba/utility/addressing2/endpoint_set.hpp" 48 49 // reboost messages 50 #include "ariba/utility/transport/messages/message.hpp" 51 47 52 48 53 namespace ariba { … … 51 56 using_serialization; 52 57 using namespace std; 53 using namespace ariba::addressing;54 58 using ariba::utility::PeerID; 55 59 56 class EndpointDescriptor: public VSerializeable { VSERIALIZEABLE 57 friend class BaseCommunication; 60 61 /** 62 * This class is used a transitions helper between the old addressing and 63 * serialization to the new addressing2 and the new message classes 64 * 65 * Maybe it will be replaced, or at least modified in the future. 66 */ 67 //class EndpointDescriptor: public VSerializeable { VSERIALIZEABLE 68 // friend class BaseCommunication; 69 class EndpointDescriptor 70 { 71 friend class BaseCommunication; 58 72 59 73 public: … … 68 82 69 83 /// construct end-points from an endpoint set 70 EndpointDescriptor(const endpoint_set&endpoints );84 EndpointDescriptor(const PeerID& peer_id, addressing2::EndpointSetPtr endpoints ); 71 85 86 // FIXME NOT WORKING !! 72 87 /// construct end-points from a string 73 88 EndpointDescriptor(const string& str); … … 75 90 /// convert end-points to string 76 91 string toString() const { 77 return endpoints .to_string();92 return endpoints->to_string(); 78 93 } 79 94 … … 90 105 } 91 106 92 /// create endpoint93 static EndpointDescriptor* fromString(string str) {94 return new EndpointDescriptor(str);95 }107 // /// create endpoint 108 // static EndpointDescriptor* fromString(string str) { 109 // return new EndpointDescriptor(str); 110 // } 96 111 97 112 bool operator==(const EndpointDescriptor& rh) const { … … 113 128 114 129 /// returns the end-points of this descriptor 115 endpoint_set& getEndpoints(){130 addressing2::const_EndpointSetPtr getEndpoints() const { 116 131 return endpoints; 117 132 } 118 119 /// returns the end-points of this descriptor120 const endpoint_set& getEndpoints() const{121 returnendpoints;133 134 void replace_endpoint_set(addressing2::EndpointSetPtr new_endpoints) 135 { 136 endpoints = new_endpoints; 122 137 } 123 138 124 139 /// returns a reference to the peer id 125 140 PeerID& getPeerId() { … … 132 147 return peerId; 133 148 } 149 150 /// returns a message with peerId and endpoints in it 151 reboost::message_t serialize() const; 152 153 /// deserialite peerId and endpoints 154 reboost::shared_buffer_t deserialize(reboost::shared_buffer_t buff); 155 134 156 private: 135 endpoint_setendpoints;157 addressing2::EndpointSetPtr endpoints; 136 158 PeerID peerId; 137 159 }; … … 139 161 }} // namespace ariba, communication 140 162 141 sznBeginDefault( ariba::communication::EndpointDescriptor, X ){ 142 143 // serialize peer id 144 X && &peerId; 145 146 // serialize end-points 147 uint16_t len = endpoints.to_bytes_size(); 148 X && len; 149 uint8_t* buffer = X.bytes( len ); 150 if (buffer!=NULL) { 151 if (X.isDeserializer()) endpoints.assign(buffer,len); 152 else endpoints.to_bytes(buffer); 153 } 154 }sznEnd(); 163 //sznBeginDefault( ariba::communication::EndpointDescriptor, X ){ 164 // 165 // // TODO 166 // assert(false); 167 // 168 // // serialize peer id 169 // X && &peerId; 170 // 171 // // serialize end-points 172 // uint16_t len = endpoints.to_bytes_size(); 173 // X && len; 174 // uint8_t* buffer = X.bytes( len ); 175 // if (buffer!=NULL) { 176 // if (X.isDeserializer()) endpoints.assign(buffer,len); 177 // else endpoints.to_bytes(buffer); 178 // } 179 //}sznEnd(); 155 180 156 181 #endif /*ENDPOINTDESCRIPTOR_H_*/ -
source/ariba/communication/messages/AribaBaseMsg.h
r9694 r12060 42 42 #include <string> 43 43 #include <boost/cstdint.hpp> 44 #include "ariba/utility/messages.h" 44 //#include "ariba/utility/messages.h" 45 #include "ariba/utility/messages/Message.h" 45 46 #include "ariba/utility/serialization.h" 46 47 #include "ariba/utility/types/LinkID.h" … … 61 62 using_serialization; 62 63 64 // XXX This whole message is DEPRECATED 63 65 class AribaBaseMsg : public Message { 64 66 VSERIALIZEABLE; … … 69 71 typeLinkReply = 2, 70 72 typeLinkClose = 3, 71 typeLinkUpdate = 4 73 typeLinkUpdate = 4, 74 typeDirectData = 5 72 75 }; 73 76 … … 115 118 116 119 sznBeginDefault( ariba::communication::AribaBaseMsg, X ) { 117 X && type && & localLink && &remoteLink;120 X && type && &remoteLink; 118 121 if (type == typeLinkReply || type == typeLinkRequest) 119 X && localDescriptor && remoteDescriptor;120 X && Payload();122 X && &localLink && localDescriptor && remoteDescriptor; 123 // X && Payload(); 121 124 } sznEnd(); 122 125 -
source/ariba/communication/messages/CMakeLists.txt
r10700 r12060 37 37 # [License] 38 38 39 add_headers(AribaBaseMsg.h)39 #add_headers(AribaBaseMsg.h) 40 40 41 add_sources(AribaBaseMsg.cpp)41 #add_sources(AribaBaseMsg.cpp) -
source/ariba/communication/networkinfo/AddressDiscovery.cpp
r10700 r12060 49 49 #include <ifaddrs.h> 50 50 51 #include <string> 52 #include <boost/asio/ip/address.hpp> 53 #include <boost/foreach.hpp> 54 55 #include "ariba/utility/addressing2/tcpip_endpoint.hpp" 56 #include "ariba/utility/addressing/mac_address.hpp" 57 51 58 #ifdef HAVE_LIBBLUETOOTH 52 59 #include <bluetooth/bluetooth.h> … … 58 65 namespace communication { 59 66 60 mac_address AddressDiscovery::getMacFromIF( const char* name ) { 67 68 using namespace std; 69 using namespace addressing2; 70 using namespace boost::asio::ip; 71 72 using ariba::addressing::mac_address; 73 74 mac_address getMacFromIF( const char* name ) 75 { 61 76 mac_address addr; 62 77 #ifdef HAVE_LIBBLUETOOTH … … 73 88 } 74 89 75 int AddressDiscovery::dev_info(int s, int dev_id, long arg) { 76 #ifdef HAVE_LIBBLUETOOTH 77 endpoint_set* set = (endpoint_set*)arg; 78 struct hci_dev_info di; 79 memset(&di, 0, sizeof(struct hci_dev_info)); 80 di.dev_id = dev_id; 81 if (ioctl(s, HCIGETDEVINFO, (void *) &di)) return 0; 82 mac_address mac; 83 mac.bluetooth( di.bdaddr ); 84 address_vf vf = mac; 85 set->add(vf); 90 int dev_info(int s, int dev_id, long arg) 91 { 92 #ifdef HAVE_LIBBLUETOOTH 93 // endpoint_set* set = (endpoint_set*)arg; 94 // struct hci_dev_info di; 95 // memset(&di, 0, sizeof(struct hci_dev_info)); 96 // di.dev_id = dev_id; 97 // if (ioctl(s, HCIGETDEVINFO, (void *) &di)) return 0; 98 // mac_address mac; 99 // mac.bluetooth( di.bdaddr ); 100 // address_vf vf = mac; 101 // set->add(vf); 86 102 #endif 87 103 return 0; 88 104 } 89 105 90 void AddressDiscovery::discover_bluetooth( endpoint_set& endpoints ) { 91 #ifdef HAVE_LIBBLUETOOTH 92 hci_for_each_dev(HCI_UP, &AddressDiscovery::dev_info, (long)&endpoints ); 93 #endif 94 } 95 96 void AddressDiscovery::discover_ip_addresses( endpoint_set& endpoints ) { 97 struct ifaddrs* ifaceBuffer = NULL; 98 void* tmpAddrPtr = NULL; 99 100 int ret = getifaddrs( &ifaceBuffer ); 101 if( ret != 0 ) return; 102 103 for( struct ifaddrs* i=ifaceBuffer; i != NULL; i=i->ifa_next ) { 104 105 // ignore devices that are disabled or have no ip 106 if(i == NULL) continue; 107 struct sockaddr* addr = i->ifa_addr; 108 if (addr==NULL) continue; 109 110 // ignore tun devices 111 string device = string(i->ifa_name); 112 if(device.find_first_of("tun") == 0) continue; 113 114 if (addr->sa_family == AF_INET) { 115 // look for ipv4 116 char straddr[INET_ADDRSTRLEN]; 117 tmpAddrPtr= &((struct sockaddr_in*)addr)->sin_addr; 118 inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) ); 119 ip_address ip = straddr; 120 if (ip.is_loopback()) continue; 121 address_vf vf = ip; 122 endpoints.add( vf ); 123 } else 124 if (addr->sa_family == AF_INET6) { 125 // look for ipv6 126 char straddr[INET6_ADDRSTRLEN]; 127 tmpAddrPtr= &((struct sockaddr_in6*)addr)->sin6_addr; 128 inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) ); 129 ip_address ip = straddr; 130 if (ip.is_loopback()) continue; 131 // if (ip.is_link_local()) continue; 132 address_vf vf = ip; 133 endpoints.add( vf ); 134 } 135 } 136 137 freeifaddrs(ifaceBuffer); 138 } 139 140 void AddressDiscovery::discover_endpoints( endpoint_set& endpoints ) { 141 discover_ip_addresses( endpoints ); 142 discover_bluetooth( endpoints ); 106 void discover_bluetooth( 107 EndpointSetPtr listenOn_endpoints, 108 EndpointSetPtr discovered_endpoints ) 109 { 110 #ifdef HAVE_LIBBLUETOOTH 111 // FIXME aktuell bluetooth 112 // hci_for_each_dev(HCI_UP, &AddressDiscovery::dev_info, (long)&endpoints ); 113 #endif 114 } 115 116 void discover_ip_addresses( 117 EndpointSetPtr listenOn_endpoints, 118 EndpointSetPtr discovered_endpoints ) 119 { 120 bool discover_ipv4 = false; 121 bool discover_ipv6 = false; 122 vector<uint16_t> ipv4_ports; 123 vector<uint16_t> ipv6_ports; 124 125 /* analyze listenOn_endpoints */ 126 BOOST_FOREACH( TcpIP_EndpointPtr endp, listenOn_endpoints->get_tcpip_endpoints() ) 127 { 128 // BRANCH: IPv4 any [0.0.0.0] 129 if ( endp->to_asio().address() == address_v4::any() ) 130 { 131 // add port 132 ipv4_ports.push_back(endp->to_asio().port()); 133 134 discover_ipv4 = true; 135 } 136 137 // BRANCH: IPv6 any [::] 138 else if ( endp->to_asio().address() == address_v6::any() ) 139 { 140 // add port 141 ipv6_ports.push_back(endp->to_asio().port()); 142 143 discover_ipv6 = true; 144 145 146 // NOTE: on linux the ipv6-any address [::] catches ipv4 as well 147 ipv4_ports.push_back(endp->to_asio().port()); 148 discover_ipv4 = true; 149 } 150 151 // BRANCH: explicit ip address 152 else 153 { 154 // ---> don't discover anything, just add it directly 155 discovered_endpoints->add_endpoint(endp); 156 } 157 } 158 159 160 /* discover addresses */ 161 if ( discover_ipv4 || discover_ipv6 ) 162 { 163 struct ifaddrs* ifaceBuffer = NULL; 164 void* tmpAddrPtr = NULL; 165 166 int ret = getifaddrs( &ifaceBuffer ); 167 if( ret != 0 ) return; 168 169 for( struct ifaddrs* i=ifaceBuffer; i != NULL; i=i->ifa_next ) 170 { 171 // ignore devices that are disabled or have no ip 172 if(i == NULL) continue; 173 struct sockaddr* addr = i->ifa_addr; 174 if (addr==NULL) continue; 175 176 // // ignore tun devices // XXX why? 177 // string device = string(i->ifa_name); 178 // if(device.find_first_of("tun") == 0) continue; 179 180 // IPv4 181 if ( discover_ipv4 && addr->sa_family == AF_INET ) 182 { 183 char straddr[INET_ADDRSTRLEN]; 184 tmpAddrPtr= &((struct sockaddr_in*)addr)->sin_addr; 185 inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) ); 186 187 address ip_addr = address::from_string(straddr); 188 189 // skip loopback address 190 if ( ip_addr.to_v4() == address_v4::loopback() ) 191 continue; 192 193 // add endpoint for this address and every given ipv4 port 194 BOOST_FOREACH( uint16_t port, ipv4_ports ) 195 { 196 tcp::endpoint tcpip_endp(ip_addr, port); 197 TcpIP_EndpointPtr endp(new tcpip_endpoint(tcpip_endp)); 198 199 discovered_endpoints->add_endpoint(endp); 200 } 201 } 202 203 // IPv6 204 else if ( discover_ipv6 && addr->sa_family == AF_INET6 ) 205 { 206 // look for ipv6 207 char straddr[INET6_ADDRSTRLEN]; 208 tmpAddrPtr= &((struct sockaddr_in6*)addr)->sin6_addr; 209 inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) ); 210 211 address ip_addr = address::from_string(straddr); 212 213 // skip loopback address 214 if ( ip_addr.to_v6() == address_v6::loopback() ) 215 continue; 216 217 // add endpoint for this address and every given ipv4 port 218 BOOST_FOREACH( uint16_t port, ipv6_ports ) 219 { 220 tcp::endpoint tcpip_endp(ip_addr, port); 221 TcpIP_EndpointPtr endp(new tcpip_endpoint(tcpip_endp)); 222 223 discovered_endpoints->add_endpoint(endp); 224 } 225 } 226 } 227 228 freeifaddrs(ifaceBuffer); 229 } 230 } 231 232 233 234 EndpointSetPtr AddressDiscovery::discover_endpoints(EndpointSetPtr listenOn_endpoints) 235 { 236 EndpointSetPtr discovered_endpoints(new addressing2::endpoint_set()); 237 238 discover_ip_addresses( listenOn_endpoints, discovered_endpoints ); 239 discover_bluetooth( listenOn_endpoints, discovered_endpoints ); 240 241 return discovered_endpoints; 143 242 } 144 243 -
source/ariba/communication/networkinfo/AddressDiscovery.h
r5639 r12060 40 40 #define __ADDRESS_DISCOVERY_H 41 41 42 #include "ariba/utility/addressing/addressing.hpp" 43 44 using namespace ariba::addressing; 42 #include "ariba/utility/addressing2/endpoint_set.hpp" 45 43 46 44 namespace ariba { 47 45 namespace communication { 48 46 47 using addressing2::EndpointSetPtr; 48 49 49 class AddressDiscovery { 50 50 public: 51 static void discover_endpoints( endpoint_set& endpoints);51 static EndpointSetPtr discover_endpoints(EndpointSetPtr listenOn_endpoints); 52 52 53 53 private: 54 static mac_address getMacFromIF( const char* name ); 55 static int dev_info(int s, int dev_id, long arg); 56 static void discover_bluetooth( endpoint_set& endpoints ); 57 static void discover_ip_addresses( endpoint_set& endpoints ); 54 // TODO aktuell weg damit.. 55 // static mac_address getMacFromIF( const char* name ); 56 // static int dev_info(int s, int dev_id, long arg); 57 // static void discover_bluetooth( EndpointSetPtr listenOn_endpoints, EndpointSetPtr discovered_endpoints ); 58 // static void discover_ip_addresses( EndpointSetPtr listenOn_endpoints, EndpointSetPtr discovered_endpoints ); 58 59 }; 59 60 -
source/ariba/overlay/BaseOverlay.cpp
r10653 r12060 57 57 #include "ariba/utility/visual/DddVis.h" 58 58 #include "ariba/utility/visual/ServerVis.h" 59 #include <ariba/utility/misc/sha1.h> 59 60 60 61 namespace ariba { 61 62 namespace overlay { 63 64 using namespace std; 65 using ariba::transport::system_priority; 62 66 63 67 #define visualInstance ariba::utility::DddVis::instance() 64 68 #define visualIdOverlay ariba::utility::ServerVis::NETWORK_ID_BASE_OVERLAY 65 69 #define visualIdBase ariba::utility::ServerVis::NETWORK_ID_BASE_COMMUNICATION 70 71 72 // time constants (in seconds) 73 #define KEEP_ALIVE_TIME 60 // send keep-alive message after link is not used for #s 74 75 #define LINK_ESTABLISH_TIME_OUT 10 // timeout: link requested but not up 76 #define KEEP_ALIVE_TIME_OUT KEEP_ALIVE_TIME + LINK_ESTABLISH_TIME_OUT // timeout: no data received on this link (incl. keep-alive messages) 77 #define AUTO_LINK_TIME_OUT KEEP_ALIVE_TIME_OUT // timeout: auto link not used for #s 66 78 67 79 … … 85 97 86 98 LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) { 87 BOOST_FOREACH( LinkDescriptor* lp, links )99 foreach( LinkDescriptor* lp, links ) 88 100 if ((communication ? lp->communicationId : lp->overlayId) == link) 89 101 return lp; … … 92 104 93 105 const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const { 94 BOOST_FOREACH( const LinkDescriptor* lp, links )106 foreach( const LinkDescriptor* lp, links ) 95 107 if ((communication ? lp->communicationId : lp->overlayId) == link) 96 108 return lp; … … 122 134 123 135 /// returns a auto-link descriptor 124 LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) { 136 LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) 137 { 125 138 // search for a descriptor that is already up 126 BOOST_FOREACH( LinkDescriptor* lp, links ) 127 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->up && lp->keepAliveMissed == 0) 128 return lp; 139 foreach( LinkDescriptor* lp, links ) 140 { 141 if (lp->autolink && lp->remoteNode == node && lp->service == service && isLinkVital(lp) ) 142 return lp; 143 } 144 129 145 // if not found, search for one that is about to come up... 130 BOOST_FOREACH( LinkDescriptor* lp, links ) 131 if (lp->autolink && lp->remoteNode == node && lp->service == service && lp->keepAliveMissed == 0 ) 132 return lp; 146 foreach( LinkDescriptor* lp, links ) 147 { 148 time_t now = time(NULL); 149 150 if (lp->autolink && lp->remoteNode == node && lp->service == service 151 && difftime( now, lp->keepAliveReceived ) <= LINK_ESTABLISH_TIME_OUT ) 152 return lp; 153 } 154 133 155 return NULL; 134 156 } … … 136 158 /// stabilizes link information 137 159 void BaseOverlay::stabilizeLinks() { 138 // send keep-alive messages over established links 139 BOOST_FOREACH( LinkDescriptor* ld, links ) { 160 time_t now = time(NULL); 161 162 // send keep-alive messages over established links 163 foreach( LinkDescriptor* ld, links ) 164 { 140 165 if (!ld->up) continue; 141 OverlayMsg msg( OverlayMsg::typeLinkAlive, 142 OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode ); 143 if (ld->relayed) msg.setRouteRecord(true); 144 send_link( &msg, ld->overlayId ); 166 167 if ( difftime( now, ld->keepAliveSent ) >= KEEP_ALIVE_TIME ) 168 { 169 logging_debug("[BaseOverlay] Sending KeepAlive over " 170 << ld->to_string() 171 << " after " 172 << difftime( now, ld->keepAliveSent ) 173 << "s"); 174 175 OverlayMsg msg( OverlayMsg::typeKeepAlive, 176 OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode ); 177 msg.setRouteRecord(true); 178 ld->keepAliveSent = now; 179 send_link( &msg, ld->overlayId, system_priority::OVERLAY ); 180 } 145 181 } 146 182 147 183 // iterate over all links and check for time boundaries 148 184 vector<LinkDescriptor*> oldlinks; 149 time_t now = time(NULL); 150 BOOST_FOREACH( LinkDescriptor* ld, links ) { 151 152 // keep alives and not up? yes-> link connection request is stale! 153 if ( !ld->up && difftime( now, ld->keepAliveTime ) >= 2 ) { 154 155 // increase counter 156 ld->keepAliveMissed++; 157 158 // missed more than four keep-alive messages (10 sec)? -> drop link 159 if (ld->keepAliveMissed > 4) { 160 logging_info( "Link connection request is stale, closing: " << ld ); 161 oldlinks.push_back( ld ); 162 continue; 163 } 185 foreach( LinkDescriptor* ld, links ) { 186 187 // link connection request stale? 188 if ( !ld->up && difftime( now, ld->keepAliveReceived ) >= LINK_ESTABLISH_TIME_OUT ) // NOTE: keepAliveReceived == now, on connection request 189 { 190 logging_info( "Link connection request is stale, closing: " << ld ); 191 ld->failed = true; 192 oldlinks.push_back( ld ); 193 continue; 164 194 } 165 195 166 196 if (!ld->up) continue; 167 197 198 199 200 168 201 // check if link is relayed and retry connecting directly 202 // TODO Mario: What happens here? --> There are 3 attempts to replace a relayed link with a direct one. see: handleLinkReply 169 203 if ( ld->relayed && !ld->communicationUp && ld->retryCounter > 0) { 170 204 ld->retryCounter--; … … 173 207 174 208 // remote used as relay flag 175 if ( ld->relaying && difftime( now, ld->timeRelaying ) > 10)209 if ( ld->relaying && difftime( now, ld->timeRelaying ) > KEEP_ALIVE_TIME_OUT) // TODO is this a reasonable timeout ?? 176 210 ld->relaying = false; 177 211 … … 183 217 184 218 // auto-link time exceeded? 185 if ( ld->autolink && difftime( now, ld->lastuse ) > 30) {219 if ( ld->autolink && difftime( now, ld->lastuse ) > AUTO_LINK_TIME_OUT ) { 186 220 oldlinks.push_back( ld ); 187 221 continue; … … 189 223 190 224 // keep alives missed? yes-> 191 if ( difftime( now, ld->keepAliveTime ) > 4 ) { 192 193 // increase counter 194 ld->keepAliveMissed++; 195 196 // missed more than four keep-alive messages (4 sec)? -> drop link 197 if (ld->keepAliveMissed >= 2) { 198 logging_info( "Link is stale, closing: " << ld ); 199 oldlinks.push_back( ld ); 200 continue; 201 } 225 if ( difftime( now, ld->keepAliveReceived ) >= KEEP_ALIVE_TIME_OUT ) 226 { 227 logging_info( "Link is stale, closing: " << ld ); 228 ld->failed = true; 229 oldlinks.push_back( ld ); 230 continue; 202 231 } 203 232 } 204 233 205 234 // drop links 206 BOOST_FOREACH( LinkDescriptor* ld, oldlinks ) {235 foreach( LinkDescriptor* ld, oldlinks ) { 207 236 logging_info( "Link timed out. Dropping " << ld ); 208 237 ld->relaying = false; … … 210 239 } 211 240 212 // show link state 213 counter++; 214 if (counter>=4) showLinks(); 215 if (counter>=4 || counter<0) counter = 0; 241 242 243 244 // show link state (debug output) 245 if (counter>=10 || counter<0) 246 { 247 showLinks(); 248 counter = 0; 249 } 250 else 251 { 252 counter++; 253 } 216 254 } 217 255 … … 230 268 231 269 int i=0; 232 BOOST_FOREACH( LinkDescriptor* ld, links ) {233 if (! ld->isVital() || ld->service != OverlayInterface::OVERLAY_SERVICE_ID) continue;270 foreach( LinkDescriptor* ld, links ) { 271 if (!isLinkVital(ld) || ld->service != OverlayInterface::OVERLAY_SERVICE_ID) continue; 234 272 bool found = false; 235 BOOST_FOREACH(NodeID& id, nodes)273 foreach(NodeID& id, nodes) 236 274 if (id == ld->remoteNode) found = true; 237 275 if (found) continue; … … 261 299 int i=0; 262 300 logging_info("--- link state -------------------------------"); 263 BOOST_FOREACH( LinkDescriptor* ld, links ) {301 foreach( LinkDescriptor* ld, links ) { 264 302 string epd = ""; 265 if (ld->isDirectVital()) 266 epd = getEndpointDescriptor(ld->remoteNode).toString(); 303 if (isLinkDirectVital(ld)) 304 { 305 // epd = getEndpointDescriptor(ld->remoteNode).toString(); 306 307 epd = "Connection: "; 308 epd += bc->get_local_endpoint_of_link(ld->communicationId)->to_string(); 309 epd += " <---> "; 310 epd += bc->get_remote_endpoint_of_link(ld->communicationId)->to_string(); 311 } 267 312 268 313 logging_info("LINK_STATE: " << i << ": " << ld << " " << epd); … … 289 334 // internal message delivery --------------------------------------------------- 290 335 336 337 seqnum_t BaseOverlay::send_overlaymessage_down( OverlayMsg* message, const LinkID& bc_link, uint8_t priority ) 338 { 339 // set priority 340 message->setPriority(priority); 341 342 // wrap old OverlayMsg into reboost message 343 reboost::message_t wrapped_message = message->wrap_up_for_sending(); 344 345 // send down to BaseCommunication 346 try 347 { 348 // * send * 349 return bc->sendMessage(bc_link, wrapped_message, priority, false); 350 } 351 catch ( communication::communication_message_not_sent& e ) 352 { 353 ostringstream out; 354 out << "Communication message not sent: " << e.what(); 355 throw message_not_sent(out.str()); 356 } 357 358 throw logic_error("This should never happen!"); 359 } 360 361 291 362 /// routes a message to its destination node 292 void BaseOverlay::route( OverlayMsg* message ) {293 363 void BaseOverlay::route( OverlayMsg* message, const NodeID& last_hop ) 364 { 294 365 // exceeded time-to-live? yes-> drop message 295 if (message->getNumHops() > message->getTimeToLive()) { 296 logging_warn("Message exceeded TTL. Dropping message and relay routes" 297 "for recovery."); 366 if (message->getNumHops() > message->getTimeToLive()) 367 { 368 logging_warn("Message exceeded TTL. Dropping message and relay routes " 369 << "for recovery. Hop count: " << (int) message->getNumHops()); 298 370 removeRelayNode(message->getDestinationNode()); 299 371 return; … … 301 373 302 374 // no-> forward message 303 else { 375 else 376 { 304 377 // destinastion myself? yes-> handle message 305 if (message->getDestinationNode() == nodeId) { 306 logging_warn("Usually I should not route messages to myself!"); 307 Message msg; 308 msg.encapsulate(message); 309 handleMessage( &msg, NULL ); 310 } else { 311 // no->send message to next hop 312 send( message, message->getDestinationNode() ); 313 } 314 } 378 if (message->getDestinationNode() == nodeId) 379 { 380 logging_warn("Usually I should not route messages to myself. And I won't!"); 381 } 382 383 // no->send message to next hop 384 else 385 { 386 try 387 { 388 /* (deep) packet inspection to determine priority */ 389 // BRANCH: typeData --> send with low priority 390 if ( message->getType() == OverlayMsg::typeData ) 391 { 392 // TODO think about implementing explicit routing queue (with active queue management??) 393 send( message, 394 message->getDestinationNode(), 395 message->getPriority(), 396 last_hop ); 397 } 398 // BRANCH: internal message --> send with higher priority 399 else 400 { 401 send( message, 402 message->getDestinationNode(), 403 system_priority::HIGH, 404 last_hop ); 405 } 406 } 407 catch ( message_not_sent& e ) 408 { 409 logging_warn("Unable to route message of type " 410 << message->getType() 411 << " to " 412 << message->getDestinationNode() 413 << ". Reason: " 414 << e.what()); 415 416 // inform sender 417 if ( message->getType() != OverlayMsg::typeMessageLost ) 418 { 419 report_lost_message(message); 420 } 421 } 422 } 423 } 424 } 425 426 void BaseOverlay::report_lost_message( const OverlayMsg* message ) 427 { 428 OverlayMsg reply(OverlayMsg::typeMessageLost); 429 reply.setSeqNum(message->getSeqNum()); 430 431 /** 432 * MessageLost-Message 433 * 434 * - Type of lost message 435 * - Hop count of lost message 436 * - Source-LinkID of lost message 437 */ 438 reboost::shared_buffer_t b(sizeof(uint8_t)*2); 439 b.mutable_data()[0] = message->getType(); 440 b.mutable_data()[1] = message->getNumHops(); 441 reply.append_buffer(b); 442 reply.append_buffer(message->getSourceLink().serialize()); 443 444 try 445 { 446 send_node(&reply, message->getSourceNode(), 447 system_priority::OVERLAY, 448 OverlayInterface::OVERLAY_SERVICE_ID); 449 } 450 catch ( message_not_sent& e ) 451 { 452 logging_warn("Tried to inform another node that we could'n route their message. But we were not able to send this error-message, too."); 453 } 315 454 } 316 455 317 456 /// sends a message to another node, delivers it to the base overlay class 318 seqnum_t BaseOverlay::send( OverlayMsg* message, const NodeID& destination ) { 457 seqnum_t BaseOverlay::send( OverlayMsg* message, 458 const NodeID& destination, 459 uint8_t priority, 460 const NodeID& last_hop ) throw(message_not_sent) 461 { 319 462 LinkDescriptor* next_link = NULL; 320 463 321 464 // drop messages to unspecified destinations 322 if (destination.isUnspecified()) return -1; 323 324 // send messages to myself -> handle message and drop warning! 325 if (destination == nodeId) { 326 logging_warn("Sent message to myself. Handling message.") 327 Message msg; 328 msg.encapsulate(message); 329 handleMessage( &msg, NULL ); 330 return -1; 465 if (destination.isUnspecified()) 466 throw message_not_sent("No destination specified. Drop!"); 467 468 // send messages to myself -> drop! 469 // TODO maybe this is not what we want. why not just deliver this message? 470 // There is a similar check in the route function, there it should be okay. 471 if (destination == nodeId) 472 { 473 logging_warn("Sent message to myself. Drop!"); 474 475 throw message_not_sent("Sent message to myself. Drop!"); 331 476 } 332 477 333 478 // use relay path? 334 if (message->isRelayed()) { 479 if (message->isRelayed()) 480 { 335 481 next_link = getRelayLinkTo( destination ); 336 if (next_link != NULL) { 482 483 if (next_link != NULL) 484 { 337 485 next_link->setRelaying(); 338 return bc->sendMessage(next_link->communicationId, message); 339 } else { 340 logging_warn("Could not send message. No relay hop found to " 341 << destination << " -- trying to route over overlay paths ...") 342 // logging_error("ERROR: " << debugInformation() ); 343 // return -1; 344 } 345 } 346 486 487 // * send message over relayed link * 488 return send_overlaymessage_down(message, next_link->communicationId, priority); 489 } 490 else 491 { 492 logging_warn("No relay hop found to " << destination 493 << " -- trying to route over overlay paths ...") 494 } 495 } 496 497 347 498 // last resort -> route over overlay path 348 499 LinkID next_id = overlayInterface->getNextLinkId( destination ); 349 if (next_id.isUnspecified()) { 350 logging_warn("Could not send message. No next hop found to " << 351 destination ); 352 logging_error("ERROR: " << debugInformation() ); 353 return -1; 354 } 355 356 // get link descriptor, up and running? yes-> send message 500 if ( next_id.isUnspecified() ) 501 { 502 // apperently we're the closest node --> try second best node 503 // NOTE: This is helpful if our routing table is not up-to-date, but 504 // may lead to circles. So we have to be careful. 505 std::vector<const LinkID*> next_ids = 506 overlayInterface->getSortedLinkIdsTowardsNode( destination ); 507 508 for ( int i = 0; i < next_ids.size(); i++ ) 509 { 510 const LinkID& link = *next_ids[i]; 511 512 if ( ! link.isUnspecified() ) 513 { 514 next_id = link; 515 516 break; 517 } 518 } 519 520 // still no next hop found. drop. 521 if ( next_id.isUnspecified() ) 522 { 523 logging_warn("Could not send message. No next hop found to " << 524 destination ); 525 logging_error("ERROR: " << debugInformation() ); 526 527 throw message_not_sent("No next hop found."); 528 } 529 } 530 531 532 /* get link descriptor, do some checks and send message */ 357 533 next_link = getDescriptor(next_id); 358 if (next_link != NULL && next_link->up) { 359 // send message over relayed link 360 return send(message, next_link); 361 } 362 363 // no-> error, dropping message 364 else { 365 logging_warn("Could not send message. Link not known or up"); 366 logging_error("ERROR: " << debugInformation() ); 367 return -1; 368 } 369 370 // not reached-> fail 371 return -1; 372 } 534 535 // check pointer 536 if ( next_link == NULL ) 537 { 538 // NOTE: this shuldn't happen 539 throw message_not_sent("Could not send message. Link not known."); 540 } 541 542 // avoid circles 543 if ( next_link->remoteNode == last_hop ) 544 { 545 // XXX logging_debug 546 logging_info("Possible next hop would create a circle: " 547 << next_link->remoteNode); 548 549 throw message_not_sent("Could not send message. Possible next hop would create a circle."); 550 } 551 552 // check if link is up 553 if ( ! next_link->up) 554 { 555 logging_warn("Could not send message. Link not up"); 556 logging_error("ERROR: " << debugInformation() ); 557 558 throw message_not_sent("Could not send message. Link not up"); 559 } 560 561 // * send message over overlay link * 562 return send(message, next_link, priority); 563 } 564 373 565 374 566 /// send a message using a link descriptor, delivers it to the base overlay class 375 seqnum_t BaseOverlay::send( OverlayMsg* message, LinkDescriptor* ldr, bool ignore_down ) { 567 seqnum_t BaseOverlay::send( OverlayMsg* message, 568 LinkDescriptor* ldr, 569 uint8_t priority ) throw(message_not_sent) 570 { 376 571 // check if null 377 if (ldr == NULL) { 378 logging_error("Can not send message to " << message->getDestinationAddress()); 379 return -1; 572 if (ldr == NULL) 573 { 574 ostringstream out; 575 out << "Can not send message to " << message->getDestinationAddress(); 576 throw message_not_sent(out.str()); 380 577 } 381 578 382 579 // check if up 383 if ( !ldr->up && !ignore_down) {384 logging_error("Can not send message. Link not up:" << ldr );580 if ( !ldr->up ) 581 { 385 582 logging_error("DEBUG_INFO: " << debugInformation() ); 386 return -1; 387 } 388 LinkDescriptor* ld = NULL; 389 390 // handle relayed link 391 if (ldr->relayed) { 583 584 ostringstream out; 585 out << "Can not send message. Link not up:" << ldr->to_string(); 586 throw message_not_sent(out.str()); 587 } 588 589 LinkDescriptor* next_hop_ld = NULL; 590 591 // BRANCH: relayed link 592 if (ldr->relayed) 593 { 392 594 logging_debug("Resolving direct link for relayed link to " 393 595 << ldr->remoteNode); 394 ld = getRelayLinkTo( ldr->remoteNode ); 395 if (ld==NULL) { 396 logging_error("No relay path found to link " << ldr ); 596 597 next_hop_ld = getRelayLinkTo( ldr->remoteNode ); 598 599 if (next_hop_ld==NULL) 600 { 397 601 logging_error("DEBUG_INFO: " << debugInformation() ); 398 return -1; 399 } 400 ld->setRelaying(); 602 603 ostringstream out; 604 out << "No relay path found to link: " << ldr; 605 throw message_not_sent(out.str()); 606 } 607 608 next_hop_ld->setRelaying(); 401 609 message->setRelayed(true); 402 } else 403 ld = ldr; 404 405 // handle direct link 406 if (ld->communicationUp) { 407 logging_debug("send(): Sending message over direct link."); 408 return bc->sendMessage( ld->communicationId, message ); 409 } else { 410 logging_error("send(): Could not send message. " 411 "Not a relayed link and direct link is not up."); 412 return -1; 413 } 414 return -1; 610 } 611 // BRANCH: direct link 612 else 613 { 614 next_hop_ld = ldr; 615 } 616 617 618 // check next hop-link 619 if ( ! next_hop_ld->communicationUp) 620 { 621 throw message_not_sent( "send(): Could not send message." 622 " Not a relayed link and direct link is not up." ); 623 } 624 625 // send over next link 626 logging_debug("send(): Sending message over direct link."); 627 return send_overlaymessage_down(message, next_hop_ld->communicationId, priority); 628 415 629 } 416 630 417 631 seqnum_t BaseOverlay::send_node( OverlayMsg* message, const NodeID& remote, 418 const ServiceID& service) { 632 uint8_t priority, const ServiceID& service) throw(message_not_sent) 633 { 419 634 message->setSourceNode(nodeId); 420 635 message->setDestinationNode(remote); 421 636 message->setService(service); 422 return send( message, remote ); 423 } 424 425 seqnum_t BaseOverlay::send_link( OverlayMsg* message, const LinkID& link,bool ignore_down ) { 637 return send( message, remote, priority ); 638 } 639 640 void BaseOverlay::send_link( OverlayMsg* message, 641 const LinkID& link, 642 uint8_t priority ) throw(message_not_sent) 643 { 426 644 LinkDescriptor* ld = getDescriptor(link); 427 if (ld==NULL) { 428 logging_error("Cannot find descriptor to link id=" << link.toString()); 429 return -1; 430 } 645 if (ld==NULL) 646 { 647 throw message_not_sent("Cannot find descriptor to link id=" + link.toString()); 648 } 649 431 650 message->setSourceNode(nodeId); 432 651 message->setDestinationNode(ld->remoteNode); … … 437 656 message->setService(ld->service); 438 657 message->setRelayed(ld->relayed); 439 return send( message, ld, ignore_down ); 658 659 660 try 661 { 662 // * send message * 663 send( message, ld, priority ); 664 } 665 catch ( message_not_sent& e ) 666 { 667 // drop failed link 668 ld->failed = true; 669 dropLink(ld->overlayId); 670 } 440 671 } 441 672 … … 451 682 // relay link still used and alive? 452 683 if (ld==NULL 453 || !ld->isDirectVital() 454 || difftime(route.used, time(NULL)) > 8) { 684 || !isLinkDirectVital(ld) 685 || difftime(route.used, time(NULL)) > KEEP_ALIVE_TIME_OUT) // TODO this was set to 8 before.. Is the new timeout better? 686 { 455 687 logging_info("Forgetting relay information to node " 456 688 << route.node.toString() ); … … 488 720 if (message->isRelayed()) { 489 721 // try to find source node 490 BOOST_FOREACH( relay_route& route, relay_routes ) {722 foreach( relay_route& route, relay_routes ) { 491 723 // relay route found? yes-> 492 724 if ( route.node == message->getDestinationNode() ) { … … 504 736 505 737 // try to find source node 506 BOOST_FOREACH( relay_route& route, relay_routes ) {738 foreach( relay_route& route, relay_routes ) { 507 739 508 740 // relay route found? yes-> … … 516 748 if (route.hops > message->getNumHops() 517 749 || rld == NULL 518 || ! rld->isDirectVital()) {750 || !isLinkDirectVital(ld)) { 519 751 logging_info("Updating relay information to node " 520 752 << route.node.toString() 521 << " reducing to " << message->getNumHops() << " hops.");753 << " reducing to " << (int) message->getNumHops() << " hops."); 522 754 route.hops = message->getNumHops(); 523 755 route.link = ld->overlayId; … … 542 774 LinkDescriptor* BaseOverlay::getRelayLinkTo( const NodeID& remote ) { 543 775 // try to find source node 544 BOOST_FOREACH( relay_route& route, relay_routes ) {776 foreach( relay_route& route, relay_routes ) { 545 777 if (route.node == remote ) { 546 778 LinkDescriptor* ld = getDescriptor( route.link ); 547 if (ld==NULL || ! ld->isDirectVital()) return NULL; else {779 if (ld==NULL || !isLinkDirectVital(ld)) return NULL; else { 548 780 route.used = time(NULL); 549 781 return ld; … … 575 807 // ---------------------------------------------------------------------------- 576 808 577 void BaseOverlay::start( BaseCommunication &_basecomm, const NodeID& _nodeid ) {809 void BaseOverlay::start( BaseCommunication* _basecomm, const NodeID& _nodeid ) { 578 810 logging_info("Starting..."); 579 811 580 812 // set parameters 581 bc = &_basecomm;813 bc = _basecomm; 582 814 nodeId = _nodeid; 583 815 … … 587 819 588 820 // timer for auto link management 589 Timer::setInterval( 1000 ); 821 Timer::setInterval( 1000 ); // XXX 822 // Timer::setInterval( 10000 ); 590 823 Timer::start(); 591 824 … … 641 874 overlayInterface->joinOverlay(); 642 875 state = BaseOverlayStateCompleted; 643 BOOST_FOREACH( NodeListener* i, nodeListeners )876 foreach( NodeListener* i, nodeListeners ) 644 877 i->onJoinCompleted( spovnetId ); 645 878 … … 682 915 // gather all service links 683 916 vector<LinkID> servicelinks; 684 BOOST_FOREACH( LinkDescriptor* ld, links ) { 917 foreach( LinkDescriptor* ld, links ) 918 { 685 919 if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID ) 686 920 servicelinks.push_back( ld->overlayId ); … … 688 922 689 923 // drop all service links 690 BOOST_FOREACH( LinkID lnk, servicelinks ) 691 dropLink( lnk ); 924 foreach( LinkID lnk, servicelinks ) 925 { 926 logging_debug("Dropping service link " << lnk.toString()); 927 dropLink( lnk ); 928 } 692 929 693 930 // let the node leave the spovnet overlay interface 694 931 logging_debug( "Leaving overlay" ); 695 932 if( overlayInterface != NULL ) 933 { 696 934 overlayInterface->leaveOverlay(); 935 } 697 936 698 937 // drop still open bootstrap links 699 BOOST_FOREACH( LinkID lnk, bootstrapLinks ) 700 bc->dropLink( lnk ); 938 foreach( LinkID lnk, bootstrapLinks ) 939 { 940 logging_debug("Dropping bootstrap link " << lnk.toString()); 941 bc->dropLink( lnk ); 942 } 701 943 702 944 // change to inalid state … … 708 950 709 951 // inform all registered services of the event 710 BOOST_FOREACH( NodeListener* i, nodeListeners ) { 952 foreach( NodeListener* i, nodeListeners ) 953 { 711 954 if( ret ) i->onLeaveCompleted( spovnetId ); 712 955 else i->onLeaveFailed( spovnetId ); … … 731 974 state = BaseOverlayStateInvalid; 732 975 733 BOOST_FOREACH( NodeListener* i, nodeListeners )976 foreach( NodeListener* i, nodeListeners ) 734 977 i->onJoinFailed( spovnetId ); 735 978 … … 783 1026 const ServiceID& service ) { 784 1027 1028 // TODO What if we already have a Link to this node and this service id? 1029 785 1030 // do not establish a link to myself! 786 if (remote == nodeId) return LinkID::UNSPECIFIED; 787 1031 if (remote == nodeId) return 1032 LinkID::UNSPECIFIED; 1033 1034 788 1035 // create a link descriptor 789 1036 LinkDescriptor* ld = addDescriptor(); … … 792 1039 ld->service = service; 793 1040 ld->listener = getListener(ld->service); 1041 1042 // initialize sequence numbers 1043 ld->last_sent_seqnum = SequenceNumber::createRandomSeqNum_Short(); 1044 logging_debug("Creating new link with initial SeqNum: " << ld->last_sent_seqnum); 794 1045 795 1046 // create link request message … … 800 1051 msg.setRelayed(true); 801 1052 msg.setRegisterRelay(true); 1053 // msg.setRouteRecord(true); 1054 1055 msg.setSeqNum(ld->last_sent_seqnum); 802 1056 803 1057 // debug message … … 809 1063 ); 810 1064 1065 811 1066 // sending message to node 812 send_node( &msg, ld->remoteNode, ld->service ); 813 1067 try 1068 { 1069 // * send * 1070 seqnum_t seq = send_node( &msg, ld->remoteNode, system_priority::OVERLAY, ld->service ); 1071 } 1072 catch ( message_not_sent& e ) 1073 { 1074 logging_warn("Link request not sent: " << e.what()); 1075 1076 // Message not sent. Cancel link request. 1077 SystemQueue::instance().scheduleCall( 1078 boost::bind( 1079 &BaseOverlay::__onLinkEstablishmentFailed, 1080 this, 1081 ld->overlayId) 1082 ); 1083 } 1084 814 1085 return ld->overlayId; 815 1086 } 816 1087 1088 /// NOTE: "id" is an Overlay-LinkID 1089 void BaseOverlay::__onLinkEstablishmentFailed(const LinkID& id) 1090 { 1091 // TODO This code redundant. But also it's not easy to aggregate in one function. 1092 1093 // get descriptor for link 1094 LinkDescriptor* ld = getDescriptor(id, false); 1095 if ( ld == NULL ) return; // not found? ->ignore! 1096 1097 logging_debug( "__onLinkEstablishmentFaild: " << ld ); 1098 1099 // removing relay link information 1100 removeRelayLink(ld->overlayId); 1101 1102 // inform listeners about link down 1103 ld->communicationUp = false; 1104 if (!ld->service.isUnspecified()) 1105 { 1106 CommunicationListener* lst = getListener(ld->service); 1107 if(lst != NULL) lst->onLinkFail( ld->overlayId, ld->remoteNode ); 1108 sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId ); 1109 } 1110 1111 // delete all queued messages (auto links) 1112 if( ld->messageQueue.size() > 0 ) { 1113 logging_warn( "Dropping link " << id.toString() << " that has " 1114 << ld->messageQueue.size() << " waiting messages" ); 1115 ld->flushQueue(); 1116 } 1117 1118 // erase mapping 1119 eraseDescriptor(ld->overlayId); 1120 } 1121 1122 817 1123 /// drops an established link 818 void BaseOverlay::dropLink(const LinkID& link) { 819 logging_info( "Dropping link (initiated locally):" << link.toString() ); 1124 void BaseOverlay::dropLink(const LinkID& link) 1125 { 1126 logging_info( "Dropping link: " << link.toString() ); 820 1127 821 1128 // find the link item to drop 822 1129 LinkDescriptor* ld = getDescriptor(link); 823 if( ld == NULL ) { 1130 if( ld == NULL ) 1131 { 824 1132 logging_warn( "Can't drop link, link is unknown!"); 825 1133 return; … … 827 1135 828 1136 // delete all queued messages 829 if( ld->messageQueue.size() > 0 ) { 1137 if( ld->messageQueue.size() > 0 ) 1138 { 830 1139 logging_warn( "Dropping link " << ld->overlayId.toString() << " that has " 831 1140 << ld->messageQueue.size() << " waiting messages" ); 832 1141 ld->flushQueue(); 833 1142 } 834 835 // inform sideport and listener 836 if(ld->listener != NULL) 837 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode ); 838 sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId ); 839 840 // do not drop relay links 841 if (!ld->relaying) { 842 // drop the link in base communication 843 if (ld->communicationUp) bc->dropLink( ld->communicationId ); 844 845 // erase descriptor 846 eraseDescriptor( ld->overlayId ); 847 } else { 848 ld->dropAfterRelaying = true; 849 } 1143 1144 1145 // inform application and remote note (but only once) 1146 // NOTE: If we initiated the drop, this function is called twice, but on 1147 // the second call, there is noting to do. 1148 if ( ld->up && ! ld->failed ) 1149 { 1150 // inform sideport and listener 1151 if(ld->listener != NULL) 1152 { 1153 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode ); 1154 } 1155 sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId ); 1156 1157 // send link-close to remote node 1158 logging_info("Sending LinkClose message to remote node."); 1159 OverlayMsg close_msg(OverlayMsg::typeLinkClose); 1160 send_link(&close_msg, link, system_priority::OVERLAY); 1161 1162 // deactivate link 1163 ld->up = false; 1164 // ld->closing = true; 1165 } 1166 1167 else if ( ld->failed ) 1168 { 1169 // inform listener 1170 if( ld->listener != NULL ) 1171 { 1172 ld->listener->onLinkFail( ld->overlayId, ld->remoteNode ); 1173 } 1174 1175 ld->up = false; 1176 __removeDroppedLink(ld->overlayId); 1177 } 1178 } 1179 1180 /// called from typeLinkClose-handler 1181 void BaseOverlay::__removeDroppedLink(const LinkID& link) 1182 { 1183 // find the link item to drop 1184 LinkDescriptor* ld = getDescriptor(link); 1185 if( ld == NULL ) 1186 { 1187 return; 1188 } 1189 1190 // do not drop relay links 1191 if (!ld->relaying) 1192 { 1193 // drop the link in base communication 1194 if (ld->communicationUp) 1195 { 1196 bc->dropLink( ld->communicationId ); 1197 } 1198 1199 // erase descriptor 1200 eraseDescriptor( ld->overlayId ); 1201 } 1202 else 1203 { 1204 ld->dropAfterRelaying = true; 1205 } 850 1206 } 851 1207 … … 853 1209 854 1210 /// internal send message, always use this functions to send messages over links 855 seqnum_t BaseOverlay::sendMessage( const Message* message, const LinkID& link ) { 1211 const SequenceNumber& BaseOverlay::sendMessage( reboost::message_t message, 1212 const LinkID& link, 1213 uint8_t priority ) throw(message_not_sent) 1214 { 856 1215 logging_debug( "Sending data message on link " << link.toString() ); 857 1216 858 1217 // get the mapping for this link 859 1218 LinkDescriptor* ld = getDescriptor(link); 860 if( ld == NULL ) { 861 logging_error("Could not send message. " 862 << "Link not found id=" << link.toString()); 863 return -1; 1219 if( ld == NULL ) 1220 { 1221 throw message_not_sent("Could not send message. Link not found id=" + link.toString()); 864 1222 } 865 1223 866 1224 // check if the link is up yet, if its an auto link queue message 867 if( !ld->up ) { 1225 if( !ld->up ) 1226 { 868 1227 ld->setAutoUsed(); 869 if( ld->autolink ) { 1228 if( ld->autolink ) 1229 { 870 1230 logging_info("Auto-link " << link.toString() << " not up, queue message"); 871 Data data = data_serialize( message ); 872 const_cast<Message*>(message)->dropPayload(); 873 ld->messageQueue.push_back( new Message(data) ); 874 } else { 875 logging_error("Link " << link.toString() << " not up, drop message"); 876 } 877 return -1; 878 } 879 880 // compile overlay message (has service and node id) 881 OverlayMsg overmsg( OverlayMsg::typeData ); 882 overmsg.encapsulate( const_cast<Message*>(message) ); 883 884 // send message over relay/direct/overlay 885 return send_link( &overmsg, ld->overlayId ); 886 } 887 888 889 seqnum_t BaseOverlay::sendMessage(const Message* message, 890 const NodeID& node, const ServiceID& service) { 1231 1232 // queue message 1233 LinkDescriptor::message_queue_entry msg; 1234 msg.message = message; 1235 msg.priority = priority; 1236 1237 ld->messageQueue.push_back( msg ); 1238 1239 return SequenceNumber::DISABLED; // TODO what to return if message is queued? 1240 } 1241 else 1242 { 1243 throw message_not_sent("Link " + link.toString() + " not up, drop message"); 1244 } 1245 } 1246 1247 // TODO AKTUELL: sequence numbers 1248 // TODO seqnum on fast path ? 1249 ld->last_sent_seqnum.increment(); 1250 1251 /* choose fast-path for direct links; normal overlay-path otherwise */ 1252 // BRANCH: direct link 1253 if ( ld->communicationUp && !ld->relayed ) 1254 { 1255 // * send down to BaseCommunication * 1256 try 1257 { 1258 bc->sendMessage(ld->communicationId, message, priority, true); 1259 } 1260 catch ( communication::communication_message_not_sent& e ) 1261 { 1262 ostringstream out; 1263 out << "Communication message on fast-path not sent: " << e.what(); 1264 throw message_not_sent(out.str()); 1265 } 1266 } 1267 1268 // BRANCH: use (slow) overlay-path 1269 else 1270 { 1271 // compile overlay message (has service and node id) 1272 OverlayMsg overmsg( OverlayMsg::typeData ); 1273 overmsg.set_payload_message(message); 1274 1275 // set SeqNum 1276 if ( ld->transmit_seqnums ) 1277 { 1278 overmsg.setSeqNum(ld->last_sent_seqnum); 1279 } 1280 logging_debug("Sending Message with SeqNum: " << overmsg.getSeqNum()); 1281 1282 // send message over relay/direct/overlay 1283 send_link( &overmsg, ld->overlayId, priority ); 1284 } 1285 1286 // return seqnum 1287 return ld->last_sent_seqnum; 1288 } 1289 1290 1291 const SequenceNumber& BaseOverlay::sendMessage(reboost::message_t message, 1292 const NodeID& node, uint8_t priority, const ServiceID& service) { 891 1293 892 1294 // find link for node and service … … 907 1309 if( ld == NULL ) { 908 1310 logging_error( "Failed to establish auto-link."); 909 return -1;1311 throw message_not_sent("Failed to establish auto-link."); 910 1312 } 911 1313 ld->autolink = true; … … 920 1322 921 1323 // send / queue message 922 return sendMessage( message, ld->overlayId );923 } 924 925 926 NodeID BaseOverlay::sendMessageCloserToNodeID( const Message*message,927 const NodeID& address, const ServiceID& service) {1324 return sendMessage( message, ld->overlayId, priority ); 1325 } 1326 1327 1328 NodeID BaseOverlay::sendMessageCloserToNodeID(reboost::message_t message, 1329 const NodeID& address, uint8_t priority, const ServiceID& service) { 928 1330 929 1331 if ( overlayInterface->isClosestNodeTo(address) ) … … 936 1338 if ( closest_node != NodeID::UNSPECIFIED ) 937 1339 { 938 se qnum_t seqnum = sendMessage(message, closest_node, service);1340 sendMessage(message, closest_node, priority, service); 939 1341 } 940 1342 941 return closest_node; // XXXreturn seqnum ?? tuple? closest_node via (non const) reference?1343 return closest_node; // return seqnum ?? tuple? closest_node via (non const) reference? 942 1344 } 943 1345 // ---------------------------------------------------------------------------- … … 978 1380 979 1381 // see if we can find the node in our own table 980 BOOST_FOREACH(const LinkDescriptor* ld, links){1382 foreach(const LinkDescriptor* ld, links){ 981 1383 if(ld->remoteNode != node) continue; 982 1384 if(!ld->communicationUp) continue; … … 1079 1481 1080 1482 void BaseOverlay::onLinkUp(const LinkID& id, 1081 const address_v* local, const address_v* remote) { 1483 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote) 1484 { 1082 1485 logging_debug( "Link up with base communication link id=" << id ); 1083 1486 … … 1085 1488 LinkDescriptor* ld = getDescriptor(id, true); 1086 1489 1087 // handle bootstrap link we initiated1490 // BRANCH: handle bootstrap link we initiated 1088 1491 if( std::find(bootstrapLinks.begin(), bootstrapLinks.end(), id) != bootstrapLinks.end() ){ 1089 1492 logging_info( 1090 1493 "Join has been initiated by me and the link is now up. " << 1494 "LinkID: " << id.toString() << 1091 1495 "Sending out join request for SpoVNet " << spovnetId.toString() 1092 1496 ); … … 1096 1500 OverlayInterface::OVERLAY_SERVICE_ID, nodeId ); 1097 1501 JoinRequest joinRequest( spovnetId, nodeId ); 1098 overlayMsg.encapsulate( &joinRequest ); 1099 bc->sendMessage( id, &overlayMsg ); 1502 overlayMsg.append_buffer(joinRequest.serialize_into_shared_buffer()); 1503 1504 send_overlaymessage_down(&overlayMsg, id, system_priority::OVERLAY); 1505 1100 1506 return; 1101 1507 } 1102 1508 1103 // no link found? ->link establishment from remote, add one!1509 // BRANCH: link establishment from remote, add one! 1104 1510 if (ld == NULL) { 1105 1511 ld = addDescriptor( id ); … … 1115 1521 // in this case, do not inform listener, since service it unknown 1116 1522 // -> wait for update message! 1117 1118 // link mapping found? -> send update message with node-id and service id 1119 } else { 1523 } 1524 1525 // BRANCH: We requested this link in the first place 1526 else 1527 { 1120 1528 logging_info( "onLinkUp descriptor (initiated locally):" << ld ); 1121 1529 … … 1126 1534 ld->fromRemote = false; 1127 1535 1128 // if link is a relayed link->convert to direct link 1129 if (ld->relayed) { 1130 logging_info( "Converting to direct link: " << ld ); 1536 // BRANCH: this was a relayed link before --> convert to direct link 1537 // TODO do we really have to send a message here? 1538 if (ld->relayed) 1539 { 1131 1540 ld->up = true; 1132 1541 ld->relayed = false; 1542 logging_info( "Converting to direct link: " << ld ); 1543 1544 // send message 1133 1545 OverlayMsg overMsg( OverlayMsg::typeLinkDirect ); 1134 1546 overMsg.setSourceLink( ld->overlayId ); 1135 1547 overMsg.setDestinationLink( ld->remoteLink ); 1136 send_link( &overMsg, ld->overlayId ); 1137 } else { 1548 send_link( &overMsg, ld->overlayId, system_priority::OVERLAY ); 1549 1550 // inform listener 1551 if( ld->listener != NULL) 1552 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode ); 1553 } 1554 1555 1556 /* NOTE: Chord is opening direct-links in it's setup routine which are 1557 * neither set to "relayed" nor to "up". To activate these links a 1558 * typeLinkUpdate must be sent. 1559 * 1560 * This branch is would also be taken when we had a working link before 1561 * (ld->up == true). I'm not sure if this case does actually happen 1562 * and whether it's tested. 1563 */ 1564 else 1565 { 1138 1566 // note: necessary to validate the link on the remote side! 1139 1567 logging_info( "Sending out update" << … … 1144 1572 // compile and send update message 1145 1573 OverlayMsg overlayMsg( OverlayMsg::typeLinkUpdate ); 1146 overlayMsg.setSourceLink(ld->overlayId);1147 1574 overlayMsg.setAutoLink( ld->autolink ); 1148 send_link( &overlayMsg, ld->overlayId, true ); 1575 overlayMsg.setSourceNode(nodeId); 1576 overlayMsg.setDestinationNode(ld->remoteNode); 1577 overlayMsg.setSourceLink(ld->overlayId); 1578 overlayMsg.setDestinationLink(ld->remoteLink); 1579 overlayMsg.setService(ld->service); 1580 overlayMsg.setRelayed(false); 1581 1582 // TODO ld->communicationId = id ?? 1583 1584 send_overlaymessage_down(&overlayMsg, id, system_priority::OVERLAY); 1149 1585 } 1150 1586 } … … 1152 1588 1153 1589 void BaseOverlay::onLinkDown(const LinkID& id, 1154 const address_v* local, const address_v* remote) { 1155 1590 const addressing2::EndpointPtr local, 1591 const addressing2::EndpointPtr remote) 1592 { 1156 1593 // erase bootstrap links 1157 1594 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id ); … … 1185 1622 } 1186 1623 1624 1625 void BaseOverlay::onLinkFail(const LinkID& id, 1626 const addressing2::EndpointPtr local, 1627 const addressing2::EndpointPtr remote) 1628 { 1629 logging_debug( "Link fail with base communication link id=" << id ); 1630 1631 // // erase bootstrap links 1632 // vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id ); 1633 // if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it ); 1634 // 1635 // // get descriptor for link 1636 // LinkDescriptor* ld = getDescriptor(id, true); 1637 // if ( ld == NULL ) return; // not found? ->ignore! 1638 // logging_debug( "Link failed id=" << ld->overlayId.toString() ); 1639 // 1640 // // inform listeners 1641 // ld->listener->onLinkFail( ld->overlayId, ld->remoteNode ); 1642 // sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId ); 1643 1644 logging_debug( " ... calling onLinkDown ..." ); 1645 onLinkDown(id, local, remote); 1646 } 1647 1648 1187 1649 void BaseOverlay::onLinkChanged(const LinkID& id, 1188 const address_v* oldlocal, const address_v* newlocal, 1189 const address_v* oldremote, const address_v* newremote) { 1190 1191 // get descriptor for link 1192 LinkDescriptor* ld = getDescriptor(id, true); 1193 if ( ld == NULL ) return; // not found? ->ignore! 1194 logging_debug( "onLinkChanged descriptor: " << ld ); 1195 1196 // inform listeners 1197 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode ); 1198 sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId ); 1199 1200 // autolinks: refresh timestamp 1201 ld->setAutoUsed(); 1202 } 1203 1204 void BaseOverlay::onLinkFail(const LinkID& id, 1205 const address_v* local, const address_v* remote) { 1206 logging_debug( "Link fail with base communication link id=" << id ); 1207 1208 // erase bootstrap links 1209 vector<LinkID>::iterator it = std::find( bootstrapLinks.begin(), bootstrapLinks.end(), id ); 1210 if( it != bootstrapLinks.end() ) bootstrapLinks.erase( it ); 1211 1212 // get descriptor for link 1213 LinkDescriptor* ld = getDescriptor(id, true); 1214 if ( ld == NULL ) return; // not found? ->ignore! 1215 logging_debug( "Link failed id=" << ld->overlayId.toString() ); 1216 1217 // inform listeners 1218 ld->listener->onLinkFail( ld->overlayId, ld->remoteNode ); 1219 sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId ); 1220 } 1221 1222 void BaseOverlay::onLinkQoSChanged(const LinkID& id, const address_v* local, 1223 const address_v* remote, const QoSParameterSet& qos) { 1224 logging_debug( "Link quality changed with base communication link id=" << id ); 1225 1226 // get descriptor for link 1227 LinkDescriptor* ld = getDescriptor(id, true); 1228 if ( ld == NULL ) return; // not found? ->ignore! 1229 logging_debug( "Link quality changed id=" << ld->overlayId.toString() ); 1230 } 1231 1232 bool BaseOverlay::onLinkRequest( const LinkID& id, const address_v* local, 1233 const address_v* remote ) { 1650 const addressing2::EndpointPtr oldlocal, const addressing2::EndpointPtr newlocal, 1651 const addressing2::EndpointPtr oldremote, const addressing2::EndpointPtr newremote) 1652 { 1653 // get descriptor for link 1654 LinkDescriptor* ld = getDescriptor(id, true); 1655 if ( ld == NULL ) return; // not found? ->ignore! 1656 logging_debug( "onLinkChanged descriptor: " << ld ); 1657 1658 // inform listeners 1659 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode ); 1660 sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId ); 1661 1662 // autolinks: refresh timestamp 1663 ld->setAutoUsed(); 1664 } 1665 1666 //void BaseOverlay::onLinkQoSChanged(const LinkID& id, 1667 // const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote, 1668 // const QoSParameterSet& qos) 1669 //{ 1670 // logging_debug( "Link quality changed with base communication link id=" << id ); 1671 // 1672 // // get descriptor for link 1673 // LinkDescriptor* ld = getDescriptor(id, true); 1674 // if ( ld == NULL ) return; // not found? ->ignore! 1675 // logging_debug( "Link quality changed id=" << ld->overlayId.toString() ); 1676 //} 1677 1678 bool BaseOverlay::onLinkRequest(const LinkID& id, 1679 const addressing2::EndpointPtr local, 1680 const addressing2::EndpointPtr remote) 1681 { 1234 1682 logging_debug("Accepting link request from " << remote->to_string() ); 1683 1684 // TODO ask application..? 1685 1235 1686 return true; 1236 1687 } 1237 1688 1689 1690 1691 1238 1692 /// handles a message from base communication 1239 bool BaseOverlay::receiveMessage(const Message* message, 1240 const LinkID& link, const NodeID& ) { 1693 bool BaseOverlay::receiveMessage( reboost::shared_buffer_t message, 1694 const LinkID& link, 1695 const NodeID&, 1696 bool bypass_overlay ) 1697 { 1241 1698 // get descriptor for link 1242 1699 LinkDescriptor* ld = getDescriptor( link, true ); 1243 return handleMessage( message, ld, link ); 1700 1701 1702 /* choose fastpath for direct links; normal overlay-path otherwise */ 1703 if ( bypass_overlay && ld ) 1704 { 1705 // message received --> link is alive 1706 ld->keepAliveReceived = time(NULL); 1707 // hop count on this link 1708 ld->hops = 0; 1709 1710 1711 // hand over to CommunicationListener (aka Application) 1712 CommunicationListener* lst = getListener(ld->service); 1713 if ( lst != NULL ) 1714 { 1715 lst->onMessage( 1716 message, 1717 ld->remoteNode, 1718 ld->overlayId, 1719 SequenceNumber::DISABLED, 1720 NULL ); 1721 1722 return true; 1723 } 1724 1725 return false; 1726 } 1727 else 1728 { 1729 return handleMessage( message, ld, link ); 1730 } 1244 1731 } 1245 1732 … … 1247 1734 1248 1735 /// Handle spovnet instance join requests 1249 bool BaseOverlay::handleJoinRequest( OverlayMsg* overlayMsg, const LinkID& bcLink ) {1250 1736 bool BaseOverlay::handleJoinRequest( reboost::shared_buffer_t message, const NodeID& source, const LinkID& bcLink ) 1737 { 1251 1738 // decapsulate message 1252 JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>(); 1739 JoinRequest joinReq; 1740 joinReq.deserialize_from_shared_buffer(message); 1741 1253 1742 logging_info( "Received join request for spovnet " << 1254 joinReq ->getSpoVNetID().toString() );1743 joinReq.getSpoVNetID().toString() ); 1255 1744 1256 1745 // check spovnet id 1257 if( joinReq ->getSpoVNetID() != spovnetId ) {1746 if( joinReq.getSpoVNetID() != spovnetId ) { 1258 1747 logging_error( 1259 1748 "Received join request for spovnet we don't handle " << 1260 joinReq ->getSpoVNetID().toString() );1261 delete joinReq; 1749 joinReq.getSpoVNetID().toString() ); 1750 1262 1751 return false; 1263 1752 } … … 1267 1756 logging_info( "Sending join reply for spovnet " << 1268 1757 spovnetId.toString() << " to node " << 1269 overlayMsg->getSourceNode().toString() <<1758 source.toString() << 1270 1759 ". Result: " << (allow ? "allowed" : "denied") ); 1271 joiningNodes.push_back( overlayMsg->getSourceNode());1760 joiningNodes.push_back( source ); 1272 1761 1273 1762 // return overlay parameters … … 1276 1765 << getEndpointDescriptor().toString() ) 1277 1766 OverlayParameterSet parameters = overlayInterface->getParameters(); 1767 1768 1769 // create JoinReplay Message 1278 1770 OverlayMsg retmsg( OverlayMsg::typeJoinReply, 1279 1771 OverlayInterface::OVERLAY_SERVICE_ID, nodeId ); 1280 JoinReply replyMsg( spovnetId, parameters, 1281 allow, getEndpointDescriptor() ); 1282 retmsg.encapsulate(&replyMsg); 1283 bc->sendMessage( bcLink, &retmsg ); 1284 1285 delete joinReq; 1772 JoinReply replyMsg( spovnetId, parameters, allow ); 1773 retmsg.append_buffer(replyMsg.serialize_into_shared_buffer()); 1774 1775 // XXX This is unlovely clash between the old message system and the new one, 1776 // but a.t.m. we can't migrate everything to the new system at once.. 1777 // ---> Consider the EndpointDescriptor as part of the JoinReply.. 1778 retmsg.append_buffer(getEndpointDescriptor().serialize()); 1779 1780 // * send * 1781 send_overlaymessage_down(&retmsg, bcLink, system_priority::OVERLAY); 1782 1286 1783 return true; 1287 1784 } 1288 1785 1289 1786 /// Handle replies to spovnet instance join requests 1290 bool BaseOverlay::handleJoinReply( OverlayMsg* overlayMsg, const LinkID& bcLink ) { 1787 bool BaseOverlay::handleJoinReply( reboost::shared_buffer_t message, const LinkID& bcLink ) 1788 { 1291 1789 // decapsulate message 1292 1790 logging_debug("received join reply message"); 1293 JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>(); 1791 JoinReply replyMsg; 1792 EndpointDescriptor endpoints; 1793 reboost::shared_buffer_t buff = replyMsg.deserialize_from_shared_buffer(message); 1794 buff = endpoints.deserialize(buff); 1294 1795 1295 1796 // correct spovnet? 1296 if( replyMsg ->getSpoVNetID() != spovnetId ) { // no-> fail1797 if( replyMsg.getSpoVNetID() != spovnetId ) { // no-> fail 1297 1798 logging_error( "Received SpoVNet join reply for " << 1298 replyMsg ->getSpoVNetID().toString() <<1799 replyMsg.getSpoVNetID().toString() << 1299 1800 " != " << spovnetId.toString() ); 1300 delete replyMsg; 1801 1301 1802 return false; 1302 1803 } 1303 1804 1304 1805 // access granted? no -> fail 1305 if( !replyMsg ->getJoinAllowed() ) {1806 if( !replyMsg.getJoinAllowed() ) { 1306 1807 logging_error( "Our join request has been denied" ); 1307 1808 … … 1317 1818 1318 1819 // inform all registered services of the event 1319 BOOST_FOREACH( NodeListener* i, nodeListeners )1820 foreach( NodeListener* i, nodeListeners ) 1320 1821 i->onJoinFailed( spovnetId ); 1321 1822 1322 delete replyMsg;1323 1823 return true; 1324 1824 } … … 1329 1829 1330 1830 logging_debug( "Using bootstrap end-point " 1331 << replyMsg->getBootstrapEndpoint().toString() );1831 << endpoints.toString() ); 1332 1832 1333 1833 // create overlay structure from spovnet parameter set … … 1338 1838 1339 1839 overlayInterface = OverlayFactory::create( 1340 *this, replyMsg ->getParam(), nodeId, this );1840 *this, replyMsg.getParam(), nodeId, this ); 1341 1841 1342 1842 // overlay structure supported? no-> fail! … … 1354 1854 1355 1855 // inform all registered services of the event 1356 BOOST_FOREACH( NodeListener* i, nodeListeners )1856 foreach( NodeListener* i, nodeListeners ) 1357 1857 i->onJoinFailed( spovnetId ); 1358 1858 1359 delete replyMsg;1360 1859 return true; 1361 1860 } … … 1365 1864 overlayInterface->createOverlay(); 1366 1865 1367 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint());1368 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint());1866 overlayInterface->joinOverlay( endpoints ); 1867 overlayBootstrap.recordJoin( endpoints ); 1369 1868 1370 1869 // update ovlvis … … 1372 1871 1373 1872 // inform all registered services of the event 1374 BOOST_FOREACH( NodeListener* i, nodeListeners ) 1375 i->onJoinCompleted( spovnetId ); 1376 1377 delete replyMsg; 1378 1379 } else { 1380 1873 foreach( NodeListener* i, nodeListeners ) 1874 i->onJoinCompleted( spovnetId ); 1875 } 1876 else 1877 { 1381 1878 // this is not the first bootstrap, just join the additional node 1382 1879 logging_debug("not first-time bootstrapping"); 1383 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() ); 1384 overlayBootstrap.recordJoin( replyMsg->getBootstrapEndpoint() ); 1385 1386 delete replyMsg; 1387 1880 overlayInterface->joinOverlay( endpoints ); 1881 overlayBootstrap.recordJoin( endpoints ); 1388 1882 } // if( overlayInterface == NULL ) 1389 1883 … … 1392 1886 1393 1887 1394 bool BaseOverlay::handleData( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { 1888 bool BaseOverlay::handleData( reboost::shared_buffer_t message, OverlayMsg* overlayMsg, LinkDescriptor* ld ) 1889 { 1395 1890 // get service 1396 const ServiceID& service = overlayMsg->getService(); 1891 const ServiceID& service = ld->service; //overlayMsg->getService(); 1892 1397 1893 logging_debug( "Received data for service " << service.toString() 1398 1894 << " on link " << overlayMsg->getDestinationLink().toString() ); … … 1402 1898 if(lst != NULL){ 1403 1899 lst->onMessage( 1404 overlayMsg, 1405 overlayMsg->getSourceNode(), 1406 overlayMsg->getDestinationLink() 1900 message, 1901 // overlayMsg->getSourceNode(), 1902 // overlayMsg->getDestinationLink(), 1903 ld->remoteNode, 1904 ld->overlayId, 1905 overlayMsg->getSeqNum(), 1906 overlayMsg 1407 1907 ); 1408 1908 } … … 1411 1911 } 1412 1912 1913 bool BaseOverlay::handleLostMessage( reboost::shared_buffer_t message, OverlayMsg* msg ) 1914 { 1915 /** 1916 * Deserialize MessageLost-Message 1917 * 1918 * - Type of lost message 1919 * - Hop count of lost message 1920 * - Source-LinkID of lost message 1921 */ 1922 const uint8_t* buff = message(0, sizeof(uint8_t)*2).data(); 1923 uint8_t type = buff[0]; 1924 uint8_t hops = buff[1]; 1925 LinkID linkid; 1926 linkid.deserialize(message(sizeof(uint8_t)*2)); 1927 1928 logging_warn("Node " << msg->getSourceNode() 1929 << " informed us, that our message of type " << (int) type 1930 << " is lost after traveling " << (int) hops << " hops." 1931 << " (LinkID: " << linkid.toString()); 1932 1933 1934 // TODO switch-case ? 1935 1936 // BRANCH: LinkRequest --> link request failed 1937 if ( type == OverlayMsg::typeLinkRequest ) 1938 { 1939 __onLinkEstablishmentFailed(linkid); 1940 } 1941 1942 // BRANCH: Data --> link disrupted. Drop link. 1943 // (We could use something more advanced here. e.g. At least send a 1944 // keep-alive message and wait for a keep-alive reply.) 1945 if ( type == OverlayMsg::typeData ) 1946 { 1947 LinkDescriptor* link_desc = getDescriptor(linkid); 1948 1949 if ( link_desc ) 1950 { 1951 link_desc->failed = true; 1952 } 1953 1954 dropLink(linkid); 1955 } 1956 1957 // BRANCH: ping lost 1958 if ( type == OverlayMsg::typePing ) 1959 { 1960 CommunicationListener* lst = getListener(msg->getService()); 1961 if( lst != NULL ) 1962 { 1963 lst->onPingLost(msg->getSourceNode()); 1964 } 1965 } 1966 1967 return true; 1968 } 1969 1970 bool BaseOverlay::handlePing( OverlayMsg* overlayMsg, LinkDescriptor* ld ) 1971 { 1972 // TODO AKTUELL: implement interfaces: Node::ping(node); BaseOverlay::ping(node) 1973 1974 bool send_pong = false; 1975 1976 // inform application and ask permission to send a pong message 1977 CommunicationListener* lst = getListener(overlayMsg->getService()); 1978 if( lst != NULL ) 1979 { 1980 send_pong = lst->onPing(overlayMsg->getSourceNode()); 1981 } 1982 1983 // send pong message if allowed 1984 if ( send_pong ) 1985 { 1986 OverlayMsg pong_msg(OverlayMsg::typePong); 1987 pong_msg.setSeqNum(overlayMsg->getSeqNum()); 1988 1989 // send message 1990 try 1991 { 1992 send_node( &pong_msg, 1993 overlayMsg->getSourceNode(), 1994 system_priority::OVERLAY, 1995 overlayMsg->getService() ); 1996 } 1997 catch ( message_not_sent& e ) 1998 { 1999 logging_info("Could not send Pong-Message to node: " << 2000 overlayMsg->getSourceNode()); 2001 } 2002 } 2003 } 2004 2005 bool BaseOverlay::handlePong( OverlayMsg* overlayMsg, LinkDescriptor* ld ) 2006 { 2007 // inform application 2008 CommunicationListener* lst = getListener(overlayMsg->getService()); 2009 if( lst != NULL ) 2010 { 2011 lst->onPong(overlayMsg->getSourceNode()); 2012 } 2013 } 1413 2014 1414 2015 bool BaseOverlay::handleLinkUpdate( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { … … 1439 2040 overlayMsg->setSourceLink(ld->overlayId); 1440 2041 overlayMsg->setService(ld->service); 1441 send( overlayMsg, ld );2042 send( overlayMsg, ld, system_priority::OVERLAY ); 1442 2043 } 1443 2044 … … 1481 2082 if( ld->messageQueue.size() > 0 ) { 1482 2083 logging_info( "Sending out queued messages on link " << ld ); 1483 BOOST_FOREACH( Message* msg, ld->messageQueue ) { 1484 sendMessage( msg, ld->overlayId ); 1485 delete msg;1486 2084 foreach( LinkDescriptor::message_queue_entry msg, ld->messageQueue ) 2085 { 2086 sendMessage( msg.message, ld->overlayId, msg.priority ); 2087 } 1487 2088 ld->messageQueue.clear(); 1488 2089 } … … 1497 2098 /// handle a link request and reply 1498 2099 bool BaseOverlay::handleLinkRequest( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { 1499 logging_info( "Link request received from node id=" << overlayMsg->getSourceNode() );1500 2100 1501 2101 //TODO: Check if a request has already been sent using getSourceLink() ... … … 1514 2114 ldn->remoteNode = overlayMsg->getSourceNode(); 1515 2115 ldn->remoteLink = overlayMsg->getSourceLink(); 1516 2116 ldn->hops = overlayMsg->getNumHops(); 2117 2118 // initialize sequence numbers 2119 ldn->last_sent_seqnum = SequenceNumber::createRandomSeqNum_Short(); 2120 logging_debug("Creating new link with initial SeqNum: " << ldn->last_sent_seqnum); 2121 2122 1517 2123 // update time-stamps 1518 2124 ldn->setAlive(); 1519 2125 ldn->setAutoUsed(); 1520 2126 2127 logging_info( "Link request received from node id=" 2128 << overlayMsg->getSourceNode() 2129 << " LINK: " 2130 << ldn); 2131 1521 2132 // create reply message and send back! 1522 2133 overlayMsg->swapRoles(); // swap source/destination 1523 2134 overlayMsg->setType(OverlayMsg::typeLinkReply); 1524 2135 overlayMsg->setSourceLink(ldn->overlayId); 1525 overlayMsg->setSourceEndpoint( bc->getEndpointDescriptor() );1526 2136 overlayMsg->setRelayed(true); 1527 send( overlayMsg, ld ); // send back to link 2137 // overlayMsg->setRouteRecord(true); 2138 overlayMsg->setSeqNum(ld->last_sent_seqnum); 2139 2140 // TODO aktuell do the same thing in the typeLinkRequest-Message, too. But be careful with race conditions!! 2141 // append our endpoints (for creation of a direct link) 2142 overlayMsg->set_payload_message(bc->getEndpointDescriptor().serialize()); 2143 2144 send( overlayMsg, ld, system_priority::OVERLAY ); // send back to link 1528 2145 1529 2146 // inform listener … … 1534 2151 } 1535 2152 1536 bool BaseOverlay::handleLinkReply( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { 1537 2153 bool BaseOverlay::handleLinkReply( 2154 OverlayMsg* overlayMsg, 2155 reboost::shared_buffer_t sub_message, 2156 LinkDescriptor* ld ) 2157 { 2158 // deserialize EndpointDescriptor 2159 EndpointDescriptor endpoints; 2160 endpoints.deserialize(sub_message); 2161 1538 2162 // find link request 1539 2163 LinkDescriptor* ldn = getDescriptor(overlayMsg->getDestinationLink()); … … 1554 2178 1555 2179 // debug message 1556 logging_ debug( "Link request reply received. Establishing link"2180 logging_info( "Link request reply received. Establishing link" 1557 2181 << " for service " << overlayMsg->getService().toString() 1558 2182 << " with local id=" << overlayMsg->getDestinationLink() 1559 2183 << " and remote link id=" << overlayMsg->getSourceLink() 1560 << " to " << overlayMsg->getSourceEndpoint().toString() 2184 << " to " << endpoints.toString() 2185 << " hop count: " << overlayMsg->getRouteRecord().size() 1561 2186 ); 1562 2187 … … 1577 2202 logging_info( "Sending out queued messages on link " << 1578 2203 ldn->overlayId.toString() ); 1579 BOOST_FOREACH( Message* msg, ldn->messageQueue ) {1580 sendMessage( msg, ldn->overlayId );1581 delete msg;2204 foreach( LinkDescriptor::message_queue_entry msg, ldn->messageQueue ) 2205 { 2206 sendMessage( msg.message, ldn->overlayId, msg.priority ); 1582 2207 } 1583 2208 ldn->messageQueue.clear(); … … 1589 2214 // try to replace relay link with direct link 1590 2215 ldn->retryCounter = 3; 1591 ldn->endpoint = overlayMsg->getSourceEndpoint();2216 ldn->endpoint = endpoints; 1592 2217 ldn->communicationId = bc->establishLink( ldn->endpoint ); 1593 2218 … … 1596 2221 1597 2222 /// handle a keep-alive message for a link 1598 bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld ) { 2223 bool BaseOverlay::handleLinkAlive( OverlayMsg* overlayMsg, LinkDescriptor* ld ) 2224 { 1599 2225 LinkDescriptor* rld = getDescriptor(overlayMsg->getDestinationLink()); 1600 if ( rld != NULL ) { 1601 logging_debug("Keep-Alive for " << 1602 overlayMsg->getDestinationLink() ); 2226 2227 if ( rld != NULL ) 2228 { 2229 logging_debug("Keep-Alive for " << overlayMsg->getDestinationLink() ); 1603 2230 if (overlayMsg->isRouteRecord()) 2231 { 1604 2232 rld->routeRecord = overlayMsg->getRouteRecord(); 2233 } 2234 2235 // set alive 1605 2236 rld->setAlive(); 2237 2238 2239 /* answer keep alive */ 2240 if ( overlayMsg->getType() == OverlayMsg::typeKeepAlive ) 2241 { 2242 time_t now = time(NULL); 2243 logging_debug("[BaseOverlay] Answering KeepAlive over " 2244 << ld->to_string() 2245 << " after " 2246 << difftime( now, ld->keepAliveSent ) 2247 << "s"); 2248 2249 OverlayMsg msg( OverlayMsg::typeKeepAliveReply, 2250 OverlayInterface::OVERLAY_SERVICE_ID, nodeId, ld->remoteNode ); 2251 msg.setRouteRecord(true); 2252 ld->keepAliveSent = now; 2253 send_link( &msg, ld->overlayId, system_priority::OVERLAY ); 2254 } 2255 1606 2256 return true; 1607 } else { 1608 logging_error("Keep-Alive for " 2257 } 2258 else 2259 { 2260 logging_error("No Keep-Alive for " 1609 2261 << overlayMsg->getDestinationLink() << ": link unknown." ); 1610 2262 return false; … … 1636 2288 // erase the original descriptor 1637 2289 eraseDescriptor(ld->overlayId); 2290 2291 // inform listener 2292 if( rld->listener != NULL) 2293 rld->listener->onLinkChanged( rld->overlayId, rld->remoteNode ); 2294 1638 2295 return true; 1639 2296 } 1640 2297 1641 2298 /// handles an incoming message 1642 bool BaseOverlay::handleMessage( const Message* message, LinkDescriptor* ld, 1643 const LinkID bcLink ) { 1644 logging_debug( "Handling message: " << message->toString()); 1645 2299 bool BaseOverlay::handleMessage( reboost::shared_buffer_t message, LinkDescriptor* ld, 2300 const LinkID bcLink ) 2301 { 1646 2302 // decapsulate overlay message 1647 OverlayMsg* overlayMsg = 1648 const_cast<Message*>(message)->decapsulate<OverlayMsg>(); 1649 if( overlayMsg == NULL ) return false; 1650 2303 OverlayMsg* overlayMsg = new OverlayMsg(); 2304 reboost::shared_buffer_t sub_buff = overlayMsg->deserialize_from_shared_buffer(message); 2305 2306 // // XXX debug 2307 // logging_info( "Received overlay message." 2308 // << " Hops: " << (int) overlayMsg->getNumHops() 2309 // << " Type: " << (int) overlayMsg->getType() 2310 // << " Payload size: " << sub_buff.size() 2311 // << " SeqNum: " << overlayMsg->getSeqNum() ); 2312 2313 1651 2314 // increase number of hops 1652 2315 overlayMsg->increaseNumHops(); … … 1660 2323 // handle signaling messages (do not route!) 1661 2324 if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart && 1662 overlayMsg->getType()<=OverlayMsg::typeSignalingEnd ) { 1663 overlayInterface->onMessage(overlayMsg, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED); 2325 overlayMsg->getType()<=OverlayMsg::typeSignalingEnd ) 2326 { 2327 overlayInterface->onMessage(overlayMsg, sub_buff, NodeID::UNSPECIFIED, LinkID::UNSPECIFIED); 1664 2328 delete overlayMsg; 1665 2329 return true; … … 1673 2337 << " to " << overlayMsg->getDestinationNode() 1674 2338 ); 1675 route( overlayMsg ); 2339 2340 // // XXX testing AKTUELL 2341 // logging_info("MARIO: Routing message " 2342 // << " from " << overlayMsg->getSourceNode() 2343 // << " to " << overlayMsg->getDestinationNode() ); 2344 // logging_info( "Type: " << overlayMsg->getType() << " Payload size: " << sub_buff.size()); 2345 overlayMsg->append_buffer(sub_buff); 2346 2347 route( overlayMsg, ld->remoteNode ); 1676 2348 delete overlayMsg; 1677 2349 return true; 1678 2350 } 1679 2351 1680 // handle base overlay message 2352 2353 /* handle base overlay message */ 1681 2354 bool ret = false; // return value 1682 switch ( overlayMsg->getType() ) { 1683 1684 // data transport messages 1685 case OverlayMsg::typeData: 1686 ret = handleData(overlayMsg, ld); break; 1687 1688 // overlay setup messages 1689 case OverlayMsg::typeJoinRequest: 1690 ret = handleJoinRequest(overlayMsg, bcLink ); break; 1691 case OverlayMsg::typeJoinReply: 1692 ret = handleJoinReply(overlayMsg, bcLink ); break; 1693 1694 // link specific messages 1695 case OverlayMsg::typeLinkRequest: 1696 ret = handleLinkRequest(overlayMsg, ld ); break; 1697 case OverlayMsg::typeLinkReply: 1698 ret = handleLinkReply(overlayMsg, ld ); break; 1699 case OverlayMsg::typeLinkUpdate: 1700 ret = handleLinkUpdate(overlayMsg, ld ); break; 1701 case OverlayMsg::typeLinkAlive: 1702 ret = handleLinkAlive(overlayMsg, ld ); break; 1703 case OverlayMsg::typeLinkDirect: 1704 ret = handleLinkDirect(overlayMsg, ld ); break; 1705 1706 // handle unknown message type 1707 default: { 1708 logging_error( "received message in invalid state! don't know " << 1709 "what to do with this message of type " << overlayMsg->getType() ); 1710 ret = false; 1711 break; 1712 } 2355 try 2356 { 2357 switch ( overlayMsg->getType() ) 2358 { 2359 // data transport messages 2360 case OverlayMsg::typeData: 2361 { 2362 // NOTE: On relayed links, »ld« does not point to our link, but on the relay link. 2363 LinkDescriptor* end_to_end_ld = getDescriptor(overlayMsg->getDestinationLink()); 2364 2365 if ( ! end_to_end_ld ) 2366 { 2367 logging_warn("Error: Data-Message claims to belong to a link we don't know."); 2368 2369 ret = false; 2370 } 2371 else 2372 { 2373 // message received --> link is alive 2374 end_to_end_ld->keepAliveReceived = time(NULL); 2375 // hop count on this link 2376 end_to_end_ld->hops = overlayMsg->getNumHops(); 2377 2378 // * call handler * 2379 ret = handleData(sub_buff, overlayMsg, end_to_end_ld); 2380 } 2381 2382 break; 2383 } 2384 case OverlayMsg::typeMessageLost: 2385 ret = handleLostMessage(sub_buff, overlayMsg); 2386 2387 break; 2388 2389 // overlay setup messages 2390 case OverlayMsg::typeJoinRequest: 2391 ret = handleJoinRequest(sub_buff, overlayMsg->getSourceNode(), bcLink ); break; 2392 case OverlayMsg::typeJoinReply: 2393 ret = handleJoinReply(sub_buff, bcLink ); break; 2394 2395 // link specific messages 2396 case OverlayMsg::typeLinkRequest: 2397 ret = handleLinkRequest(overlayMsg, ld ); break; 2398 case OverlayMsg::typeLinkReply: 2399 ret = handleLinkReply(overlayMsg, sub_buff, ld ); break; 2400 case OverlayMsg::typeLinkUpdate: 2401 ret = handleLinkUpdate(overlayMsg, ld ); break; 2402 case OverlayMsg::typeKeepAlive: 2403 case OverlayMsg::typeKeepAliveReply: 2404 ret = handleLinkAlive(overlayMsg, ld ); break; 2405 case OverlayMsg::typeLinkDirect: 2406 ret = handleLinkDirect(overlayMsg, ld ); break; 2407 2408 case OverlayMsg::typeLinkClose: 2409 { 2410 dropLink(overlayMsg->getDestinationLink()); 2411 __removeDroppedLink(overlayMsg->getDestinationLink()); 2412 2413 break; 2414 } 2415 2416 /// ping over overlay path (or similar) 2417 case OverlayMsg::typePing: 2418 { 2419 ret = handlePing(overlayMsg, ld); 2420 break; 2421 } 2422 case OverlayMsg::typePong: 2423 { 2424 ret = handlePong(overlayMsg, ld); 2425 break; 2426 } 2427 2428 // handle unknown message type 2429 default: 2430 { 2431 logging_error( "received message in invalid state! don't know " << 2432 "what to do with this message of type " << overlayMsg->getType() ); 2433 ret = false; 2434 break; 2435 } 2436 } 2437 } 2438 catch ( reboost::illegal_sub_buffer& e ) 2439 { 2440 logging_error( "Failed to create sub-buffer while reading message: »" 2441 << e.what() 2442 << "« Message too short? "); 2443 2444 assert(false); // XXX 1713 2445 } 1714 2446 … … 1720 2452 // ---------------------------------------------------------------------------- 1721 2453 1722 void BaseOverlay::broadcastMessage( Message* message, const ServiceID& service) {2454 void BaseOverlay::broadcastMessage(reboost::message_t message, const ServiceID& service, uint8_t priority) { 1723 2455 1724 2456 logging_debug( "broadcasting message to all known nodes " << 1725 2457 "in the overlay from service " + service.toString() ); 1726 1727 if(message == NULL) return;1728 message->setReleasePayload(false);1729 2458 1730 2459 OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(true); … … 1732 2461 NodeID& id = nodes.at(i); 1733 2462 if(id == this->nodeId) continue; // don't send to ourselfs 1734 if(i+1 == nodes.size()) message->setReleasePayload(true); // release payload on last send 1735 sendMessage( message, id, service );2463 2464 sendMessage( message, id, priority, service ); 1736 2465 } 1737 2466 } … … 1755 2484 vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const { 1756 2485 vector<LinkID> linkvector; 1757 BOOST_FOREACH( LinkDescriptor* ld, links ) {2486 foreach( LinkDescriptor* ld, links ) { 1758 2487 if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) { 1759 2488 linkvector.push_back( ld->overlayId ); … … 1779 2508 updateVisual(); 1780 2509 } 2510 2511 2512 2513 /* link status */ 2514 bool BaseOverlay::isLinkDirect(const ariba::LinkID& lnk) const 2515 { 2516 const LinkDescriptor* ld = getDescriptor(lnk); 2517 2518 if (!ld) 2519 return false; 2520 2521 return ld->communicationUp && !ld->relayed; 2522 } 2523 2524 int BaseOverlay::getHopCount(const ariba::LinkID& lnk) const 2525 { 2526 const LinkDescriptor* ld = getDescriptor(lnk); 2527 2528 if (!ld) 2529 return -1; 2530 2531 return ld->hops; 2532 } 2533 2534 2535 bool BaseOverlay::isLinkVital(const LinkDescriptor* link) const 2536 { 2537 time_t now = time(NULL); 2538 2539 return link->up && difftime( now, link->keepAliveReceived ) <= KEEP_ALIVE_TIME_OUT; // TODO is this too long for a "vital" link..? 2540 } 2541 2542 bool BaseOverlay::isLinkDirectVital(const LinkDescriptor* link) const 2543 { 2544 return isLinkVital(link) && link->communicationUp && !link->relayed; 2545 } 2546 2547 /* [link status] */ 2548 1781 2549 1782 2550 void BaseOverlay::updateVisual(){ … … 1878 2646 static set<NodeID> linkset; 1879 2647 set<NodeID> remotenodes; 1880 BOOST_FOREACH( LinkDescriptor* ld, links ) {1881 if (! ld->isVital() || ld->service != OverlayInterface::OVERLAY_SERVICE_ID)2648 foreach( LinkDescriptor* ld, links ) { 2649 if (!isLinkVital(ld) || ld->service != OverlayInterface::OVERLAY_SERVICE_ID) 1882 2650 continue; 1883 2651 … … 1895 2663 do{ 1896 2664 changed = false; 1897 BOOST_FOREACH(NodeID n, linkset){2665 foreach(NodeID n, linkset){ 1898 2666 if(remotenodes.find(n) == remotenodes.end()){ 1899 2667 visualInstance.visDisconnect(visualIdBase, this->nodeId, n, ""); … … 1908 2676 do{ 1909 2677 changed = false; 1910 BOOST_FOREACH(NodeID n, remotenodes){2678 foreach(NodeID n, remotenodes){ 1911 2679 if(linkset.find(n) == linkset.end()){ 1912 2680 visualInstance.visConnect(visualIdBase, this->nodeId, n, ""); … … 1933 2701 // dump link state 1934 2702 s << "--- link state -------------------------------" << endl; 1935 BOOST_FOREACH( LinkDescriptor* ld, links ) {2703 foreach( LinkDescriptor* ld, links ) { 1936 2704 s << "link " << i << ": " << ld << endl; 1937 2705 i++; -
source/ariba/overlay/BaseOverlay.h
r10653 r12060 47 47 #include <vector> 48 48 #include <deque> 49 #include <stdexcept> 49 50 #include <boost/foreach.hpp> 51 52 #ifdef ECLIPSE_PARSER 53 #define foreach(a, b) for(a : b) 54 #else 55 #define foreach(a, b) BOOST_FOREACH(a, b) 56 #endif 50 57 51 58 #include "ariba/utility/messages.h" … … 64 71 #include "ariba/overlay/modules/OverlayStructureEvents.h" 65 72 #include "ariba/overlay/OverlayBootstrap.h" 73 #include "ariba/overlay/SequenceNumber.h" 66 74 67 75 // forward declarations … … 92 100 using ariba::communication::BaseCommunication; 93 101 using ariba::communication::CommunicationEvents; 102 103 // transport 104 //using ariba::transport::system_priority; 94 105 95 106 // utilities … … 103 114 using ariba::utility::Demultiplexer; 104 115 using ariba::utility::MessageReceiver; 105 using ariba::utility::MessageSender;106 116 using ariba::utility::seqnum_t; 107 117 using ariba::utility::Timer; … … 110 120 namespace overlay { 111 121 112 using namespace ariba::addressing; 122 123 124 class message_not_sent: public std::runtime_error 125 { 126 public: 127 /** Takes a character string describing the error. */ 128 explicit message_not_sent(const string& __arg) : 129 std::runtime_error(__arg) 130 { 131 } 132 133 virtual ~message_not_sent() throw() {} 134 }; 135 136 113 137 114 138 class LinkDescriptor; … … 121 145 protected Timer { 122 146 123 friend class OneHop; 147 // friend class OneHop; // DEPRECATED 124 148 friend class Chord; 125 149 friend class ariba::SideportListener; … … 128 152 129 153 public: 130 131 154 /** 132 155 * Constructs an empty non-functional base overlay instance … … 142 165 * Starts the Base Overlay instance 143 166 */ 144 void start(BaseCommunication &_basecomm, const NodeID& _nodeid);167 void start(BaseCommunication* _basecomm, const NodeID& _nodeid); 145 168 146 169 /** … … 161 184 * Starts a link establishment procedure to the specfied node 162 185 * for the service with id service 163 * 186 * 164 187 * @param node Destination node id 165 188 * @param service Service to connect to … … 179 202 void dropLink( const LinkID& link ); 180 203 204 205 206 /* +++++ Message sending +++++ */ 207 208 181 209 /// sends a message over an existing link 182 seqnum_t sendMessage(const Message* message, const LinkID& link ); 210 const SequenceNumber& sendMessage(reboost::message_t message, 211 const LinkID& link, 212 uint8_t priority ) throw(message_not_sent); 183 213 184 214 /// sends a message to a node and a specific service 185 seqnum_t sendMessage(const Message* message, const NodeID& remote, 186 const ServiceID& service = OverlayInterface::OVERLAY_SERVICE_ID); 215 const SequenceNumber& sendMessage(reboost::message_t message, 216 const NodeID& remote, 217 uint8_t priority, 218 const ServiceID& service = OverlayInterface::OVERLAY_SERVICE_ID); 219 187 220 188 221 /** … … 191 224 * @return NodeID of the (closest) destination node; 192 225 */ 193 NodeID sendMessageCloserToNodeID( const Message*message, const NodeID& address,194 const ServiceID& service = OverlayInterface::OVERLAY_SERVICE_ID);226 NodeID sendMessageCloserToNodeID(reboost::message_t message, const NodeID& address, 227 uint8_t priority, const ServiceID& service = OverlayInterface::OVERLAY_SERVICE_ID); 195 228 196 229 /** … … 198 231 * Depending on the structure of the overlay, this can be very different. 199 232 */ 200 void broadcastMessage(Message* message, const ServiceID& service); 201 233 void broadcastMessage(reboost::message_t message, const ServiceID& service, uint8_t priority); 234 235 236 /* +++++ [Message sending] +++++ */ 237 238 239 202 240 /** 203 241 * Returns the end-point descriptor of a link. … … 294 332 */ 295 333 void leaveSpoVNet(); 334 335 336 /* link status */ 337 bool isLinkDirect(const ariba::LinkID& lnk) const; 338 int getHopCount(const ariba::LinkID& lnk) const; 339 340 bool isLinkVital(const LinkDescriptor* link) const; 341 bool isLinkDirectVital(const LinkDescriptor* link) const; 296 342 297 343 protected: 298 /** 299 * @see ariba::communication::CommunicationEvents.h 300 */ 301 virtual void onLinkUp(const LinkID& id, const address_v* local, 302 const address_v* remote); 303 304 /** 305 * @see ariba::communication::CommunicationEvents.h 306 */ 307 virtual void onLinkDown(const LinkID& id, const address_v* local, 308 const address_v* remote); 309 310 /** 311 * @see ariba::communication::CommunicationEvents.h 312 */ 313 virtual void onLinkChanged(const LinkID& id, 314 const address_v* oldlocal, const address_v* newlocal, 315 const address_v* oldremote, const address_v* newremote); 316 317 /** 318 * @see ariba::communication::CommunicationEvents.h 319 */ 320 virtual void onLinkFail(const LinkID& id, const address_v* local, 321 const address_v* remote); 322 323 /** 324 * @see ariba::communication::CommunicationEvents.h 325 */ 326 virtual void onLinkQoSChanged(const LinkID& id, 327 const address_v* local, const address_v* remote, 328 const QoSParameterSet& qos); 329 330 /** 331 * @see ariba::communication::CommunicationEvents.h 332 */ 333 virtual bool onLinkRequest(const LinkID& id, const address_v* local, 334 const address_v* remote); 335 344 345 /** 346 * @see ariba::communication::CommunicationEvents.h 347 */ 348 virtual bool onLinkRequest(const LinkID& id, 349 const addressing2::EndpointPtr local, 350 const addressing2::EndpointPtr remote); 351 352 /** 353 * @see ariba::communication::CommunicationEvents.h 354 */ 355 virtual void onLinkUp(const LinkID& id, 356 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote); 357 358 /** 359 * @see ariba::communication::CommunicationEvents.h 360 */ 361 virtual void onLinkDown(const LinkID& id, 362 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote); 363 364 /** 365 * @see ariba::communication::CommunicationEvents.h 366 */ 367 virtual void onLinkChanged(const LinkID& id, 368 const addressing2::EndpointPtr oldlocal, const addressing2::EndpointPtr newlocal, 369 const addressing2::EndpointPtr oldremote, const addressing2::EndpointPtr newremote); 370 371 /** 372 * @see ariba::communication::CommunicationEvents.h 373 * 374 * NOTE: Just calls onLinkDown (at the moment..) 375 */ 376 virtual void onLinkFail(const LinkID& id, 377 const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote); 378 379 /** 380 * @see ariba::communication::CommunicationEvents.h 381 */ 382 // virtual void onLinkQoSChanged(const LinkID& id, 383 // const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote, 384 // const QoSParameterSet& qos); 385 386 387 388 336 389 /** 337 390 * Processes a received message from BaseCommunication … … 340 393 * the node the message came from! 341 394 */ 342 virtual bool receiveMessage( const Message* message, const LinkID& link, 343 const NodeID& ); 395 virtual bool receiveMessage( reboost::shared_buffer_t message, 396 const LinkID& link, 397 const NodeID&, 398 bool bypass_overlay ); 344 399 345 400 /** … … 359 414 std::string getLinkHTMLInfo(); 360 415 416 417 private: 418 /// NOTE: "id" is an Overlay-LinkID 419 void __onLinkEstablishmentFailed(const LinkID& id); 420 421 /// called from typeLinkClose-handler 422 void __removeDroppedLink(const LinkID& link); 423 361 424 private: 362 425 /// is the base overlay started yet … … 396 459 397 460 /// demultiplexes a incoming message with link descriptor 398 bool handleMessage( const Message*message, LinkDescriptor* ld,461 bool handleMessage( reboost::shared_buffer_t message, LinkDescriptor* ld, 399 462 const LinkID bcLink = LinkID::UNSPECIFIED ); 400 463 401 464 // handle data and signalling messages 402 bool handleData( OverlayMsg* msg, LinkDescriptor* ld ); 465 bool handleData( reboost::shared_buffer_t message, OverlayMsg* msg, LinkDescriptor* ld ); 466 bool handleLostMessage( reboost::shared_buffer_t message, OverlayMsg* msg ); 403 467 bool handleSignaling( OverlayMsg* msg, LinkDescriptor* ld ); 404 468 405 469 // handle join request / reply messages 406 bool handleJoinRequest( OverlayMsg* msg, const LinkID& bcLink );407 bool handleJoinReply( OverlayMsg* msg, const LinkID& bcLink );470 bool handleJoinRequest( reboost::shared_buffer_t message, const NodeID& source, const LinkID& bcLink ); 471 bool handleJoinReply( reboost::shared_buffer_t message, const LinkID& bcLink ); 408 472 409 473 // handle link messages 410 474 bool handleLinkRequest( OverlayMsg* msg, LinkDescriptor* ld ); 411 bool handleLinkReply( OverlayMsg* msg, LinkDescriptor* ld );475 bool handleLinkReply( OverlayMsg* msg, reboost::shared_buffer_t sub_message, LinkDescriptor* ld ); 412 476 bool handleLinkUpdate( OverlayMsg* msg, LinkDescriptor* ld ); 413 477 bool handleLinkDirect( OverlayMsg* msg, LinkDescriptor* ld ); 414 478 bool handleLinkAlive( OverlayMsg* msg, LinkDescriptor* ld ); 479 480 // ping-pong over overlaypath/routing 481 bool handlePing( OverlayMsg* overlayMsg, LinkDescriptor* ld ); 482 bool handlePong( OverlayMsg* overlayMsg, LinkDescriptor* ld ); 415 483 416 484 … … 478 546 // internal message delivery ----------------------------------------------- 479 547 548 // Convert OverlayMessage into new format and give it down to BaseCommunication 549 seqnum_t send_overlaymessage_down( OverlayMsg* message, const LinkID& bc_link, uint8_t priority ); 550 551 480 552 /// routes a message to its destination node 481 void route( OverlayMsg* message );482 553 void route( OverlayMsg* message, const NodeID& last_hop = NodeID::UNSPECIFIED ); 554 483 555 /// sends a raw message to another node, delivers it to the base overlay class 484 seqnum_t send( OverlayMsg* message, const NodeID& destination ); 556 /// may throw "message_not_sent"-exception 557 seqnum_t send( OverlayMsg* message, 558 const NodeID& destination, 559 uint8_t priority, 560 const NodeID& last_hop = NodeID::UNSPECIFIED ) 561 throw(message_not_sent); 485 562 486 563 /// send a raw message using a link descriptor, delivers it to the base overlay class 487 seqnum_t send( OverlayMsg* message, LinkDescriptor* ld, 488 bool ignore_down = false ); 564 seqnum_t send( OverlayMsg* message, 565 LinkDescriptor* ld, 566 uint8_t priority ) throw(message_not_sent); 489 567 490 568 /// send a message using a node id using overlay routing 491 569 /// sets necessary fields in the overlay message! 492 seqnum_t send_node( OverlayMsg* message, const NodeID& remote, 493 const ServiceID& service = OverlayInterface::OVERLAY_SERVICE_ID); 570 /// may throw "message_not_sent"-exception 571 seqnum_t send_node( OverlayMsg* message, const NodeID& remote, uint8_t priority, 572 const ServiceID& service = OverlayInterface::OVERLAY_SERVICE_ID) throw(message_not_sent); 494 573 495 574 /// send a message using a node id using overlay routing using a link 496 575 /// sets necessary fields in the overlay message! 497 seqnum_t send_link( OverlayMsg* message, const LinkID& link, 498 bool ignore_down = false ); 499 576 void send_link( OverlayMsg* message, 577 const LinkID& link, 578 uint8_t priority ) throw(message_not_sent); 579 580 581 /// sends a notification to a sender from whom we just dropped a message 582 void report_lost_message( const OverlayMsg* message ); 583 500 584 // misc -------------------------------------------------------------------- 501 585 -
source/ariba/overlay/CMakeLists.txt
r10700 r12060 41 41 LinkDescriptor.h 42 42 OverlayBootstrap.h 43 SequenceNumber.h 43 44 ) 44 45 … … 47 48 LinkDescriptor.cpp 48 49 OverlayBootstrap.cpp 50 SequenceNumber.cpp 49 51 ) 50 52 -
source/ariba/overlay/LinkDescriptor.h
r6961 r12060 12 12 #include "ariba/communication/EndpointDescriptor.h" 13 13 #include "ariba/CommunicationListener.h" 14 15 // reboost messages 16 #include "ariba/utility/transport/messages/message.hpp" 17 #include <ariba/utility/misc/sha1.h> 18 19 #include "ariba/overlay/SequenceNumber.h" 20 14 21 15 22 namespace ariba { … … 37 44 class LinkDescriptor { 38 45 public: 46 struct message_queue_entry 47 { 48 reboost::message_t message; 49 uint8_t priority; 50 }; 51 39 52 // ctor 40 53 LinkDescriptor() { 54 time_t now = time(NULL); 55 41 56 // default values 42 57 this->up = false; 58 // this->closing = false; 59 this->failed = false; 43 60 this->fromRemote = false; 44 61 this->remoteNode = NodeID::UNSPECIFIED; … … 46 63 this->communicationUp = false; 47 64 this->communicationId = LinkID::UNSPECIFIED; 48 this->keepAlive Time = time(NULL);49 this->keepAlive Missed = 0;65 this->keepAliveReceived = now; 66 this->keepAliveSent = now; 50 67 this->relaying = false; 51 this->timeRelaying = time(NULL);68 this->timeRelaying = now; 52 69 this->dropAfterRelaying = false; 53 70 this->service = ServiceID::UNSPECIFIED; … … 56 73 this->remoteLink = LinkID::UNSPECIFIED; 57 74 this->autolink = false; 58 this->lastuse = time(NULL);75 this->lastuse = now; 59 76 this->retryCounter = 0; 77 this->hops = -1; 78 79 this->transmit_seqnums = false; // XXX 60 80 } 61 81 … … 67 87 // general information about the link -------------------------------------- 68 88 bool up; ///< flag whether this link is up and running 89 // bool closing; ///< flag, whether this link is in the regular process of closing 90 bool failed; ///< flag, whether communication is (assumed to be) not/no longer possible on this link 69 91 bool fromRemote; ///< flag, whether this link was requested from remote 70 92 NodeID remoteNode; ///< remote end-point node 71 bool isVital() {72 return up && keepAliveMissed == 0;73 }74 bool isDirectVital() {75 return isVital() && communicationUp && !relayed;76 }77 93 78 94 … … 82 98 bool communicationUp; ///< flag, whether the communication is up 83 99 100 // sequence numbers -------------------------------------------------------- 101 SequenceNumber last_sent_seqnum; 102 bool transmit_seqnums; 103 84 104 // direct link retries ----------------------------------------------------- 85 105 EndpointDescriptor endpoint; … … 87 107 88 108 // link alive information -------------------------------------------------- 89 time_t keepAlive Time; ///< the last time a keep-alive message was received90 int keepAliveMissed; ///< the number of missed keep-alive messages109 time_t keepAliveReceived; ///< the last time a keep-alive message was received 110 time_t keepAliveSent; ///< the number of missed keep-alive messages 91 111 void setAlive() { 92 keepAliveMissed = 0;93 keepAlive Time= time(NULL);112 // keepAliveSent = time(NULL); 113 keepAliveReceived = time(NULL); 94 114 } 95 115 … … 98 118 LinkID remoteLink; ///< the remote link id 99 119 vector<NodeID> routeRecord; 120 int hops; 100 121 101 122 // relay state ------------------------------------------------------------- … … 114 135 // auto links -------------------------------------------------------------- 115 136 bool autolink; ///< flag, whether this link is a auto-link 116 time_t lastuse; ///< time, when the link was last used 117 deque< Message*> messageQueue; ///< waiting messages to be delivered137 time_t lastuse; ///< time, when the link was last used XXX AUTO_LINK-ONLY 138 deque<message_queue_entry> messageQueue; ///< waiting messages to be delivered 118 139 void setAutoUsed() { 119 140 if (autolink) lastuse = time(NULL); … … 121 142 /// drops waiting auto-link messages 122 143 void flushQueue() { 123 BOOST_FOREACH( Message* msg, messageQueue ) delete msg; 144 // BOOST_FOREACH( Message* msg, messageQueue ) delete msg; // XXX MARIO: shouldn't be necessary anymore, since we're using shared pointers 124 145 messageQueue.clear(); 125 146 } … … 127 148 // string representation --------------------------------------------------- 128 149 std::string to_string() const { 150 time_t now = time(NULL); 151 129 152 std::ostringstream s; 153 if ( relayed ) 154 s << "[RELAYED-"; 155 else 156 s << "[DIRECT-"; 157 s << "LINK] "; 158 s << "id=" << overlayId.toString().substr(0,4) << " "; 159 s << "serv=" << service.toString() << " "; 130 160 s << "up=" << up << " "; 131 161 s << "init=" << !fromRemote << " "; 132 s << "id=" << overlayId.toString().substr(0,4) << " ";133 s << "serv=" << service.toString() << " ";134 162 s << "node=" << remoteNode.toString().substr(0,4) << " "; 135 163 s << "relaying=" << relaying << " "; 136 s << " miss=" << keepAliveMissed << "";164 s << "last_received=" << now - keepAliveReceived << "s "; 137 165 s << "auto=" << autolink << " "; 166 s << "hops=" << hops << " "; 138 167 if ( relayed ) { 139 168 s << "| Relayed: "; … … 146 175 } else { 147 176 s << "| Direct: "; 148 s << "using id=" << communicationId.toString().substr(0,4) << " ";177 s << "using [COMMUNICATION-LINK] id=" << communicationId.toString().substr(0,4) << " "; 149 178 s << "(up=" << communicationUp << ") "; 150 179 } -
source/ariba/overlay/messages/JoinReply.cpp
r3690 r12060 44 44 vsznDefault(JoinReply); 45 45 46 JoinReply::JoinReply(const SpoVNetID _spovnetid, const OverlayParameterSet _param, bool _joinAllowed, const EndpointDescriptor _bootstrapEp) 47 : spovnetid( _spovnetid ), param( _param ), joinAllowed( _joinAllowed ), bootstrapEp( _bootstrapEp ){ 46 JoinReply::JoinReply(const SpoVNetID _spovnetid, const OverlayParameterSet _param, bool _joinAllowed) 47 : spovnetid( _spovnetid ), param( _param ), joinAllowed( _joinAllowed ) 48 { 48 49 } 49 50 … … 64 65 } 65 66 66 const EndpointDescriptor& JoinReply::getBootstrapEndpoint(){67 return bootstrapEp;68 }67 //const EndpointDescriptor& JoinReply::getBootstrapEndpoint(){ 68 // return bootstrapEp; 69 //} 69 70 70 71 }} // ariba::overlay -
source/ariba/overlay/messages/JoinReply.h
r5870 r12060 40 40 #define JOIN_REPLY_H__ 41 41 42 #include "ariba/utility/messages.h" 42 //#include "ariba/utility/messages.h" 43 #include "ariba/utility/messages/Message.h" 43 44 #include "ariba/utility/serialization.h" 44 45 #include "ariba/utility/types/SpoVNetID.h" 45 46 #include "ariba/utility/types/NodeID.h" 46 47 #include "ariba/utility/types/OverlayParameterSet.h" 47 #include "ariba/communication/EndpointDescriptor.h"48 //#include "ariba/communication/EndpointDescriptor.h" 48 49 49 50 using ariba::utility::OverlayParameterSet; … … 51 52 using ariba::utility::SpoVNetID; 52 53 using ariba::utility::NodeID; 53 using ariba::communication::EndpointDescriptor;54 //using ariba::communication::EndpointDescriptor; 54 55 55 56 namespace ariba { … … 64 65 OverlayParameterSet param; //< overlay parameters 65 66 bool joinAllowed; //< join successfull or access denied 66 EndpointDescriptor bootstrapEp; //< the endpoint for bootstrapping the overlay interface67 // EndpointDescriptor bootstrapEp; //< the endpoint for bootstrapping the overlay interface 67 68 68 69 public: … … 70 71 const SpoVNetID _spovnetid = SpoVNetID::UNSPECIFIED, 71 72 const OverlayParameterSet _param = OverlayParameterSet::DEFAULT, 72 bool _joinAllowed = false ,73 const EndpointDescriptor _bootstrapEp = EndpointDescriptor::UNSPECIFIED() 73 bool _joinAllowed = false /*, 74 const EndpointDescriptor _bootstrapEp = EndpointDescriptor::UNSPECIFIED()*/ 74 75 ); 75 76 … … 79 80 const OverlayParameterSet& getParam(); 80 81 bool getJoinAllowed(); 81 const EndpointDescriptor& getBootstrapEndpoint();82 // const EndpointDescriptor& getBootstrapEndpoint(); 82 83 }; 83 84 … … 86 87 sznBeginDefault( ariba::overlay::JoinReply, X ) { 87 88 uint8_t ja = joinAllowed; 88 X && &spovnetid && param && bootstrapEp && ja; 89 X && &spovnetid && param; 90 // X && bootstrapEp; 91 X && ja; 89 92 if (X.isDeserializer()) joinAllowed = ja; 90 93 } sznEnd(); -
source/ariba/overlay/messages/OverlayMsg.h
r10653 r12060 47 47 #include "ariba/utility/types/NodeID.h" 48 48 #include "ariba/utility/types/LinkID.h" 49 #include "ariba/communication/EndpointDescriptor.h" 50 49 // #include <ariba/utility/misc/sha1.h> 50 #include "ariba/overlay/SequenceNumber.h" 51 51 52 52 namespace ariba { … … 57 57 using ariba::utility::ServiceID; 58 58 using ariba::utility::Message; 59 using ariba::communication::EndpointDescriptor;59 //using ariba::communication::EndpointDescriptor; 60 60 using_serialization; 61 61 … … 64 64 * between nodes. 65 65 * 66 * @author Sebastian Mies <mies@tm.uka.de> 66 * @author Sebastian Mies <mies@tm.uka.de>, Mario Hock 67 67 */ 68 68 class OverlayMsg: public Message { VSERIALIZEABLE; … … 75 75 maskTransfer = 0x10, ///< bit mask for transfer messages 76 76 typeData = 0x11, ///< message contains data for higher layers 77 typeMessageLost = 0x12, ///< message contains info about a dropped message 77 78 78 79 // join signaling … … 87 88 typeLinkUpdate = 0x33, ///< update message for link association 88 89 typeLinkDirect = 0x34, ///< direct connection has been established 89 typeLinkAlive = 0x35, ///< keep-alive message 90 typeKeepAlive = 0x35, ///< keep-alive message 91 typeKeepAliveReply = 0x36, ///< keep-alive message (replay) 92 typeLinkClose = 0x37, 90 93 91 94 /// DHT routed messages … … 100 103 maskDHTResponse = 0x50, ///< bit mask for dht responses 101 104 typeDHTData = 0x51, ///< DHT get data 105 106 /// misc message types 107 typePing = 0x44, 108 typePong = 0x45, 102 109 103 110 // topology signaling … … 105 112 typeSignalingEnd = 0xFF ///< end of the signaling types 106 113 }; 114 115 /// message flags (uint8_t) 116 enum flags_ 117 { 118 flagRelayed = 1 << 0, 119 flagRegisterRelay = 1 << 1, 120 flagRouteRecord = 1 << 2, 121 flagSeqNum1 = 1 << 3, 122 flagSeqNum2 = 1 << 4, 123 flagAutoLink = 1 << 5, 124 flagLinkMessage = 1 << 6, 125 flagHasMoreFlags = 1 << 7 126 }; 107 127 108 128 /// default constructor … … 114 134 const LinkID& _sourceLink = LinkID::UNSPECIFIED, 115 135 const LinkID& _destinationLink = LinkID::UNSPECIFIED ) 116 : type(type), flags(0), hops(0), ttl(10),136 : type(type), flags(0), extended_flags(0), hops(0), ttl(10), priority(0), 117 137 service(_service), 118 138 sourceNode(_sourceNode), destinationNode(_destinationNode), … … 125 145 // copy constructor 126 146 OverlayMsg(const OverlayMsg& rhs) 127 : type(rhs.type), flags(rhs.flags), hops(rhs.hops), ttl(rhs.ttl), 128 service(rhs.service), 147 : type(rhs.type), flags(rhs.flags), extended_flags(rhs.extended_flags), 148 hops(rhs.hops), ttl(rhs.ttl), 149 priority(rhs.priority), service(rhs.service), 129 150 sourceNode(rhs.sourceNode), destinationNode(rhs.destinationNode), 130 151 sourceLink(rhs.sourceLink), destinationLink(rhs.destinationLink), … … 149 170 } 150 171 172 /// priority ------------------------------------------------------------------ 173 174 uint8_t getPriority() const { 175 return priority; 176 } 177 178 void setPriority(uint8_t priority) { 179 this->priority = priority; 180 } 181 151 182 /// flags ------------------------------------------------------------------ 152 183 153 184 bool isRelayed() const { 154 return (flags & 0x01)!=0;185 return (flags & flagRelayed)!=0; 155 186 } 156 187 157 188 void setRelayed( bool relayed = true ) { 158 if (relayed) flags |= 1; else flags &= ~1;189 if (relayed) flags |= flagRelayed; else flags &= ~flagRelayed; 159 190 } 160 191 161 192 bool isRegisterRelay() const { 162 return (flags & 0x02)!=0;193 return (flags & flagRegisterRelay)!=0; 163 194 } 164 195 165 196 void setRegisterRelay( bool relayed = true ) { 166 if (relayed) flags |= 0x02; else flags &= ~0x02;197 if (relayed) flags |= flagRegisterRelay; else flags &= ~flagRegisterRelay; 167 198 } 168 199 169 200 bool isRouteRecord() const { 170 return (flags & 0x04)!=0;201 return (flags & flagRouteRecord)!=0; 171 202 } 172 203 173 204 void setRouteRecord( bool route_record = true ) { 174 if (route_record) flags |= 0x04; else flags &= ~0x04;205 if (route_record) flags |= flagRouteRecord; else flags &= ~flagRouteRecord; 175 206 } 176 207 177 208 bool isAutoLink() const { 178 return (flags & 0x80) == 0x80;209 return (flags & flagAutoLink) == flagAutoLink; 179 210 } 180 211 181 212 void setAutoLink(bool auto_link = true ) { 182 if (auto_link) flags |= 0x80; else flags &= ~0x80;213 if (auto_link) flags |= flagAutoLink; else flags &= ~flagAutoLink; 183 214 } 184 215 185 216 bool isLinkMessage() const { 186 return (flags & 0x40)!=0;217 return (flags & flagLinkMessage)!=0; 187 218 } 188 219 189 220 void setLinkMessage(bool link_info = true ) { 190 if (link_info) flags |= 0x40; else flags &= ~0x40; 191 } 192 193 bool containsSourceEndpoint() const { 194 return (flags & 0x20)!=0; 195 } 196 197 void setContainsSourceEndpoint(bool contains_endpoint) { 198 if (contains_endpoint) flags |= 0x20; else flags &= ~0x20; 199 } 221 if (link_info) flags |= flagLinkMessage; else flags &= ~flagLinkMessage; 222 } 223 224 bool hasExtendedFlags() const { 225 return (flags & flagHasMoreFlags) == flagHasMoreFlags; 226 } 200 227 201 228 /// number of hops and time to live ---------------------------------------- … … 264 291 this->destinationLink = link; 265 292 setLinkMessage(); 266 }267 268 void setSourceEndpoint( const EndpointDescriptor& endpoint ) {269 sourceEndpoint = endpoint;270 setContainsSourceEndpoint(true);271 }272 273 const EndpointDescriptor& getSourceEndpoint() const {274 return sourceEndpoint;275 293 } 276 294 … … 284 302 destinationLink = dummyLink; 285 303 hops = 0; 304 routeRecord.clear(); 286 305 } 287 306 … … 294 313 routeRecord.push_back(node); 295 314 } 315 316 /// sequence numbers 317 bool hasShortSeqNum() const 318 { 319 return (flags & (flagSeqNum1 | flagSeqNum2)) == flagSeqNum1; 320 } 321 322 bool hasLongSeqNum() const 323 { 324 return (flags & (flagSeqNum1 | flagSeqNum2)) == flagSeqNum2; 325 } 326 327 void setSeqNum(const SequenceNumber& sequence_number) 328 { 329 this->seqnum = sequence_number; 330 331 // short seqnum 332 if ( sequence_number.isShortSeqNum() ) 333 { 334 flags |= flagSeqNum1; 335 flags &= ~flagSeqNum2; 336 } 337 // longseqnum 338 else if ( sequence_number.isShortSeqNum() ) 339 { 340 flags &= ~flagSeqNum1; 341 flags |= flagSeqNum2; 342 } 343 // no seqnum 344 else 345 { 346 flags &= ~flagSeqNum1; 347 flags &= ~flagSeqNum2; 348 } 349 } 350 351 const SequenceNumber& getSeqNum() const 352 { 353 return seqnum; 354 } 355 296 356 297 357 private: 298 uint8_t type, flags, hops, ttl;358 uint8_t type, flags, extended_flags, hops, ttl, priority; 299 359 ServiceID service; 300 360 NodeID sourceNode; … … 302 362 LinkID sourceLink; 303 363 LinkID destinationLink; 304 EndpointDescriptor sourceEndpoint;364 // EndpointDescriptor sourceEndpoint; 305 365 vector<NodeID> routeRecord; 366 SequenceNumber seqnum; 306 367 }; 307 368 … … 311 372 sznBeginDefault( ariba::overlay::OverlayMsg, X ){ 312 373 // header 313 X && type && flags && hops && ttl; 374 X && type && flags; 375 376 if ( hasExtendedFlags() ) 377 X && extended_flags; 378 379 X && hops && ttl; 314 380 315 381 // addresses 316 382 X && &service && &sourceNode && &destinationNode; 383 384 // priority 385 X && priority; 317 386 318 387 // message is associated with a end-to-end link … … 320 389 X && &sourceLink && &destinationLink; 321 390 322 // message is associated with a source end-point 323 if (containsSourceEndpoint()) 324 X && sourceEndpoint; 325 391 392 /* seqnum */ 393 // serialize 394 if ( X.isSerializer() ) 395 { 396 if ( hasShortSeqNum() ) 397 { 398 uint32_t short_seqnum; 399 short_seqnum = seqnum.getShortSeqNum(); 400 X && short_seqnum; 401 } 402 if ( hasLongSeqNum() ) 403 { 404 uint64_t long_seqnum; 405 long_seqnum = seqnum.getLongSeqNum(); 406 X && long_seqnum; 407 } 408 } 409 // deserialize 410 else 411 { 412 if ( hasShortSeqNum() ) 413 { 414 uint32_t short_seqnum; 415 X && short_seqnum; 416 seqnum = ariba::overlay::SequenceNumber(short_seqnum); 417 } 418 if ( hasLongSeqNum() ) 419 { 420 uint64_t long_seqnum; 421 X && long_seqnum; 422 seqnum = ariba::overlay::SequenceNumber(long_seqnum); 423 } 424 } 425 426 326 427 // message should record its route 327 428 if (isRouteRecord()) { … … 333 434 334 435 // payload 335 X && Payload();436 // X && Payload(); 336 437 } sznEnd(); 337 438 -
source/ariba/overlay/modules/OverlayFactory.cpp
r3718 r12060 41 41 // structured overlays 42 42 #include "chord/Chord.h" 43 #include "onehop/OneHop.h" 43 //#include "onehop/OneHop.h" //DEPRECATED 44 44 45 45 namespace ariba { … … 57 57 return new Chord( baseoverlay, nodeid, routeReceiver, param ); 58 58 59 case OverlayParameterSet::OverlayStructureOneHop:60 return new OneHop( baseoverlay, nodeid, routeReceiver, param );59 // case OverlayParameterSet::OverlayStructureOneHop: 60 // return new OneHop( baseoverlay, nodeid, routeReceiver, param ); 61 61 62 62 default: 63 // NEVER return "NULL" 64 assert(false); 65 throw 42; 63 66 return NULL; 64 67 } 65 68 69 // NEVER return "NULL" 70 assert(false); 71 throw 42; 66 72 return NULL; 67 73 } -
source/ariba/overlay/modules/OverlayInterface.cpp
r6854 r12060 68 68 69 69 void OverlayInterface::onLinkFail(const LinkID& lnk, const NodeID& remote) { 70 onLinkDown(lnk, remote); 70 71 } 71 72 … … 79 80 } 80 81 81 void OverlayInterface::onMessage(const DataMessage& msg, const NodeID& remote, 82 void OverlayInterface::onMessage(OverlayMsg* msg, 83 reboost::shared_buffer_t sub_msg, 84 const NodeID& remote, 82 85 const LinkID& lnk) { 83 86 } -
source/ariba/overlay/modules/OverlayInterface.h
r10573 r12060 145 145 */ 146 146 virtual const LinkID& getNextLinkId( const NodeID& id ) const = 0; 147 147 148 /** 149 * Returns link ids of possible next hops a route message could take, 150 * sorted by "quality" (e.g. overlay-distance). 151 * 152 * The »num« parameter can be used to specify the desired number of elements 153 * in the returned vector. This is intendet for optimizations. The 154 * implementation may choose to return a different number of elements than 155 * requested. 156 * 157 * NOTE: The returned vector may contain »unspecified« links. These refer to 158 * to the own node. (e.g. If there's no closer node, the top element in the 159 * returned vector is unsoecified.) 160 * 161 * @param id The destination node id 162 * @param num The desired number of elements in the returned vector. 163 * (0 means »not specified/max)« 164 * @return A sorted vector of link ids to possible next hops. 165 */ 166 virtual std::vector<const LinkID*> getSortedLinkIdsTowardsNode( 167 const NodeID& id, int num = 0 ) const = 0; 168 148 169 /** 149 170 * Returns the NodeID of the next hop a route message would take. … … 176 197 177 198 /// @see CommunicationListener 178 virtual void onMessage(const DataMessage& msg, const NodeID& remote, 199 virtual void onMessage(OverlayMsg* msg, 200 reboost::shared_buffer_t sub_msg, 201 const NodeID& remote, 179 202 const LinkID& lnk = LinkID::UNSPECIFIED); 180 203 -
source/ariba/overlay/modules/OverlayStructureEvents.h
r5151 r12060 41 41 42 42 #include "ariba/utility/types/NodeID.h" 43 #include "ariba/utility/messages.h" 43 #include "ariba/utility/types/LinkID.h" 44 #include "ariba/utility/messages/Message.h" 44 45 45 46 using ariba::utility::NodeID; 47 using ariba::utility::LinkID; 46 48 using ariba::utility::Message; 47 49 … … 50 52 51 53 class OverlayInterface; 52 class OneHop;54 //class OneHop; 53 55 54 56 class OverlayStructureEvents { 55 57 friend class ariba::overlay::OverlayInterface; 56 friend class ariba::overlay::OneHop;58 // friend class ariba::overlay::OneHop; 57 59 58 60 public: -
source/ariba/overlay/modules/chord/Chord.cpp
r10572 r12060 43 43 #include "detail/chord_routing_table.hpp" 44 44 45 #include "messages/Discovery.h" 45 //#include "messages/Discovery.h" // XXX DEPRECATED 46 46 47 47 namespace ariba { … … 55 55 typedef chord_routing_table::item route_item; 56 56 57 using ariba::transport::system_priority; 58 57 59 use_logging_cpp( Chord ); 60 61 62 ////// Messages 63 struct DiscoveryMessage 64 { 65 /** 66 * DiscoveryMessage 67 * - type 68 * - data 69 * - Endpoint 70 */ 71 72 // type enum 73 enum type_ { 74 invalid = 0, 75 normal = 1, 76 successor = 2, 77 predecessor = 3 78 }; 79 80 81 // data 82 uint8_t type; 83 uint8_t ttl; 84 EndpointDescriptor endpoint; 85 86 // serialize 87 reboost::message_t serialize() 88 { 89 // serialize endpoint 90 reboost::message_t msg = endpoint.serialize(); 91 92 // serialize type and ttl 93 uint8_t* buff1 = msg.push_front(2*sizeof(uint8_t)).mutable_data(); 94 buff1[0] = type; 95 buff1[1] = ttl; 96 97 return msg; 98 } 99 100 //deserialize 101 reboost::shared_buffer_t deserialize(reboost::shared_buffer_t buff) 102 { 103 // deserialize type and ttl 104 const uint8_t* bytes = buff.data(); 105 type = bytes[0]; 106 ttl = bytes[1]; 107 108 // deserialize endpoint 109 return endpoint.deserialize(buff(2*sizeof(uint8_t))); 110 } 111 }; 112 58 113 59 114 Chord::Chord(BaseOverlay& _baseoverlay, const NodeID& _nodeid, … … 102 157 103 158 /// helper: sends a message using the "base overlay" 104 seqnum_t Chord::send( OverlayMsg* msg, const LinkID& link ) { 105 if (link.isUnspecified()) return 0; 106 return baseoverlay.send_link( msg, link ); 159 void Chord::send( OverlayMsg* msg, const LinkID& link ) { 160 if (link.isUnspecified()) 161 return; 162 163 baseoverlay.send_link( msg, link, system_priority::OVERLAY ); 164 } 165 166 void Chord::send_node( OverlayMsg* message, const NodeID& remote ) 167 { 168 try 169 { 170 baseoverlay.send( message, remote, system_priority::OVERLAY ); 171 } 172 catch ( message_not_sent& e ) 173 { 174 logging_warn("Chord: Could not send message to " << remote 175 << ": " << e.what()); 176 } 107 177 } 108 178 … … 116 186 OverlayMsg msg( typeDiscovery ); 117 187 msg.setRegisterRelay(true); 118 Discovery dmsg( Discovery::normal, (uint8_t)ttl, baseoverlay.getEndpointDescriptor() ); 119 msg.encapsulate(&dmsg); 188 189 // create DiscoveryMessage 190 DiscoveryMessage dmsg; 191 dmsg.type = DiscoveryMessage::normal; 192 dmsg.ttl = ttl; 193 dmsg.endpoint = baseoverlay.getEndpointDescriptor(); 194 195 msg.set_payload_message(dmsg.serialize()); 120 196 121 197 // send to node 122 baseoverlay.send_node( &msg, remote ); 198 try 199 { 200 baseoverlay.send_node( &msg, remote, system_priority::OVERLAY ); 201 } 202 catch ( message_not_sent& e ) 203 { 204 logging_warn("Chord: Could not send message to " << remote 205 << ": " << e.what()); 206 } 123 207 } 124 208 125 209 void Chord::discover_neighbors( const LinkID& link ) { 126 210 uint8_t ttl = 1; 211 212 // FIXME try-catch for the send operations 213 214 // create DiscoveryMessage 215 DiscoveryMessage dmsg; 216 dmsg.ttl = ttl; 217 dmsg.endpoint = baseoverlay.getEndpointDescriptor(); 127 218 { 128 219 // send predecessor discovery 129 220 OverlayMsg msg( typeDiscovery ); 130 221 msg.setRegisterRelay(true); 131 Discovery dmsg( Discovery::predecessor, ttl, 132 baseoverlay.getEndpointDescriptor() ); 133 msg.encapsulate(&dmsg); 222 223 // set type 224 dmsg.type = DiscoveryMessage::predecessor; 225 226 // send 227 msg.set_payload_message(dmsg.serialize()); 134 228 send(&msg, link); 135 229 } … … 137 231 // send successor discovery 138 232 OverlayMsg msg( typeDiscovery ); 139 msg.setSourceEndpoint( baseoverlay.getEndpointDescriptor() ); 233 // msg.setSourceEndpoint( baseoverlay.getEndpointDescriptor() ); // XXX this was redundand, wasn't it? 140 234 msg.setRegisterRelay(true); 141 Discovery dmsg( Discovery::successor, ttl, 142 baseoverlay.getEndpointDescriptor() ); 143 msg.encapsulate(&dmsg); 235 236 // set type 237 dmsg.type = DiscoveryMessage::successor; 238 239 // send 240 msg.set_payload_message(dmsg.serialize()); 144 241 send(&msg, link); 145 242 } … … 163 260 164 261 // timer for stabilization management 165 Timer::setInterval(1000); 262 // Timer::setInterval(1000); // TODO find an appropriate interval! 263 Timer::setInterval(10000); // XXX testing... 166 264 Timer::start(); 167 265 } … … 200 298 return item->info; 201 299 } 300 301 std::vector<const LinkID*> Chord::getSortedLinkIdsTowardsNode( 302 const NodeID& id, int num ) const 303 { 304 std::vector<const LinkID*> ret; 305 306 switch ( num ) 307 { 308 // special case: just call »getNextLinkId« 309 case 1: 310 { 311 ret.push_back(&getNextLinkId(id)); 312 313 break; 314 } 315 316 // * calculate top 2 * 317 case 0: 318 case 2: 319 { 320 std::vector<const route_item*> items = table->get_next_2_hops(id); 321 322 ret.reserve(items.size()); 323 324 BOOST_FOREACH( const route_item* item, items ) 325 { 326 ret.push_back(&item->info); 327 } 328 329 break; 330 } 331 332 // NOTE: implement real sorting, if needed (and handle "case 0" properly, then) 333 default: 334 { 335 throw std::runtime_error("Not implemented. (Chord::getSortedLinkIdsTowardsNode with num != 2)"); 336 337 break; 338 } 339 } 340 341 return ret; 342 } 343 202 344 203 345 /// @see OverlayInterface.h … … 253 395 if (remote==nodeid) { 254 396 logging_warn("dropping link that has been established to myself (nodes have same nodeid?)"); 397 logging_warn("NodeID: " << remote); 255 398 baseoverlay.dropLink(lnk); 256 399 return; … … 290 433 /// @see CommunicationListener.h or @see OverlayInterface.h 291 434 void Chord::onLinkDown(const LinkID& lnk, const NodeID& remote) { 292 logging_debug("link_down: link=" << lnk.toString() << " remote=" << 435 // XXX logging_debug 436 logging_info("link_down (Chord): link=" << lnk.toString() << " remote=" << 293 437 remote.toString() ); 294 438 … … 303 447 /// @see CommunicationListener.h 304 448 /// @see OverlayInterface.h 305 void Chord::onMessage(const DataMessage& msg, const NodeID& remote, 449 void Chord::onMessage(OverlayMsg* msg, 450 reboost::shared_buffer_t sub_msg, 451 const NodeID& remote, 306 452 const LinkID& link) { 307 453 308 // decode message309 OverlayMsg* m = dynamic_cast<OverlayMsg*>(msg.getMessage());310 if (m == NULL) return;311 312 454 // handle messages 313 switch ((signalMessageTypes) m->getType()) {455 switch ((signalMessageTypes) msg->getType()) { 314 456 315 457 // discovery request 316 case typeDiscovery: { 317 // decapsulate message 318 Discovery* dmsg = m->decapsulate<Discovery> (); 458 case typeDiscovery: 459 { 460 // deserialize discovery message 461 DiscoveryMessage dmsg; 462 dmsg.deserialize(sub_msg); 463 319 464 logging_debug("Received discovery message with" 320 << " src=" << m ->getSourceNode().toString()321 << " dst=" << m ->getDestinationNode().toString()322 << " ttl=" << (int)dmsg ->getTTL()323 << " type=" << (int)dmsg ->getType()465 << " src=" << msg->getSourceNode().toString() 466 << " dst=" << msg->getDestinationNode().toString() 467 << " ttl=" << (int)dmsg.ttl 468 << " type=" << (int)dmsg.type 324 469 ); 325 470 … … 327 472 bool found = false; 328 473 BOOST_FOREACH( NodeID& value, discovery ) 329 if (value == m ->getSourceNode()) {474 if (value == msg->getSourceNode()) { 330 475 found = true; 331 476 break; 332 477 } 333 if (!found) discovery.push_back(m ->getSourceNode());478 if (!found) discovery.push_back(msg->getSourceNode()); 334 479 335 480 // check if source node can be added to routing table and setup link 336 if (m ->getSourceNode() != nodeid)337 setup( dmsg ->getEndpoint(), m->getSourceNode() );481 if (msg->getSourceNode() != nodeid) 482 setup( dmsg.endpoint, msg->getSourceNode() ); 338 483 339 484 // process discovery message -------------------------- switch start -- 340 switch (dmsg->getType()) { 341 342 // normal: route discovery message like every other message 343 case Discovery::normal: { 344 // closest node? yes-> split to follow successor and predecessor 345 if ( table->is_closest_to(m->getDestinationNode()) ) { 346 logging_debug("Discovery split:"); 347 if (!table->get_successor()->isUnspecified()) { 348 OverlayMsg omsg(*m); 349 dmsg->setType(Discovery::successor); 350 omsg.encapsulate(dmsg); 351 logging_debug("* Routing to successor " 352 << table->get_successor()->toString() ); 353 baseoverlay.send( &omsg, *table->get_successor() ); 354 } 355 356 // send predecessor message 357 if (!table->get_predesessor()->isUnspecified()) { 358 OverlayMsg omsg(*m); 359 dmsg->setType(Discovery::predecessor); 360 omsg.encapsulate(dmsg); 361 logging_debug("* Routing to predecessor " 362 << table->get_predesessor()->toString() ); 363 baseoverlay.send( &omsg, *table->get_predesessor() ); 364 } 365 } 366 // no-> route message 367 else { 368 baseoverlay.route( m ); 369 } 370 break; 371 } 372 373 // successor mode: follow the successor until TTL is zero 374 case Discovery::successor: 375 case Discovery::predecessor: { 376 // reached destination? no->forward! 377 if (m->getDestinationNode() != nodeid) { 378 OverlayMsg omsg(*m); 379 omsg.encapsulate(dmsg); 380 omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID); 381 baseoverlay.route( &omsg ); 382 break; 383 } 384 385 // time to live ended? yes-> stop routing 386 if (dmsg->getTTL() == 0 || dmsg->getTTL() > 10) break; 387 388 // decrease time-to-live 389 dmsg->setTTL(dmsg->getTTL() - 1); 390 391 const route_item* item = NULL; 392 if (dmsg->getType() == Discovery::successor && 393 table->get_successor() != NULL) { 394 item = table->get(*table->get_successor()); 395 } else { 396 if (table->get_predesessor()!=NULL) 397 item = table->get(*table->get_predesessor()); 398 } 399 if (item == NULL) 400 break; 401 402 logging_debug("Routing discovery message to succ/pred " 403 << item->id.toString() ); 404 OverlayMsg omsg(*m); 405 omsg.encapsulate(dmsg); 406 omsg.setDestinationNode(item->id); 407 omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID); 408 baseoverlay.send(&omsg, omsg.getDestinationNode()); 409 break; 410 } 411 case Discovery::invalid: 412 break; 413 414 default: 415 break; 416 } 417 // process discovery message ---------------------------- switch end -- 418 419 delete dmsg; 420 break; 421 } 422 423 // leave 424 case typeLeave: { 425 if (link!=LinkID::UNSPECIFIED) { 426 route_item* item = table->get(remote); 427 if (item!=NULL) item->info = LinkID::UNSPECIFIED; 428 table->remove(remote); 429 baseoverlay.dropLink(link); 430 } 431 break; 432 }} 485 switch ( dmsg.type ) 486 { 487 // normal: route discovery message like every other message 488 case DiscoveryMessage::normal: 489 { 490 // closest node? yes-> split to follow successor and predecessor 491 if ( table->is_closest_to(msg->getDestinationNode()) ) 492 { 493 logging_debug("Discovery split:"); 494 if (!table->get_successor()->isUnspecified()) 495 { 496 OverlayMsg omsg(*msg); 497 498 dmsg.type = DiscoveryMessage::successor; 499 omsg.set_payload_message(dmsg.serialize()); 500 501 logging_debug("* Routing to successor " 502 << table->get_successor()->toString() ); 503 send_node( &omsg, *table->get_successor() ); 504 } 505 506 // send predecessor message 507 if (!table->get_predesessor()->isUnspecified()) 508 { 509 OverlayMsg omsg(*msg); 510 511 dmsg.type = DiscoveryMessage::predecessor; 512 omsg.set_payload_message(dmsg.serialize()); 513 514 logging_debug("* Routing to predecessor " 515 << table->get_predesessor()->toString() ); 516 send_node( &omsg, *table->get_predesessor() ); 517 } 518 } 519 // no-> route message 520 else 521 { 522 baseoverlay.route( msg ); 523 } 524 break; 525 } 526 527 // successor mode: follow the successor until TTL is zero 528 case DiscoveryMessage::successor: 529 case DiscoveryMessage::predecessor: 530 { 531 // reached destination? no->forward! 532 if (msg->getDestinationNode() != nodeid) 533 { 534 OverlayMsg omsg(*msg); 535 omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID); 536 537 omsg.set_payload_message(dmsg.serialize()); 538 539 baseoverlay.route( &omsg ); 540 break; 541 } 542 543 // time to live ended? yes-> stop routing 544 if (dmsg.ttl == 0 || dmsg.ttl > 10) break; 545 546 // decrease time-to-live 547 dmsg.ttl--; 548 549 const route_item* item = NULL; 550 if (dmsg.type == DiscoveryMessage::successor && 551 table->get_successor() != NULL) 552 { 553 item = table->get(*table->get_successor()); 554 } 555 else if (table->get_predesessor() != NULL) 556 { 557 item = table->get(*table->get_predesessor()); 558 } 559 if (item == NULL) 560 break; 561 562 logging_debug("Routing discovery message to succ/pred " 563 << item->id.toString() ); 564 OverlayMsg omsg(*msg); 565 omsg.setService(OverlayInterface::OVERLAY_SERVICE_ID); 566 omsg.setDestinationNode(item->id); 567 568 omsg.set_payload_message(dmsg.serialize()); 569 570 send_node( &omsg, omsg.getDestinationNode() ); 571 break; 572 } 573 case DiscoveryMessage::invalid: 574 break; 575 576 default: 577 break; 578 } 579 // process discovery message ---------------------------- switch end -- 580 581 break; 582 } 583 584 // leave 585 case typeLeave: { 586 if (link!=LinkID::UNSPECIFIED) { 587 route_item* item = table->get(remote); 588 if (item!=NULL) item->info = LinkID::UNSPECIFIED; 589 table->remove(remote); 590 baseoverlay.dropLink(link); 591 } 592 break; 593 } 594 } 433 595 } 434 596 -
source/ariba/overlay/modules/chord/Chord.h
r10572 r12060 45 45 #include "../OverlayInterface.h" 46 46 #include <vector> 47 #include <stdexcept> 47 48 48 49 class chord_routing_table; … … 87 88 const NodeID& node = NodeID::UNSPECIFIED ); 88 89 89 // helper: sends a message using the "base overlay"90 seqnum_tsend( OverlayMsg* msg, const LinkID& link );90 // helper: sends a message over a link using the "base overlay" 91 void send( OverlayMsg* msg, const LinkID& link ); 91 92 93 // helper: sends a message to a node using the "base overlay" 94 void send_node( OverlayMsg* message, const NodeID& remote ); 95 92 96 // stabilization: sends a discovery message to the specified neighborhood 93 97 void send_discovery_to( const NodeID& destination, int ttl = 3 ); … … 105 109 virtual const LinkID& getNextLinkId( const NodeID& id ) const; 106 110 111 /// @see OverlayInterface.h 112 /// NOTE: This implementation excepts num == 2 113 virtual std::vector<const LinkID*> getSortedLinkIdsTowardsNode( 114 const NodeID& id, int num = 0 ) const; 115 107 116 /// @see OverlayInterface.h 108 117 virtual const NodeID& getNextNodeId( const NodeID& id ) const; … … 138 147 139 148 /// @see CommunicationListener.h or @see OverlayInterface.h 140 virtual void onMessage(const DataMessage& msg, const NodeID& remote, 149 virtual void onMessage(OverlayMsg* msg, 150 reboost::shared_buffer_t sub_msg, 151 const NodeID& remote, 141 152 const LinkID& lnk = LinkID::UNSPECIFIED); 142 153 -
source/ariba/overlay/modules/chord/detail/chord_routing_table.hpp
r8606 r12060 96 96 // the own node id 97 97 nodeid_t id; 98 99 // the own node id as routing item 100 item own_id_item; 98 101 99 102 // successor and predecessor tables … … 168 171 /// constructs the reactive chord routing table 169 172 explicit chord_routing_table( const nodeid_t& id, int redundancy = 4 ) : 170 id(id), succ( redundancy, succ_compare_type(this->id), *this ), 171 pred( redundancy, pred_compare_type(this->id), *this ) { 173 id(id), 174 succ( redundancy, succ_compare_type(this->id), *this ), 175 pred( redundancy, pred_compare_type(this->id), *this ) 176 { 177 // init reflexive item 178 own_id_item.id = id; 179 own_id_item.ref_count = 1; 172 180 173 181 // create finger tables … … 273 281 } 274 282 } 283 275 284 if (best_item != NULL && distance(value, id)<distance(value, best_item->id)) 276 285 return NULL; 277 286 return best_item; 278 287 } 288 289 std::vector<const item*> get_next_2_hops( const nodeid_t& value) 290 { 291 ring_distance distance; 292 item* best_item = &own_id_item; 293 item* second_best_item = NULL; 294 295 // find best and second best item 296 for (size_t i=0; i<table.size(); i++) 297 { 298 item* curr = &table[i]; 299 300 // not not include orphans into routing! 301 if (curr->ref_count==0) continue; 302 303 // check if we found a better item 304 // is best item 305 if ( distance(value, curr->id) < distance(value, best_item->id) ) 306 { 307 second_best_item = best_item; 308 best_item = curr; 309 } 310 // is second best item 311 else 312 { 313 if ( second_best_item == NULL ) 314 { 315 second_best_item = curr; 316 continue; 317 } 318 319 if ( distance(value, curr->id) < distance(value, second_best_item->id) ) 320 { 321 second_best_item = curr; 322 } 323 } 324 } 325 326 // prepare return vector 327 std::vector<const item*> ret; 328 if ( best_item != NULL ) 329 { 330 ret.push_back(best_item); 331 } 332 if ( second_best_item != NULL ) 333 { 334 ret.push_back(second_best_item); 335 } 336 337 return ret; 338 } 279 339 280 340 const nodeid_t* get_successor() { 281 if (succ.size()== NULL) return NULL;341 if (succ.size()==0) return NULL; 282 342 return &succ.front(); 283 343 } 284 344 285 345 const nodeid_t* get_predesessor() { 286 if (pred.size()== NULL) return NULL;346 if (pred.size()==0) return NULL; 287 347 return &pred.front(); 288 348 } -
source/ariba/overlay/modules/chord/messages/CMakeLists.txt
r10700 r12060 37 37 # [License] 38 38 39 add_headers(Discovery.h)39 #add_headers(Discovery.h) 40 40 41 add_sources(Discovery.cpp)41 #add_sources(Discovery.cpp) -
source/ariba/overlay/modules/chord/messages/Discovery.h
r5870 r12060 59 59 using_serialization; 60 60 61 // XXX This whole message is DEPRECATED 61 62 class Discovery : public Message { 62 63 VSERIALIZEABLE; -
source/ariba/overlay/modules/onehop/CMakeLists.txt
r10700 r12060 37 37 # [License] 38 38 39 add_headers(OneHop.h)40 39 41 add_sources(OneHop.cpp) 40 ## DEPRECATED 41 #add_headers(OneHop.h) 42 42 43 add_subdir_sources(messages) 43 #add_sources(OneHop.cpp) 44 45 #add_subdir_sources(messages) -
source/ariba/utility/CMakeLists.txt
r10700 r12060 45 45 add_subdir_sources( 46 46 addressing 47 addressing2 47 48 bootstrap 48 49 configuration 49 50 internal 50 51 logging 51 measurement52 52 messages 53 53 misc -
source/ariba/utility/addressing
- Property svn:mergeinfo changed (with no actual effect on merging)
-
source/ariba/utility/addressing/endpoint_set.hpp
r10653 r12060 1 #ifndef ENDPOINT_SET_HPP_ 2 #define ENDPOINT_SET_HPP_ 1 #ifndef ENDPOINT_SET_HPP_DEPRECATED_ 2 #define ENDPOINT_SET_HPP_DEPRECATED_ 3 3 4 4 #include "addressing.hpp" -
source/ariba/utility/addressing/facades/address_v.hpp
r10789 r12060 13 13 14 14 #include "../detail/address_convenience.hpp" 15 16 #include <boost/shared_ptr.hpp> 15 17 16 18 namespace ariba { … … 26 28 class address_v: public detail::address_convenience<address_v> { 27 29 public: 30 typedef boost::shared_ptr<address_v> shared_ptr; 31 28 32 virtual ~address_v() {} 29 33 -
source/ariba/utility/addressing/ip_address.hpp
r6919 r12060 1 #ifndef IP_ADDRESS_HPP_ 2 #define IP_ADDRESS_HPP_ 1 #ifndef IP_ADDRESS_HPP_DEPRECATED_ 2 #define IP_ADDRESS_HPP_DEPRECATED_ 3 3 4 4 #include <string> -
source/ariba/utility/addressing/tcpip_endpoint.hpp
r5284 r12060 1 #ifndef TCPIP_ENDPOINT_HPP_ 2 #define TCPIP_ENDPOINT_HPP_ 1 #ifndef TCPIP_ENDPOINT_HPP_DEPRECATED_ 2 #define TCPIP_ENDPOINT_HPP_DEPRECATED_ 3 3 4 4 #include<string> -
source/ariba/utility/bootstrap/modules/bluetoothsdp
- Property svn:mergeinfo changed (with no actual effect on merging)
-
source/ariba/utility/bootstrap/modules/periodicbroadcast
- Property svn:mergeinfo changed (with no actual effect on merging)
-
source/ariba/utility/bootstrap/modules/periodicbroadcast/PeriodicBroadcast.h
r10653 r12060 40 40 #define __PERIODIC_BROADCAST_H 41 41 42 // ariba 42 43 #include "ariba/config.h" 43 44 #include "ariba/utility/bootstrap/modules/BootstrapModule.h" 45 #include "ariba/utility/logging/Logging.h" 46 #include "ariba/utility/system/Timer.h" 47 48 // ariba messages 49 #include "PeriodicBroadcastMessage.h" 50 51 // (ariba) link-local 52 #include "ariba/utility/transport/StreamTransport/StreamTransport.hpp" 53 54 // system 44 55 #include <map> 45 56 #include <string> 46 57 #include <ctime> 47 58 #include <iostream> 59 60 // boost 48 61 #include <boost/asio.hpp> 49 62 #include <boost/foreach.hpp> 50 63 #include <boost/thread/mutex.hpp> 51 64 #include <boost/thread/thread.hpp> 52 #include "ariba/utility/bootstrap/modules/BootstrapModule.h" 53 #include "ariba/utility/logging/Logging.h" 54 #include "ariba/utility/system/Timer.h" 55 #include "PeriodicBroadcastMessage.h" 56 57 //link-local 58 #include "ariba/utility/transport/tcpip/tcpip.hpp" 65 59 66 60 67 using std::map; … … 301 308 302 309 // include all link-local interfaces 303 vector<uint64_t> scope_ids = ariba::transport:: tcpip::get_interface_scope_ids();310 vector<uint64_t> scope_ids = ariba::transport::get_interface_scope_ids(); 304 311 305 312 BOOST_FOREACH ( uint64_t id, scope_ids ) -
source/ariba/utility/logging/Logging.h
r10700 r12060 97 97 98 98 static int __loglevel__ = 2; //default is info 99 // static int __loglevel__ = 1; // XXX use higher log level 99 100 100 101 #define logging_trace(x) { logging_stdout(x); } -
source/ariba/utility/messages.h
r3690 r12060 40 40 #define MESSAGES_H_ 41 41 42 // TODO wÃŒrde sagen das brauchen wir nicht mehr 43 // ---> ÃŒberall dieses include rausnehmen und ggf #include "messages/Message.h" einfÃŒgen.. 44 42 45 #include "messages/Message.h" 43 #include "messages/MessageSender.h" 44 #include "messages/MessageReceiver.h"45 #include "messages/MessageUtilities.h"46 #include "messages/MessageProvider.h" 47 #include "messages/TextMessage.h"46 //#include "messages/MessageSender.h" // TODO wird das noch genutzt..? Wenn nein, sollte es weg! 47 //#include "messages/MessageReceiver.h" 48 //#include "messages/MessageUtilities.h" 49 ////#include "messages/MessageProvider.h" // DEPRECATED 50 //#include "messages/TextMessage.h" 48 51 49 52 #endif /* MESSAGES_H_ */ -
source/ariba/utility/messages/Message.cpp
r8620 r12060 41 41 #include "ariba/utility/serialization/DataStream.hpp" 42 42 43 #include "ariba/utility/logging/Logging.h" 44 43 45 NAMESPACE_BEGIN 44 46 … … 80 82 } 81 83 84 85 reboost::message_t Message::wrap_up_for_sending() 86 { 87 assert( ! wrapped_up ); 88 wrapped_up = true; 89 90 //// Adapt to new message system //// 91 Data data = data_serialize(this, DEFAULT_V); 92 reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8); 93 94 newstyle_payload.push_front(buf); 95 96 return newstyle_payload; 97 } 98 99 reboost::shared_buffer_t Message::serialize_into_shared_buffer() 100 { 101 assert ( newstyle_payload.length() == 0 ); 102 103 //// Adapt to new message system //// 104 Data data = data_serialize(this, DEFAULT_V); 105 reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8); 106 107 return buf; 108 } 109 110 111 reboost::shared_buffer_t Message::deserialize_from_shared_buffer(reboost::shared_buffer_t buff) 112 { 113 // NOTE: legacy payload is not allowed when using shared buffers 114 this->legacy_payload_disabled = true; 115 116 assert( buff.size() > 0 ); 117 118 // const_cast is necessary here, but without legacy payload we should be save here (more or less) 119 Data dat(const_cast<uint8_t*>(buff.data()), buff.size() * 8); 120 121 size_t len = this->SERIALIZATION_METHOD_NAME(DESERIALIZE, dat) / 8; 122 123 // return remaining sub-buffer 124 return buff(len); 125 } 126 127 128 82 129 NAMESPACE_END 83 130 84 131 std::ostream& operator<<(std::ostream& stream, const ariba::utility::Message& msg ) { 85 132 using_serialization; 86 stream << "msg(type=" << typeid(msg).name() ;133 stream << "msg(type=" << typeid(msg).name() << ","; 87 134 stream << "len=" << (data_length(&msg)/8) << ","; 88 135 Data data = data_serialize(&msg); 89 stream << " ,data=" << data;136 stream << "data=" << data; 90 137 data.release(); 91 138 stream << ")"; 92 139 return stream; 93 140 } 94 -
source/ariba/utility/messages/Message.h
r6919 r12060 62 62 #include "ariba/utility/serialization.h" 63 63 64 // reboost messages 65 #include "ariba/utility/transport/messages/message.hpp" 66 67 64 68 std::ostream& operator<<(std::ostream& stream, const ariba::utility::Message& msg ); 65 69 … … 82 86 friend std::ostream& ::operator<<(std::ostream& stream, const ariba::utility::Message& msg ); 83 87 84 // root binary data85 shared_array<uint8_t> root;86 87 88 // payload 89 bool legacy_payload_disabled; 88 90 bool releasePayload; 89 91 Data payload; //< messages binary data 92 93 // XXX testing... 94 reboost::message_t newstyle_payload; 95 bool wrapped_up; 90 96 91 97 // addresses and control info … … 98 104 */ 99 105 inline Message() : 100 root(), releasePayload(true), payload(), srcAddr(NULL),destAddr(NULL) { 106 legacy_payload_disabled(false), releasePayload(true), payload(), 107 newstyle_payload(), wrapped_up(false), srcAddr(NULL),destAddr(NULL) { 101 108 } 102 109 … … 106 113 */ 107 114 explicit inline Message( const Data& data ) : 108 releasePayload(true), srcAddr(NULL),destAddr(NULL) { 115 legacy_payload_disabled(false), releasePayload(true), 116 newstyle_payload(), wrapped_up(false), srcAddr(NULL),destAddr(NULL) { // FIXME newstyle_payload..? 109 117 this->payload = data.clone(); 110 118 // this->root = shared_array<uint8_t>((uint8_t*)data.getBuffer()); … … 225 233 return decapsulate<T>(); 226 234 } 235 236 237 // XXX testing 238 void set_payload_message(reboost::message_t msg) 239 { 240 newstyle_payload = msg; 241 } 242 243 void append_buffer(reboost::shared_buffer_t buff) 244 { 245 newstyle_payload.push_back(buff); 246 } 247 248 249 // XXX testing... packs this message into the payload message (do not use twice!!) 250 virtual reboost::message_t wrap_up_for_sending(); 251 252 253 /** 254 * Uses the old serialization system to serialize itself into a (new style) shared buffer. 255 */ 256 virtual reboost::shared_buffer_t serialize_into_shared_buffer(); 257 258 /* 259 * XXX experimental 260 * 261 * Uses the old serialization system to deserialize itself out of a (new style) shared buffer. 262 * @return remaining sub-buffer (the "payload") 263 * 264 * Note: This is some kind of a hack! handle with care. 265 */ 266 virtual reboost::shared_buffer_t deserialize_from_shared_buffer(reboost::shared_buffer_t buff); 267 227 268 228 269 protected: … … 262 303 * @return A explicit payload serializer 263 304 */ 264 finline PayloadSerializer Payload( size_t length = ~0 ) { 305 finline PayloadSerializer Payload( size_t length = ~0 ) 306 { 307 // assert( ! legacy_payload_disabled ); // FIXME aktuell 308 265 309 return PayloadSerializer( this, length ); 266 310 } -
source/ariba/utility/messages/MessageProvider.cpp
r6919 r12060 37 37 // [License] 38 38 39 #include "MessageProvider.h"40 41 NAMESPACE_BEGIN42 43 MessageProvider::MessageProvider() {44 }45 46 MessageProvider::~MessageProvider() {47 }48 49 bool MessageProvider::sendMessageToReceivers( const Message* message ) {50 bool sent = false;51 for (size_t i=0; i<receivers.size(); i++)52 if (receivers[i]->receiveMessage(message, LinkID::UNSPECIFIED, NodeID::UNSPECIFIED)) sent = true;53 return sent;54 }55 56 void MessageProvider::addMessageReceiver( MessageReceiver* receiver ) {57 receivers.push_back(receiver);58 }59 60 void MessageProvider::removeMessageReceiver( MessageReceiver* receiver ) {61 for (size_t i=0; i<receivers.size(); i++)62 if (receivers[i]==receiver) {63 receivers.erase( receivers.begin()+i );64 break;65 }66 }67 68 NAMESPACE_END39 //#include "MessageProvider.h" 40 // 41 //NAMESPACE_BEGIN 42 // 43 //MessageProvider::MessageProvider() { 44 //} 45 // 46 //MessageProvider::~MessageProvider() { 47 //} 48 // 49 //bool MessageProvider::sendMessageToReceivers( const Message* message ) { 50 // bool sent = false; 51 // for (size_t i=0; i<receivers.size(); i++) 52 // if (receivers[i]->receiveMessage(message, LinkID::UNSPECIFIED, NodeID::UNSPECIFIED)) sent = true; 53 // return sent; 54 //} 55 // 56 //void MessageProvider::addMessageReceiver( MessageReceiver* receiver ) { 57 // receivers.push_back(receiver); 58 //} 59 // 60 //void MessageProvider::removeMessageReceiver( MessageReceiver* receiver ) { 61 // for (size_t i=0; i<receivers.size(); i++) 62 // if (receivers[i]==receiver) { 63 // receivers.erase( receivers.begin()+i ); 64 // break; 65 // } 66 //} 67 // 68 //NAMESPACE_END -
source/ariba/utility/messages/MessageProvider.h
r3690 r12060 1 // XXX DEPRECATED 2 1 3 // [License] 2 4 // The Ariba-Underlay Copyright … … 37 39 // [License] 38 40 41 // XXX DEPRECATED 42 39 43 #ifndef MESSAGEPROVIDER_H_ 40 44 #define MESSAGEPROVIDER_H_ 41 45 42 #include "_namespace.h"43 #include "MessageReceiver.h"44 #include "ariba/utility/types/LinkID.h"45 #include "ariba/utility/types/NodeID.h"46 #include <vector>47 48 using std::vector;49 using ariba::utility::LinkID;50 using ariba::utility::NodeID;51 52 NAMESPACE_BEGIN53 54 55 / **56 * This class defines an interface for message providers.57 * Implementing classes must allow receivers to register themselves.58 *59 * @author Sebastian Mies60 */61 class MessageProvider {62 private:63 vector<MessageReceiver*> receivers;64 65 protected:66 bool sendMessageToReceivers( const Message* message );67 68 public:69 /**70 * Constructor.71 */72 MessageProvider();73 74 /**75 * Destructor.76 */77 ~MessageProvider();78 79 /**80 * Adds a message receiver.81 *82 * @param receiver The receiver.83 */84 void addMessageReceiver( MessageReceiver* receiver );85 86 /**87 * Removes a message receiver.88 *89 * @param receiver The receiver.90 */91 void removeMessageReceiver( MessageReceiver* receiver );92 };93 94 NAMESPACE_END46 //#include "_namespace.h" 47 //#include "MessageReceiver.h" 48 //#include "ariba/utility/types/LinkID.h" 49 //#include "ariba/utility/types/NodeID.h" 50 //#include <vector> 51 // 52 //using std::vector; 53 //using ariba::utility::LinkID; 54 //using ariba::utility::NodeID; 55 // 56 //NAMESPACE_BEGIN 57 // 58 // 59 ///** 60 // * This class defines an interface for message providers. 61 // * Implementing classes must allow receivers to register themselves. 62 // * 63 // * @author Sebastian Mies 64 // */ 65 //class MessageProvider { 66 //private: 67 // vector<MessageReceiver*> receivers; 68 // 69 //protected: 70 // bool sendMessageToReceivers( const Message* message ); 71 // 72 //public: 73 // /** 74 // * Constructor. 75 // */ 76 // MessageProvider(); 77 // 78 // /** 79 // * Destructor. 80 // */ 81 // ~MessageProvider(); 82 // 83 // /** 84 // * Adds a message receiver. 85 // * 86 // * @param receiver The receiver. 87 // */ 88 // void addMessageReceiver( MessageReceiver* receiver ); 89 // 90 // /** 91 // * Removes a message receiver. 92 // * 93 // * @param receiver The receiver. 94 // */ 95 // void removeMessageReceiver( MessageReceiver* receiver ); 96 //}; 97 // 98 //NAMESPACE_END 95 99 96 100 #endif /* MESSAGEPROVIDER_H_ */ -
source/ariba/utility/messages/MessageReceiver.cpp
r3690 r12060 49 49 } 50 50 51 bool MessageReceiver::receiveMessage( const Message*message, const LinkID& link, const NodeID& node ) {52 //std::cout << "UNIMPLEMENTED MessageReceiver got Message:" << (Message*)message << std::endl;53 return false;54 }51 //bool MessageReceiver::receiveMessage( reboost::shared_buffer_t message, const LinkID& link, const NodeID& node ) { 52 // //std::cout << "UNIMPLEMENTED MessageReceiver got Message:" << (Message*)message << std::endl; 53 // return false; 54 //} 55 55 56 56 NAMESPACE_END -
source/ariba/utility/messages/MessageReceiver.h
r3690 r12060 40 40 #define MESSAGERECEIVER_H__ 41 41 42 #include "ariba/utility/messages/Message.h" 42 //#include "ariba/utility/messages/Message.h" 43 // reboost messages 44 #include "ariba/utility/transport/messages/message.hpp" 43 45 #include "ariba/utility/types/LinkID.h" 44 46 #include "ariba/utility/types/NodeID.h" … … 73 75 * @return True, when the message has been accepted. 74 76 */ 75 virtual bool receiveMessage( const Message* message, const LinkID& link, const NodeID& node ); 77 virtual bool receiveMessage( reboost::shared_buffer_t message, 78 const LinkID& link, 79 const NodeID& node, 80 bool bypass_overlay ) = 0; 76 81 }; 77 82 -
source/ariba/utility/misc/Demultiplexer.hpp
r3705 r12060 136 136 137 137 SERVICE_LISTENER_MAP_CITERATOR it = mapServiceListener.find( id ); 138 if( it == mapServiceListener.end() ) return NULL;138 if( it == mapServiceListener.end() ) return 0; 139 139 else return it->second; 140 140 } -
source/ariba/utility/system/StartupWrapper.cpp
r10700 r12060 36 36 // Telematics. 37 37 // [License] 38 39 // XXX NOTE: Use this class with caution! Config support is outdated. 38 40 39 41 #include "StartupWrapper.h" -
source/ariba/utility/system/StartupWrapper.h
r4483 r12060 36 36 // Telematics. 37 37 // [License] 38 39 // XXX NOTE: Use this class with caution! Config support is outdated. 38 40 39 41 #ifndef __STARTUP_WRAPPER_H -
source/ariba/utility/system/SystemQueue.cpp
r7533 r12060 69 69 } 70 70 71 // maps to function call internally to the Event-system 72 void SystemQueue::scheduleCall( const boost::function0<void>& function, uint32_t delay) 73 { 74 // copy function object 75 boost::function0<void>* function_ptr = new boost::function0<void>(); 76 (*function_ptr) = function; 77 78 // schedule special call-event 79 scheduleEvent( SystemEvent(&internal_function_caller, SystemEventType::DEFAULT, function_ptr), delay ); 80 81 } 82 71 83 #ifdef UNDERLAY_OMNET 72 84 void SystemQueue::handleMessage(cMessage* msg){ -
source/ariba/utility/system/SystemQueue.h
r7468 r12060 60 60 #endif 61 61 62 #include <boost/function.hpp> 63 64 62 65 using std::vector; 63 66 using boost::posix_time::ptime; … … 108 111 */ 109 112 void scheduleEvent( const SystemEvent& event, uint32_t delay = 0 ); 113 114 /** 115 * This method schedules a function call in the SystemQueue. 116 * (Like scheduleEvent, but to be used with boost::bind.) 117 * 118 * @param function: The function to be called [void function()] 119 * @param The delay in milli-seconds 120 */ 121 void scheduleCall( const boost::function0<void>& function, uint32_t delay = 0 ); 110 122 111 123 /** … … 170 182 #endif 171 183 184 185 172 186 private: 173 187 … … 235 249 volatile bool systemQueueRunning; 236 250 #endif 237 251 252 253 private: 254 /** 255 * This inner class handles the function-call events. 256 * @see SystemQueue::scheduleCall 257 */ 258 class FunctionCaller : public SystemEventListener 259 { 260 void handleSystemEvent(const SystemEvent& event) 261 { 262 boost::function0<void>* function_ptr = event.getData< boost::function0<void> >(); 263 (*function_ptr)(); 264 delete function_ptr; 265 } 266 }; 267 268 FunctionCaller internal_function_caller; 238 269 }; // class SystemQueue 239 270 -
source/ariba/utility/transport
- Property svn:mergeinfo changed (with no actual effect on merging)
-
source/ariba/utility/transport/CMakeLists.txt
r10700 r12060 38 38 39 39 add_headers( 40 test_transport.hpp41 transport_connection.hpp42 transport.hpp43 transport_listener.hpp44 40 transport_peer.cpp 45 41 transport_peer.hpp 46 transport_protocol.hpp47 42 ) 48 43 49 add_subdir_sources(asio messages rfcomm tcpip) 44 add_subdir_sources(asio messages rfcomm StreamTransport) 45 -
source/ariba/utility/transport/messages/message.cpp
r10653 r12060 24 24 os << "message({size=" << m.size() << ",buffers=" << (int) m.length() 25 25 << ",hash=" << m.hash() << "},"; 26 m. foreach(ts);26 m.msg_foreach(ts); 27 27 os << ")"; 28 28 return os; -
source/ariba/utility/transport/messages/message.hpp
r10653 r12060 17 17 18 18 /// message size type 19 typedef signed char mlength_t; 19 //typedef signed char mlength_t; // <--- don't do this!! 20 //typedef size_t mlength_t; 21 typedef int mlength_t; // signed int seems necessary 20 22 21 23 /// maximum number of buffers per message (default is 8) 22 24 const mlength_t message_max_buffers = (1L << 3); 25 //const mlength_t message_max_buffers = (1L << 4); 23 26 24 27 //! A Copy-on-Write Message with Shared Buffers. … … 70 73 /// Copy message 71 74 inline message_t(const message_t& msg) : 72 imsg(msg.imsg) { 73 imsg->owner = NULL; 75 imsg(msg.imsg) 76 { 77 if ( imsg ) 78 imsg->owner = NULL; 74 79 } 75 80 … … 142 147 /// Returns the number of buffers inside this message. 143 148 inline mlength_t length() const { 149 if ( ! imsg ) 150 return 0; 151 144 152 return (imsg->length); 145 153 } … … 167 175 /// Iterates over a partial set of buffers. 168 176 template<typename T> 169 inline void foreach(const T& work, size_t index_ = 0, size_t size_ = 0) const {177 inline void msg_foreach(const T& work, size_t index_ = 0, size_t size_ = 0) const { 170 178 T op = work; 171 179 if (size_ == 0) size_ = size() - index_; … … 192 200 inline void read(boctet_t* mem, size_t idx = 0, size_t size_ = 0) const { 193 201 struct read_buffer rb = { mem }; 194 foreach(rb, idx, size_);202 msg_foreach(rb, idx, size_); 195 203 } 196 204 … … 198 206 inline void write(const boctet_t* mem, size_t idx = 0, size_t size_ = 0) { 199 207 struct write_buffer wb = { mem }; 200 foreach(wb, idx, size_);208 msg_foreach(wb, idx, size_); 201 209 } 202 210 … … 227 235 message_t m; 228 236 struct sub_message sm = { &m }; 229 foreach(sm, index, size);237 msg_foreach(sm, index, size); 230 238 return m; 231 239 } -
source/ariba/utility/transport/messages/shared_buffer.hpp
r10700 r12060 9 9 10 10 #include <cstring> 11 #include <string> 11 12 #include <boost/shared_ptr.hpp> 12 13 … … 18 19 #include "buffer.hpp" 19 20 21 #include <stdexcept> 22 20 23 namespace reboost { 24 25 class illegal_sub_buffer: public std::runtime_error 26 { 27 public: 28 /** Takes a character string describing the error. */ 29 explicit illegal_sub_buffer(const std::string& __arg) : 30 std::runtime_error(__arg) 31 { 32 } 33 34 virtual ~illegal_sub_buffer() throw() {} 35 }; 21 36 22 37 /** … … 104 119 parent(new deleteable_buffer(buffer, size)) 105 120 { 121 } 122 123 // /// XXX debug... copy! 124 // /// create shared buffer from buffer 125 // inline shared_buffer_t(const char* buffer, bsize_t size) : 126 // buffer_t(), parent(new deleteable_buffer(size)) { 106 127 // memcpy(parent->mutable_data(), buffer, parent->size()); 107 // data(parent->mutable_data()); 108 // this->size(parent->size()); 109 } 128 // data(parent->mutable_data()); this->size(parent->size()); 129 // } 110 130 111 131 /// clone data from a normal buffer … … 129 149 130 150 /// return sub-buffer. 131 inline self operator()(bsize_t index, bsize_t size = 0) const { 151 inline self operator()(bsize_t index, bsize_t size = 0) const 152 { 153 // special cases 154 if ( index + size > size_ ) 155 { 156 // empty sub-buffer 157 if ( index == size_ ) 158 { 159 self n; 160 return n; 161 } 162 163 // ERROR: index out of bounds 164 throw illegal_sub_buffer("Index or size out of bounds in shared_buffer"); 165 } 166 167 // regular case 132 168 self n(*this); 133 169 n.data_ += index; -
source/ariba/utility/transport/rfcomm/CMakeLists.txt
r10700 r12060 40 40 bluetooth_endpoint.hpp 41 41 bluetooth_rfcomm.hpp 42 rfcomm_transport.hpp43 42 ) 44 43 45 add_sources(rfcomm_transport.cpp) 44 #add_sources() 45 -
source/ariba/utility/transport/transport_peer.cpp
r10700 r12060 1 2 #include "ariba/config.h"3 1 #include "transport_peer.hpp" 4 #include "transport.hpp" 5 #include <boost/asio/ip/tcp.hpp> 2 3 // ariba 4 #include "StreamTransport/StreamTransport.hpp" 5 #include "ariba/utility/addressing2/tcpip_endpoint.hpp" 6 7 // boost 6 8 #include <boost/asio/error.hpp> 7 9 #include <boost/foreach.hpp> 8 9 #ifdef ECLIPSE_PARSER10 #define foreach(a, b) for(a : b)11 #else12 #define foreach(a, b) BOOST_FOREACH(a, b)13 #endif14 10 15 11 // namespace ariba::transport … … 17 13 namespace transport { 18 14 19 using namespace a riba::addressing;15 using namespace addressing2; 20 16 using boost::asio::ip::tcp; 21 17 … … 26 22 use_logging_cpp(transport_peer); 27 23 28 transport_peer::transport_peer( endpoint_set& local_set ) : local(local_set) { 29 30 // setup tcp transports 31 foreach(tcp_port_address port, local.tcp) { 24 transport_peer::transport_peer() : 25 local(new addressing2::endpoint_set()) 26 { 27 } 28 29 EndpointSetPtr transport_peer::add_listenOn_endpoints(EndpointSetPtr endpoints) 30 { 31 // TCP Endpoints 32 BOOST_FOREACH( shared_ptr<tcpip_endpoint> endp, endpoints->get_tcpip_endpoints() ) 33 { 34 // automatic port detection 35 bool port_detection = false; 36 uint16_t try_port = 41322; 32 37 33 if (local.ip.size() > 0) { 34 foreach(ip_address ip_addr, local.ip) { 35 36 tcp::endpoint endp(ip_addr.asio(), port.asio()); 37 create_service(endp); 38 } 39 } else { 40 tcp::endpoint endp_v6(tcp::v6(), port.asio()); 41 tcp::endpoint endp_v4(tcp::v4(), port.asio()); 42 43 create_service(endp_v6); 44 create_service(endp_v4); 38 tcp::endpoint asio_endp = endp->to_asio(); 39 if ( asio_endp.port() == 0 ) 40 { 41 port_detection = true; 45 42 } 46 43 47 } 48 44 45 // create new server socket 46 do 47 { 48 try 49 { 50 // automatic port detection 51 if ( port_detection ) 52 { 53 asio_endp.port(try_port); 54 endp = tcpip_endpoint::create_TcpIP_Endpoint(asio_endp); 55 } 56 57 TransportProtocolPtr tmp_ptr(new StreamTransport<tcp>(endp->to_asio())); 58 transport_streams.push_back(tmp_ptr); 59 logging_info("Listening on IP/TCP " << endp->to_string()); 60 61 local->add_endpoint(endp); 62 port_detection = false; 63 } 64 65 catch (boost::system::system_error& e) 66 { 67 // address in use 68 if (e.code() == boost::asio::error::address_in_use) 69 { 70 // BRANCH: automatic port detection 71 if ( port_detection ) 72 { 73 // give up ? 74 if ( try_port > 41422 ) 75 { 76 logging_warn("[WARN] Unable to find free port. Giving up. :-( Last try was: " 77 << endp->to_string() << ". Endpoint will be ignored!"); 78 79 port_detection = false; 80 } 81 else 82 { 83 // try next port 84 try_port++; 85 } 86 } 87 // BRANCH: explicit given port --> error 88 else 89 { 90 logging_warn("[WARN] Address already in use: " 91 << endp->to_string() << ". Endpoint will be ignored!"); 92 } 93 } 94 95 // Rethrow 96 else 97 { 98 throw; 99 } 100 } 101 } while ( port_detection ); 102 } 103 104 // TODO Bluetooth Endpoints 49 105 #ifdef HAVE_LIBBLUETOOTH 50 foreach(rfcomm_channel_address channel, local.rfcomm) {51 if (local.bluetooth.size() > 0) {52 foreach(mac_address mac, local.bluetooth) {53 rfcomm::endpoint endp(mac.bluetooth(), channel.value());54 create_service(endp);55 }56 } else {57 rfcomm::endpoint endp(channel.value());58 create_service(endp);59 }60 }106 // foreach(rfcomm_channel_address channel, local.rfcomm) { 107 // if (local.bluetooth.size() > 0) { 108 // foreach(mac_address mac, local.bluetooth) { 109 // rfcomm::endpoint endp(mac.bluetooth(), channel.value()); 110 // create_service(endp); 111 // } 112 // } else { 113 // rfcomm::endpoint endp(channel.value()); 114 // create_service(endp); 115 // } 116 // } 61 117 #endif 62 } 63 64 void transport_peer::create_service(tcp::endpoint endp) { 65 try { 66 TcpIpPtr tmp_ptr(new tcpip(endp)); 67 tcps.push_back(tmp_ptr); 68 logging_info("Listening on IP/TCP " << endp); 69 70 } catch (boost::system::system_error& e) { 71 if (e.code() == boost::asio::error::address_in_use) { 72 logging_warn("[WARN] Address already in use: " 73 << endp << ". Endpoint will be ignored!"); 74 } else { 75 // Rethrow 76 throw; 77 } 78 } 79 } 118 119 return local; 120 } 121 122 //void transport_peer::create_service(tcp::endpoint endp) { 123 // try { 124 // TransportProtocolPtr tmp_ptr(new StreamTransport<tcp>(endp)); 125 // tcps.push_back(tmp_ptr); 126 // logging_info("Listening on IP/TCP " << endp); 127 // 128 // } catch (boost::system::system_error& e) { 129 // if (e.code() == boost::asio::error::address_in_use) { 130 // logging_warn("[WARN] Address already in use: " 131 // << endp << ". Endpoint will be ignored!"); 132 // } else { 133 // // Rethrow 134 // throw; 135 // } 136 // } 137 //} 80 138 81 139 #ifdef HAVE_LIBBLUETOOTH 82 void transport_peer::create_service(rfcomm::endpoint endp) {83 try {84 rfcomm_transport::sptr tmp_ptr(new rfcomm_transport(endp));85 rfcomms.push_back(tmp_ptr);86 logging_info("Listening on bluetooth/RFCOMM " << endp);87 88 } catch (boost::system::system_error& e) {89 if (e.code() == boost::asio::error::address_in_use) {90 logging_warn("[WARN] Address already in use: "91 << endp << ". Endpoint will be ignored!");92 } else {93 // Rethrow94 throw;95 }96 }97 }140 //void transport_peer::create_service(rfcomm::endpoint endp) { 141 // try { 142 // TransportProtocolPtr tmp_ptr(new StreamTransport<rfcomm>(endp)); 143 // rfcomms.push_back(tmp_ptr); 144 // logging_info("Listening on bluetooth/RFCOMM " << endp); 145 // 146 // } catch (boost::system::system_error& e) { 147 // if (e.code() == boost::asio::error::address_in_use) { 148 // logging_warn("[WARN] Address already in use: " 149 // << endp << ". Endpoint will be ignored!"); 150 // } else { 151 // // Rethrow 152 // throw; 153 // } 154 // } 155 //} 98 156 #endif 99 157 … … 101 159 } 102 160 103 void transport_peer::start() { 104 foreach(TcpIpPtr tcp, tcps) { 105 tcp->start(); 106 } 107 108 #ifdef HAVE_LIBBLUETOOTH 109 foreach(rfcomm_transport::sptr x, rfcomms) { 110 x->start(); 111 } 112 #endif 113 } 114 115 void transport_peer::stop() { 116 foreach(TcpIpPtr tcp, tcps) { 117 tcp->stop(); 118 } 119 120 #ifdef HAVE_LIBBLUETOOTH 121 foreach(rfcomm_transport::sptr x, rfcomms) { 122 x->stop(); 123 } 124 #endif 161 void transport_peer::start() 162 { 163 BOOST_FOREACH(TransportProtocolPtr stream, transport_streams) 164 { 165 stream->start(); 166 } 167 } 168 169 void transport_peer::stop() 170 { 171 BOOST_FOREACH(TransportProtocolPtr stream, transport_streams) 172 { 173 stream->stop(); 174 } 125 175 } 126 176 127 177 128 178 void transport_peer::send( 129 const endpoint_set&endpoints,179 const const_EndpointSetPtr endpoints, 130 180 reboost::message_t message, 131 181 uint8_t priority) 132 182 { 133 foreach(TcpIpPtr tcp, tcps) { 134 tcp->send(endpoints, message, priority); 135 } 136 137 #ifdef HAVE_LIBBLUETOOTH 138 foreach(rfcomm_transport::sptr x, rfcomms) { 139 x->send(endpoints, message, priority); 140 } 141 #endif 142 } 143 144 void transport_peer::terminate( const address_v* remote ) { 145 if (remote->instanceof<tcpip_endpoint>())// TODO direkt auf der richtigen verbindung 146 { 147 foreach(TcpIpPtr tcp, tcps) { 148 tcp->terminate(remote); 149 } 150 } 151 #ifdef HAVE_LIBBLUETOOTH 152 if (remote->instanceof<rfcomm_endpoint>()) { 153 foreach(rfcomm_transport::sptr x, rfcomms) { 154 x->terminate(remote); 155 } 156 } 157 #endif 158 } 159 160 void transport_peer::register_listener( transport_listener* listener ) { 161 foreach(TcpIpPtr tcp, tcps) { 162 tcp->register_listener(listener); 163 } 164 165 #ifdef HAVE_LIBBLUETOOTH 166 foreach(rfcomm_transport::sptr x, rfcomms) { 167 x->register_listener(listener); 168 } 169 #endif 183 BOOST_FOREACH(TransportProtocolPtr stream, transport_streams) 184 { 185 stream->send(endpoints, message, priority); 186 } 187 } 188 189 // XXX DEPRECATED 190 //void transport_peer::terminate( const address_v* remote ) { 191 // if (remote->instanceof<tcpip_endpoint>())// TODO direkt auf der richtigen verbindung 192 // { 193 // foreach(TransportProtocolPtr tcp, tcps) { 194 // tcp->terminate(remote); 195 // } 196 // } 197 //#ifdef HAVE_LIBBLUETOOTH 198 // if (remote->instanceof<rfcomm_endpoint>()) { 199 // foreach(TransportProtocolPtr x, rfcomms) { 200 // x->terminate(remote); 201 // } 202 // } 203 //#endif 204 //} 205 206 void transport_peer::register_listener( transport_listener* listener ) 207 { 208 BOOST_FOREACH(TransportProtocolPtr stream, transport_streams) 209 { 210 stream->register_listener(listener); 211 } 170 212 } 171 213 -
source/ariba/utility/transport/transport_peer.hpp
r10700 r12060 2 2 #define TRANSPORT_PEER_HPP_ 3 3 4 // ariba 4 5 #include "ariba/config.h" 5 6 #include "ariba/utility/logging/Logging.h" 6 #include "transport_protocol.hpp" 7 #include "ariba/utility/addressing/endpoint_set.hpp" 7 #include "ariba/utility/addressing2/endpoint_set.hpp" 8 9 // ariba interfaces 10 #include "interfaces/transport_protocol.hpp" 11 12 // boost 8 13 #include <boost/shared_ptr.hpp> 9 #include "rfcomm/bluetooth_rfcomm.hpp" 14 15 // boost-adaption 16 //#include "rfcomm/bluetooth_rfcomm.hpp" 10 17 11 18 … … 14 21 namespace transport { 15 22 16 using namespace ariba::addressing;17 18 class tcpip;19 20 #ifdef HAVE_LIBBLUETOOTH21 class rfcomm_transport;22 #endif23 24 23 /** 25 * TODO: Doc 24 * This class allocates implementations of various transport 25 * protocols and can send messages to an entire set of endpoints 26 26 * 27 * @author Sebastian Mies <mies@tm.uka.de> 27 * @author Sebastian Mies <mies@tm.uka.de>, Mario Hock 28 28 */ 29 /// this transport peer allocates implementations of various transport 30 /// protocols and can send messages to an entire set of endpoints 31 class transport_peer : public transport_protocol{29 class transport_peer : 30 public transport_protocol 31 { 32 32 use_logging_h(transport_peer); 33 typedef boost::shared_ptr<transport_protocol> TransportProtocolPtr; 34 33 35 public: 34 transport_peer( endpoint_set& local_set ); 36 transport_peer(); 37 38 /** 39 * Adds endpoints on which ariba should listen ("server"-sockets) 40 * 41 * @return An endpoint_set holding all active endpoints ariba is listening on. 42 */ 43 addressing2::EndpointSetPtr add_listenOn_endpoints(addressing2::EndpointSetPtr endpoints); 44 35 45 virtual ~transport_peer(); 36 46 virtual void start(); … … 38 48 39 49 virtual void send( 40 const endpoint_set&endpoints,50 const addressing2::const_EndpointSetPtr endpoints, 41 51 reboost::message_t message, 42 uint8_t priority = 0); 43 44 /// @deprecated: Use terminate() from transport_connection instead 45 virtual void terminate( const address_v* remote ); 46 52 uint8_t priority = system_priority::OVERLAY); 53 47 54 virtual void register_listener( transport_listener* listener ); 48 55 56 49 57 private: 50 void create_service(tcp::endpoint endp); 51 #ifdef HAVE_LIBBLUETOOTH 52 void create_service(boost::asio::bluetooth::rfcomm::endpoint endp); 53 #endif 54 55 endpoint_set& local; 56 std::vector< boost::shared_ptr<tcpip> > tcps; 57 #ifdef HAVE_LIBBLUETOOTH 58 std::vector< boost::shared_ptr<rfcomm_transport> > rfcomms; 59 #endif 58 addressing2::EndpointSetPtr local; 59 std::vector<TransportProtocolPtr> transport_streams; 60 60 }; 61 61 -
source/ariba/utility/types/Identifier.h
r4625 r12060 50 50 #include "ariba/utility/serialization.h" 51 51 52 // XXX EXPERIMENTAL 53 #include "ariba/utility/transport/messages/shared_buffer.hpp" 54 52 55 /**< maximum length of the key */ 53 56 #define MAX_KEYLENGTH 192 … … 69 72 use_logging_h( Identifier ); 70 73 public: 74 75 // XXX EXPERIMENTAL 76 reboost::shared_buffer_t serialize() const 77 { 78 Data data = data_serialize(this, DEFAULT_V); 79 reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8); 80 81 return buf; 82 } 83 84 reboost::shared_buffer_t deserialize(reboost::shared_buffer_t buff) 85 { 86 Data dat(const_cast<uint8_t*>(buff.data()), buff.size() * 8); 87 88 size_t len = this->SERIALIZATION_METHOD_NAME(DESERIALIZE, dat) / 8; 89 90 // return remaining sub-buffer 91 return buff(len); 92 } 93 71 94 72 95 //------------------------------------------------------------------------- -
source/ariba/utility/types/OverlayParameterSet.h
r4625 r12060 53 53 54 54 typedef enum _OverlayStructure { 55 OverlayStructureOneHop = 0, 55 OverlayStructureOneHop = 0, // DEPRECATED, DO NOT USE 56 56 OverlayStructureChord = 1, 57 57 } OverlayStructure; -
source/ariba/utility/types/ServiceID.cpp
r3690 r12060 76 76 } 77 77 78 string ServiceID::toString() const { 78 string ServiceID::toString() const 79 { 80 if ( *this == ServiceID::UNSPECIFIED ) 81 return "UNSPEC"; 82 79 83 return ariba::utility::Helper::ultos( id ); 80 84 } -
source/services/ariba_dht/Dht.cpp
r10700 r12060 48 48 void Dht::get(const std::string& key) 49 49 { 50 DhtMessage msg(DhtMessage::DhtGet, key );50 DhtMessage msg(DhtMessage::DhtGet, key, node->getNodeId()); 51 51 52 52 handle_dht_message(msg, NodeID::UNSPECIFIED); … … 155 155 case DhtMessage::DhtGet: 156 156 { 157 answer_dht_request(message.getKey(), source);157 answer_dht_request(message.getKey(), message.getSourceNode()); 158 158 159 159 break; … … 167 167 message.getValues(), 168 168 message.getTTL()); 169 answer_dht_request(message.getKey(), source);169 answer_dht_request(message.getKey(), message.getSourceNode()); 170 170 171 171 break; -
source/services/ariba_dht/Dht.h
r10700 r12060 24 24 using ariba::utility::SystemEventType; 25 25 using ariba::utility::SystemEventListener; 26 27 using ariba::utility::NodeID; 28 using ariba::utility::LinkID; 29 26 30 27 31 // Forward declarations to avoid adding messages/*.h to the public interface -
source/services/ariba_dht/messages/DhtMessage.cpp
r10700 r12060 10 10 DhtMessage::DhtMessage() : 11 11 ttl( 0 ), 12 replace( false ) 12 replace( false ), 13 sourceNode(NodeID::UNSPECIFIED) 13 14 {} 14 15 15 DhtMessage::DhtMessage( DhtMessageType type, const std::string& key ) :16 DhtMessage::DhtMessage( DhtMessageType type, const std::string& key, const NodeID& sourceNodeID ) : 16 17 type( static_cast<uint8_t>(type) ), 17 18 ttl( 0 ), 18 19 replace( false ), 19 key( key ) 20 key( key ), 21 sourceNode(sourceNodeID) 22 20 23 {} 21 24 22 25 DhtMessage::DhtMessage( DhtMessageType type, const std::string& key, 23 const std::string& value, uint16_t ttl ) :26 const std::string& value, uint16_t ttl, const NodeID& sourceNodeID ) : 24 27 type( static_cast<uint8_t>(type) ), 25 28 ttl( ttl ), 26 29 replace( false ), 27 30 key( key ), 28 values(1, value) 31 values(1, value), 32 sourceNode(sourceNodeID) 29 33 {} 30 34 31 35 DhtMessage::DhtMessage( DhtMessageType type, const std::string& key, 32 const vector<string>& values, uint16_t ttl ) :36 const vector<string>& values, uint16_t ttl, const NodeID& sourceNodeID ) : 33 37 type( static_cast<uint8_t>(type) ), 34 38 ttl( ttl ), 35 39 replace( false ), 36 key( key ) 40 key( key ), 41 sourceNode(sourceNodeID) 37 42 { 38 43 // preallocate enough room so we don't need to copy a lot … … 46 51 } 47 52 53 string DhtMessage::DhtMessageTypeToString(DhtMessageType type) { 54 string temp; 55 switch (type) 56 { 57 case DhtMessage::DhtInvalid: 58 { 59 temp = "DhtInvalid"; 60 break; 61 } 62 63 case DhtMessage::DhtGet: 64 { 65 temp = "DhtGet"; 66 break; 67 } 68 69 case DhtMessage::DhtPut: 70 { 71 temp = "DhtPut"; 72 break; 73 } 74 75 case DhtMessage::DhtPutAndGet: 76 { 77 temp = "DhtPutAndGet"; 78 break; 79 } 80 81 case DhtMessage::DhtRemove: 82 { 83 temp = "DhtRemove"; 84 break; 85 } 86 87 case DhtMessage::DhtRepublish: 88 { 89 temp = "DhtRepublish"; 90 break; 91 } 92 93 case DhtMessage::DhtAnswer: 94 { 95 temp = "DhtAnswer"; 96 break; 97 } 98 99 case DhtMessage::DhtReplica: 100 { 101 temp = "DhtReplica"; 102 break; 103 } 104 } 105 106 return temp; 107 } 108 48 109 }} -
source/services/ariba_dht/messages/DhtMessage.h
r10700 r12060 3 3 4 4 #include "ariba/utility/messages.h" 5 #include "ariba/utility/types/NodeID.h" 5 6 #include "ariba/utility/serialization.h" 6 7 #include "ariba/Name.h" … … 10 11 11 12 using ariba::utility::Message; 13 using ariba::utility::NodeID; 12 14 using_serialization; 13 15 … … 21 23 DhtRemove = 4, 22 24 DhtRepublish = 5, 23 DhtAnswer = 8 25 DhtAnswer = 8, 26 DhtReplica = 9 24 27 } DhtMessageType; 25 28 26 29 DhtMessage(); 27 DhtMessage( DhtMessageType type, const std::string& key 30 DhtMessage( DhtMessageType type, const std::string& key, const NodeID& sourceNodeID = NodeID::UNSPECIFIED); 28 31 DhtMessage( DhtMessageType type, const std::string& key, 29 const std::string& value, uint16_t ttl = 0 );32 const std::string& value, uint16_t ttl = 0, const NodeID& sourceNodeID = NodeID::UNSPECIFIED ); 30 33 31 34 DhtMessage( DhtMessageType type, const std::string& key, 32 const vector<std::string>& values, uint16_t ttl = 0 );35 const vector<std::string>& values, uint16_t ttl = 0, const NodeID& sourceNodeID = NodeID::UNSPECIFIED ); 33 36 34 37 virtual ~DhtMessage(); … … 77 80 } 78 81 82 NodeID getSourceNode() const { 83 return sourceNode; 84 } 85 79 86 bool doReplace() const { 80 87 return replace; 81 88 } 82 89 90 static string DhtMessageTypeToString(DhtMessageType type); 83 91 84 pr ivate:92 protected: 85 93 uint8_t type; 86 94 uint16_t ttl; … … 88 96 std::string key; 89 97 vector<std::string> values; 98 NodeID sourceNode; 99 100 private: 90 101 }; 91 102 … … 100 111 // key serialization 101 112 X && T(key); 113 114 X && &sourceNode; 102 115 103 116 // store number of values
Note:
See TracChangeset
for help on using the changeset viewer.