Changeset 10653
- Timestamp:
- Jul 25, 2012, 11:41:36 AM (12 years ago)
- Files:
-
- 27 added
- 10 deleted
- 35 edited
Legend:
- Unmodified
- Added
- Removed
-
/
- Property svn:mergeinfo changed (with no actual effect on merging)
-
Makefile.am
r3744 r10653 1 1 ACLOCAL_AMFLAGS = -I m4 2 2 SUBDIRS = docu source sample 3 EXTRA_DIST = etc/pingpong/* etc/patch/* INSTALL LICENSE README bootstrap bootstrap_libs 3 EXTRA_DIST = etc/pingpong/* \ 4 etc/patch/* \ 5 INSTALL \ 6 LICENSE \ 7 README \ 8 bootstrap \ 9 bootstrap_libs \ 10 docu/doxygen/Doxyfile 4 11 5 12 # hook to remove all .svn files before rolling the tarball -
configure.ac
r10092 r10653 1 AC_INIT([ariba], [0. 7.1], [http://www.ariba-underlay.org])1 AC_INIT([ariba], [0.8.1], [http://www.ariba-underlay.org]) 2 2 AM_INIT_AUTOMAKE([-Wall foreign]) 3 3 AC_CONFIG_SRCDIR([source/ariba/ariba.h]) … … 58 58 sample/Makefile 59 59 sample/pingpong/Makefile 60 sample/testdht/Makefile 60 source/services/Makefile 61 source/services/dht/Makefile 61 62 docu/Makefile 62 63 docu/doxygen/Makefile -
docu/doxygen/Doxyfile
r9748 r10653 6 6 DOXYFILE_ENCODING = UTF-8 7 7 PROJECT_NAME = Ariba 8 PROJECT_NUMBER = 0. 7.08 PROJECT_NUMBER = 0.8.1 9 9 OUTPUT_DIRECTORY = ./ 10 10 CREATE_SUBDIRS = NO -
sample/Makefile.am
r6760 r10653 1 SUBDIRS = pingpong testdht 1 #SUBDIRS = pingpong testdht 2 SUBDIRS = pingpong -
source/Makefile.am
r10652 r10653 1 SUBDIRS = ariba 1 SUBDIRS = ariba services -
source/ariba/Makefile.am
r7744 r10653 18 18 # project version number!! 19 19 20 libariba_la_LDFLAGS = -version-info 0:0:020 libariba_la_LDFLAGS = -version-info 1:0:0 21 21 22 22 # compiler flags ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ … … 144 144 overlay/messages/JoinReply.cpp \ 145 145 overlay/messages/JoinRequest.cpp \ 146 overlay/messages/DHTMessage.cpp \147 146 overlay/messages/OverlayMsg.cpp 148 147 … … 150 149 overlay/messages/JoinReply.h \ 151 150 overlay/messages/JoinRequest.h \ 152 overlay/messages/DHTMessage.h\153 151 overlay/messages/OverlayMsg.h 154 152 … … 278 276 libariba_la_SOURCES += \ 279 277 utility/transport/tcpip/tcpip.cpp \ 280 utility/transport/tcpip/protlib/timer_module.cpp \281 utility/transport/tcpip/protlib/setuid.cpp \282 utility/transport/tcpip/protlib/queuemanager.cpp \283 utility/transport/tcpip/protlib/messages.cpp \284 utility/transport/tcpip/protlib/fqueue.cpp \285 utility/transport/tcpip/protlib/fastqueue.c \286 utility/transport/tcpip/protlib/eclock_gettime.c \287 utility/transport/tcpip/protlib/tp_over_udp.cpp \288 utility/transport/tcpip/protlib/connectionmap_uds.cpp \289 utility/transport/tcpip/protlib/network_message.cpp \290 utility/transport/tcpip/protlib/threadsafe_db.cpp \291 utility/transport/tcpip/protlib/timer.cpp \292 utility/transport/tcpip/protlib/address.cpp \293 utility/transport/tcpip/protlib/connectionmap.cpp \294 utility/transport/tcpip/protlib/tp.cpp \295 utility/transport/tcpip/protlib/tp_over_tcp.cpp \296 utility/transport/tcpip/protlib/configuration.cpp \297 utility/transport/tcpip/protlib/ie.cpp \298 utility/transport/tcpip/protlib/threads.cpp \299 utility/transport/tcpip/protlib/logfile.cpp \300 278 utility/transport/transport_peer.cpp \ 301 utility/transport/rfcomm/rfcomm.cpp \ 302 utility/transport/asio/asio_io_service.cpp 279 utility/transport/rfcomm/rfcomm_transport.cpp \ 280 utility/transport/asio/unique_io_service.cpp \ 281 utility/transport/messages/buffer.cpp \ 282 utility/transport/messages/message.cpp \ 283 utility/transport/messages/shared_buffer.cpp 303 284 304 285 nobase_libariba_la_HEADERS += \ 305 286 utility/transport/test_transport.hpp \ 306 287 utility/transport/tcpip/tcpip.hpp \ 288 utility/transport/transport_connection.hpp \ 289 utility/transport/transport_listener.hpp \ 307 290 utility/transport/transport_peer.hpp \ 308 291 utility/transport/transport_protocol.hpp \ 309 utility/transport/rfcomm/rfcomm.hpp \ 292 utility/transport/rfcomm/rfcomm_transport.hpp \ 293 utility/transport/rfcomm/bluetooth_endpoint.hpp \ 294 utility/transport/rfcomm/bluetooth_rfcomm.hpp \ 310 295 utility/transport/transport.hpp \ 311 utility/transport/asio/bluetooth_endpoint.hpp \ 312 utility/transport/asio/rfcomm.hpp \ 313 utility/transport/transport_listener.hpp \ 314 utility/transport/asio/asio_io_service.h \ 315 utility/transport/tcpip/protlib/threadsafe_db.h \ 316 utility/transport/tcpip/protlib/configuration.h \ 317 utility/transport/tcpip/protlib/ie.h \ 318 utility/transport/tcpip/protlib/llhashers.h \ 319 utility/transport/tcpip/protlib/fqueue.h \ 320 utility/transport/tcpip/protlib/assocdata_uds.h \ 321 utility/transport/tcpip/protlib/address.h \ 322 utility/transport/tcpip/protlib/logfile.h \ 323 utility/transport/tcpip/protlib/timer.h \ 324 utility/transport/tcpip/protlib/queuemanager.h \ 325 utility/transport/tcpip/protlib/messages.h \ 326 utility/transport/tcpip/protlib/assocdata.h \ 327 utility/transport/tcpip/protlib/protlib_types.h \ 328 utility/transport/tcpip/protlib/tp_over_tcp.h \ 329 utility/transport/tcpip/protlib/tp_over_udp.h \ 330 utility/transport/tcpip/protlib/tp.h \ 331 utility/transport/tcpip/protlib/threads.h \ 332 utility/transport/tcpip/protlib/connectionmap.h \ 333 utility/transport/tcpip/protlib/timer_module.h \ 334 utility/transport/tcpip/protlib/fastqueue.h \ 335 utility/transport/tcpip/protlib/tperror.h \ 336 utility/transport/tcpip/protlib/network_message.h \ 337 utility/transport/tcpip/protlib/setuid.h \ 338 utility/transport/tcpip/protlib/cleanuphandler.h \ 339 utility/transport/tcpip/protlib/connectionmap_uds.h 296 utility/transport/asio/unique_io_service.h \ 297 utility/transport/messages/buffer.hpp \ 298 utility/transport/messages/buffers.hpp \ 299 utility/transport/messages/message.hpp \ 300 utility/transport/messages/shared_buffer.hpp 340 301 341 302 #------------> utility :: messages -
source/ariba/Node.cpp
r7532 r10653 177 177 } 178 178 179 NodeID Node::sendMessageCloserToNodeID(const DataMessage& msg, const NodeID& nid, const ServiceID& sid, 180 const LinkProperties& req) { 181 182 return base_overlay->sendMessageCloserToNodeID((Message*) msg, nid, sid); 183 } 184 185 179 186 seqnum_t Node::sendMessage(const DataMessage& msg, const LinkID& lnk) { 180 187 return base_overlay->sendMessage((Message*) msg, lnk); … … 209 216 } 210 217 211 // service directory212 213 void Node::put( const Data& key, const Data& value, uint16_t ttl, bool replace ) {214 base_overlay->dhtPut(key,value,ttl,replace);215 }216 217 void Node::get( const Data& key, const ServiceID& sid ) {218 base_overlay->dhtGet(key,sid);219 }220 221 218 // @see Module.h 222 219 string Node::getName() const { -
source/ariba/Node.h
r9684 r10653 242 242 243 243 /** 244 * like the above function, but sends the message to the closest directly known node 245 * to the specified address 246 */ 247 NodeID sendMessageCloserToNodeID(const DataMessage& msg, const NodeID& nid, const ServiceID& sid, 248 const LinkProperties& req = LinkProperties::DEFAULT); 249 250 /** 244 251 * Sends a message via an established link. If reliable transport was 245 252 * selected, the method returns a sequence number and a communication event … … 281 288 */ 282 289 bool unbind(CommunicationListener* listener, const ServiceID& sid); 283 284 /**285 * Adds a key value pair to the DHT286 *287 * @param key The key data288 * @param value The value data289 * @param ttl The time to live in seconds290 */291 void put( const Data& key, const Data& value, uint16_t ttl, bool replace = false);292 293 /**294 * Queries for values stored in the DHT. Fires an communication event when295 * values arrive.296 *297 * @param key The key data298 * @param sid The service that is requesting the values299 */300 void get( const Data& key, const ServiceID& sid );301 302 290 303 291 //------------------------------------------------------------------------- -
source/ariba/SideportListener.h
r9684 r10653 47 47 #include "CommunicationListener.h" 48 48 49 using std::cout;50 49 using std::map; 51 50 using std::vector; -
source/ariba/communication/BaseCommunication.cpp
r9322 r10653 41 41 #include "networkinfo/AddressDiscovery.h" 42 42 #include "ariba/utility/types/PeerID.h" 43 #include <boost/function.hpp> 43 44 44 45 #ifdef UNDERLAY_OMNET … … 283 284 SystemEventType MessageDispatchEvent("MessageDispatchEvent", TransportEvent ); 284 285 285 class DispatchMsg {286 public:287 DispatchMsg() : local(NULL), remote(NULL), message(NULL) {}288 address_v* local;289 address_v* remote;290 Message* message;291 };292 293 286 /// called when a system event is emitted by system queue 294 287 void BaseCommunication::handleSystemEvent(const SystemEvent& event) { … … 297 290 if ( event.getType() == MessageDispatchEvent ){ 298 291 logging_debug( "Forwarding message receiver" ); 299 DispatchMsg* dmsg = event.getData<DispatchMsg>(); 300 Message* msg = dmsg->message; 301 receiveMessage(msg, dmsg->local, dmsg->remote); 302 msg->dropPayload(); 303 delete dmsg->local; 304 delete dmsg->remote; 305 delete msg; 306 delete dmsg; 307 } 308 } 309 310 /// called when a message is received from transport_peer 311 void BaseCommunication::receive_message(transport_protocol* transport, 312 const address_vf local, const address_vf remote, const uint8_t* data, 313 size_t size) { 314 315 // logging_debug( "Dispatching message" ); 316 317 // convert data 318 Data data_( const_cast<uint8_t*>(data), size * 8 ); 319 DispatchMsg* dmsg = new DispatchMsg(); 320 321 Message* msg = new Message(data_); 322 dmsg->local = local->clone(); 323 dmsg->remote = remote->clone(); 324 dmsg->message = msg; 325 326 SystemQueue::instance().scheduleEvent( 327 SystemEvent( this, MessageDispatchEvent, dmsg ) 328 ); 329 } 330 331 /// handles a message from the underlay transport 332 void BaseCommunication::receiveMessage(const Message* message, 333 const address_v* local, const address_v* remote ){ 334 292 boost::function0<void>* handler = event.getData< boost::function0<void> >(); 293 (*handler)(); 294 delete handler; 295 } 296 } 297 298 /** 299 * called within the ASIO thread 300 * when a message is received from underlay transport 301 */ 302 void BaseCommunication::receive_message(transport_connection::sptr connection, 303 reboost::message_t msg) { 304 305 logging_debug( "Dispatching message" ); 306 307 boost::function0<void>* handler = new boost::function0<void>( 308 boost::bind( 309 &BaseCommunication::receiveMessage, 310 this, 311 connection, 312 msg) 313 ); 314 315 SystemQueue::instance().scheduleEvent( 316 SystemEvent(this, MessageDispatchEvent, handler) 317 ); 318 } 319 320 /** 321 * called within the ARIBA thread (System Queue) 322 * when a message is received from underlay transport 323 */ 324 void BaseCommunication::receiveMessage(transport_connection::sptr connection, 325 reboost::message_t message) 326 { 327 328 //// Adapt to old message system //// 329 // Copy data 330 size_t bytes_len = message.size(); 331 uint8_t* bytes = new uint8_t[bytes_len]; 332 message.read(bytes, 0, bytes_len); 333 334 Data data(bytes, bytes_len * 8); 335 336 Message legacy_message; 337 legacy_message.setPayload(data); 338 339 340 335 341 /// decapsulate message 336 AribaBaseMsg* msg = ((Message*)message)->decapsulate<AribaBaseMsg>();342 AribaBaseMsg* msg = legacy_message.decapsulate<AribaBaseMsg>(); 337 343 logging_debug( "Receiving message of type " << msg->getTypeString() ); 338 344 … … 379 385 LinkID localLink = LinkID::create(); 380 386 LinkID remoteLink = msg->getLocalLink(); 381 logging_debug( "local=" << local->to_string() 382 << " remote=" << remote->to_string() 387 logging_debug( 388 "local=" << connection->getLocalEndpoint()->to_string() 389 << " remote=" << connection->getRemoteEndpoint()->to_string() 383 390 ); 384 391 … … 386 393 bool allowlink = true; 387 394 BOOST_FOREACH( CommunicationEvents* i, eventListener ){ 388 allowlink &= i->onLinkRequest( localLink, local, remote ); 395 allowlink &= i->onLinkRequest( localLink, 396 connection->getLocalEndpoint(), 397 connection->getRemoteEndpoint()); 389 398 } 390 399 … … 400 409 ld->localLink = localLink; 401 410 ld->remoteLink = remoteLink; 402 ld->localLocator = local->clone(); 403 ld->remoteLocator = remote->clone(); 411 ld->localLocator = connection->getLocalEndpoint()->clone(); 412 ld->remoteLocator = connection->getRemoteEndpoint()->clone(); 413 ld->connection = connection; 404 414 ld->remoteEndpoint = msg->getLocalDescriptor(); 405 415 add_endpoint(ld->remoteLocator); … … 409 419 ld->remoteLocator, endpoint_set::Layer1_3 | endpoint_set::NoLoopback); 410 420 localDescriptor.getEndpoints().add( 411 local, endpoint_set::Layer1_3 | endpoint_set::NoLoopback); 421 connection->getLocalEndpoint(), 422 endpoint_set::Layer1_3 | endpoint_set::NoLoopback); 412 423 413 424 // link is now up-> add it … … 459 470 } 460 471 472 // store the connection 473 ld.connection = connection; 474 461 475 // set remote locator and link id 462 476 ld.remoteLink = msg->getLocalLink(); 463 ld.remoteLocator = remote->clone();477 ld.remoteLocator = connection->getRemoteEndpoint()->clone(); 464 478 ld.remoteEndpoint.getEndpoints().add( 465 479 msg->getLocalDescriptor().getEndpoints(), … … 536 550 // update the remote locator 537 551 const address_v* oldremote = linkDesc.remoteLocator; 538 linkDesc.remoteLocator = remote->clone();552 linkDesc.remoteLocator = connection->getRemoteEndpoint()->clone(); 539 553 540 554 // inform the listeners (local link has _not_ changed!) … … 747 761 748 762 /// sends a message to all end-points in the end-point descriptor 749 void BaseCommunication::send(Message* message, const EndpointDescriptor& endpoint) { 750 Data data = data_serialize( message, DEFAULT_V ); 751 transport->send( endpoint.getEndpoints(), data.getBuffer(), data.getLength() / 8); 752 data.release(); 763 void BaseCommunication::send(Message* legacy_message, const EndpointDescriptor& endpoint) { 764 Data data = data_serialize(legacy_message, DEFAULT_V); 765 766 //// Adapt to new message system //// 767 // transfer data buffer ownership to the shared_buffer 768 reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8); 769 770 reboost::message_t message; 771 message.push_back(buf); 772 773 transport->send(endpoint.getEndpoints(), message); 753 774 } 754 775 755 776 /// sends a message to the remote locator inside the link descriptor 756 void BaseCommunication::send(Message* message, const LinkDescriptor& desc) {777 void BaseCommunication::send(Message* legacy_message, const LinkDescriptor& desc) { 757 778 if (desc.remoteLocator==NULL) return; 758 Data data = data_serialize( message, DEFAULT_V ); 759 transport->send( desc.remoteLocator, data.getBuffer(), data.getLength() / 8); 760 data.release(); 779 780 Data data = data_serialize(legacy_message, DEFAULT_V); 781 782 //// Adapt to new message system //// 783 // transfer data buffer ownership to the shared_buffer 784 reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8); 785 786 reboost::message_t message; 787 message.push_back(buf); 788 789 desc.connection->send(message); 761 790 } 762 791 -
source/ariba/communication/BaseCommunication.h
r9694 r10653 60 60 #include "ariba/utility/addressing/addressing.hpp" 61 61 #include "ariba/utility/transport/transport.hpp" 62 #include "ariba/utility/transport/transport_connection.hpp" 62 63 63 64 // communication … … 190 191 virtual void handleSystemEvent(const SystemEvent& event); 191 192 192 /// called when a message is received form transport_peer 193 virtual void receive_message(transport_protocol* transport, 194 const address_vf local, const address_vf remote, const uint8_t* data, 195 size_t size); 193 /** 194 * called within the ASIO thread 195 * when a message is received from underlay transport 196 */ 197 virtual void receive_message(transport_connection::sptr connection, 198 reboost::message_t msg); 196 199 197 200 protected: 198 201 199 /// handle received message from a transport module 200 void receiveMessage(const Message* message, 201 const address_v* local, const address_v* remote ); 202 /** 203 * called within the ARIBA thread (System Queue) 204 * when a message is received from underlay transport 205 */ 206 void receiveMessage(transport_connection::sptr connection, 207 reboost::message_t msg); 202 208 203 209 /// called when a network interface change happens … … 250 256 /// flag, whether this link is up 251 257 bool up; 258 259 /// connection if link is up 260 transport_connection::sptr connection; 252 261 }; 253 262 -
source/ariba/communication/networkinfo/AddressDiscovery.cpp
r8620 r10653 129 129 ip_address ip = straddr; 130 130 if (ip.is_loopback()) continue; 131 if (ip.is_link_local()) continue;131 // if (ip.is_link_local()) continue; 132 132 address_vf vf = ip; 133 133 endpoints.add( vf ); -
source/ariba/communication/networkinfo/NetworkInformation.h
r3690 r10653 40 40 #define __NETWORK_INFORMATION_H 41 41 42 #include <unistd.h> 42 43 #include <vector> 43 44 #include <string> -
source/ariba/overlay/BaseOverlay.cpp
r10576 r10653 51 51 52 52 #include "ariba/overlay/messages/OverlayMsg.h" 53 #include "ariba/overlay/messages/DHTMessage.h"54 53 #include "ariba/overlay/messages/JoinRequest.h" 55 54 #include "ariba/overlay/messages/JoinReply.h" … … 66 65 #define visualIdBase ariba::utility::ServerVis::NETWORK_ID_BASE_COMMUNICATION 67 66 68 class ValueEntry {69 public:70 ValueEntry( const Data& value ) : ttl(0), last_update(time(NULL)),71 last_change(time(NULL)), value(value.clone()) {72 }73 74 ValueEntry( const ValueEntry& value ) :75 ttl(value.ttl), last_update(value.last_update),76 last_change(value.last_change), value(value.value.clone()) {77 }78 79 ~ValueEntry() {80 value.release();81 }82 83 void refresh() {84 last_update = time(NULL);85 }86 87 void set_value( const Data& value ) {88 this->value.release();89 this->value = value.clone();90 this->last_change = time(NULL);91 this->last_update = time(NULL);92 }93 94 Data get_value() const {95 return value;96 }97 98 uint16_t get_ttl() const {99 return ttl;100 }101 102 void set_ttl( uint16_t ttl ) {103 this->ttl = ttl;104 }105 106 bool is_ttl_elapsed() const {107 // is persistent? yes-> always return false108 if (ttl==0) return false;109 // return true, if ttl is elapsed110 return ( difftime( time(NULL), this->last_update ) > ttl );111 }112 113 private:114 uint16_t ttl;115 time_t last_update;116 time_t last_change;117 Data value;118 };119 120 class DHTEntry {121 public:122 Data key;123 vector<ValueEntry> values;124 125 vector<Data> get_values() {126 vector<Data> vect;127 BOOST_FOREACH( ValueEntry& e, values )128 vect.push_back( e.get_value() );129 return vect;130 }131 132 void erase_expired_entries() {133 for (vector<ValueEntry>::iterator i = values.begin();134 i != values.end(); i++ )135 if (i->is_ttl_elapsed())136 i = values.erase(i)-1;137 }138 };139 140 class DHT {141 public:142 typedef vector<DHTEntry> Entries;143 typedef vector<ValueEntry> Values;144 Entries entries;145 static const bool verbose = false;146 147 static bool equals( const Data& lhs, const Data& rhs ) {148 if (rhs.getLength()!=lhs.getLength()) return false;149 for (size_t i=0; i<lhs.getLength()/8; i++)150 if (lhs.getBuffer()[i] != rhs.getBuffer()[i]) return false;151 return true;152 }153 154 void put( const Data& key, const Data& value, uint16_t ttl = 0 ) {155 cleanup();156 157 // find entry158 for (size_t i=0; i<entries.size(); i++) {159 DHTEntry& entry = entries.at(i);160 161 // check if key is already known162 if ( equals(entry.key, key) ) {163 164 // check if value is already in values list165 for (size_t j=0; j<entry.values.size(); j++) {166 // found value already? yes-> refresh ttl167 if ( equals(entry.values[j].get_value(), value) ) {168 entry.values[j].refresh();169 if (verbose)170 std::cout << "DHT: Republished value. Refreshing value timestamp."171 << std::endl;172 return;173 }174 }175 176 // new value-> add to entry177 if (verbose)178 std::cout << "DHT: Added value to "179 << " key=" << key << " with value=" << value << std::endl;180 entry.values.push_back( ValueEntry( value ) );181 entry.values.back().set_ttl(ttl);182 return;183 }184 }185 186 // key is unknown-> add key value pair187 if (verbose)188 std::cout << "DHT: New key value pair "189 << " key=" << key << " with value=" << value << std::endl;190 191 // add new entry192 entries.push_back( DHTEntry() );193 DHTEntry& entry = entries.back();194 entry.key = key.clone();195 entry.values.push_back( ValueEntry(value) );196 entry.values.back().set_ttl(ttl);197 }198 199 vector<Data> get( const Data& key ) {200 cleanup();201 // find entry202 for (size_t i=0; i<entries.size(); i++) {203 DHTEntry& entry = entries.at(i);204 if ( equals(entry.key,key) )205 return entry.get_values();206 }207 return vector<Data>();208 }209 210 bool remove( const Data& key ) {211 cleanup();212 213 // find entry214 for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {215 DHTEntry& entry = *i;216 217 // found? yes-> delete entry218 if ( equals(entry.key, key) ) {219 entries.erase(i);220 return true;221 }222 }223 return false;224 }225 226 bool remove( const Data& key, const Data& value ) {227 cleanup();228 // find entry229 for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {230 DHTEntry& entry = *i;231 232 // found? yes-> try to find value233 if ( equals(entry.key, key) ) {234 for (Values::iterator j = entry.values.begin();235 j != entry.values.end(); j++) {236 237 // value found? yes-> delete238 if (equals(j->get_value(), value)) {239 j = entry.values.erase(j)-1;240 return true;241 }242 }243 }244 }245 return false;246 }247 248 void cleanup() {249 for (Entries::iterator i = entries.begin(); i != entries.end(); i++) {250 DHTEntry& entry = *i;251 entry.erase_expired_entries();252 if (entry.values.size()==0)253 i = entries.erase(i)-1;254 }255 }256 };257 67 258 68 // ---------------------------------------------------------------------------- … … 758 568 sideport(&SideportListener::DEFAULT), overlayInterface(NULL), 759 569 counter(0) { 760 initDHT();761 570 } 762 571 763 572 BaseOverlay::~BaseOverlay() { 764 destroyDHT();765 573 } 766 574 … … 1078 886 } 1079 887 888 1080 889 seqnum_t BaseOverlay::sendMessage(const Message* message, 1081 890 const NodeID& node, const ServiceID& service) { … … 1114 923 } 1115 924 925 926 NodeID BaseOverlay::sendMessageCloserToNodeID(const Message* message, 927 const NodeID& address, const ServiceID& service) { 928 929 if ( overlayInterface->isClosestNodeTo(address) ) 930 { 931 return NodeID::UNSPECIFIED; 932 } 933 934 const NodeID& closest_node = overlayInterface->getNextNodeId(address); 935 936 if ( closest_node != NodeID::UNSPECIFIED ) 937 { 938 seqnum_t seqnum = sendMessage(message, closest_node, service); 939 } 940 941 return closest_node; // XXX return seqnum ?? tuple? closest_node via (non const) reference? 942 } 1116 943 // ---------------------------------------------------------------------------- 1117 944 … … 1831 1658 overlayMsg->addRouteRecord(nodeId); 1832 1659 1833 // handle dht messages (do not route)1834 if (overlayMsg->isDHTMessage()) {1835 bool ret = handleDHTMessage(overlayMsg);1836 delete overlayMsg;1837 return ret;1838 }1839 1840 1660 // handle signaling messages (do not route!) 1841 1661 if (overlayMsg->getType()>=OverlayMsg::typeSignalingStart && … … 1856 1676 delete overlayMsg; 1857 1677 return true; 1858 }1859 1860 // handle DHT response messages1861 if (overlayMsg->hasTypeMask( OverlayMsg::maskDHTResponse )) {1862 bool ret = handleDHTMessage(overlayMsg);1863 delete overlayMsg;1864 return ret;1865 1678 } 1866 1679 … … 1964 1777 stabilizeRelays(); 1965 1778 stabilizeLinks(); 1966 stabilizeDHT();1967 1779 updateVisual(); 1968 1780 } … … 2110 1922 // ---------------------------------------------------------------------------- 2111 1923 2112 void BaseOverlay::initDHT() {2113 dht = new DHT();2114 localDHT = new DHT();2115 republishCounter = 0;2116 }2117 2118 void BaseOverlay::destroyDHT() {2119 delete dht;2120 delete localDHT;2121 }2122 2123 /// stabilize DHT state2124 void BaseOverlay::stabilizeDHT() {2125 2126 // do refresh every 2 seconds2127 if (republishCounter < 2) {2128 republishCounter++;2129 return;2130 }2131 republishCounter = 0;2132 2133 // remove old values from DHT2134 BOOST_FOREACH( DHTEntry& entry, dht->entries ) {2135 // erase old entries2136 entry.erase_expired_entries();2137 }2138 2139 // re-publish values-> do not refresh locally stored values2140 BOOST_FOREACH( DHTEntry& entry, localDHT->entries ) {2141 BOOST_FOREACH( ValueEntry& value, entry.values )2142 dhtPut(entry.key, value.get_value(), value.get_ttl(), false, true );2143 }2144 }2145 2146 // handle DHT messages2147 bool BaseOverlay::handleDHTMessage( OverlayMsg* msg ) {2148 2149 // de-capsulate message2150 logging_debug("Received DHT message");2151 DHTMessage* dhtMsg = msg->decapsulate<DHTMessage>();2152 2153 // handle DHT data message2154 if (msg->getType()==OverlayMsg::typeDHTData) {2155 const ServiceID& service = msg->getService();2156 logging_info( "Received DHT data for service " << service.toString() );2157 2158 // delegate data message2159 CommunicationListener* lst = getListener(service);2160 if(lst != NULL) lst->onKeyValue(dhtMsg->getKey(), dhtMsg->getValues() );2161 delete dhtMsg;2162 return true;2163 }2164 2165 // route message to closest node2166 if (!overlayInterface->isClosestNodeTo(dhtMsg->getHashedKey())) {2167 logging_debug("Routing DHT message to closest node "2168 << " from " << msg->getSourceNode()2169 << " to " << dhtMsg->getHashedKey()2170 );2171 dhtSend(msg, dhtMsg->getHashedKey());2172 delete dhtMsg;2173 return true;2174 }2175 2176 // now, we are the closest node...2177 switch (msg->getType()) {2178 2179 // ----------------------------------------------------------------- put ---2180 case OverlayMsg::typeDHTPut: {2181 logging_debug("DHT-Put: Attempt to store values for key "2182 << dhtMsg->getKey());2183 if (dhtMsg->doReplace()) {2184 logging_debug("DHT-Put: Attempt to replace key: remove old values first!");2185 dht->remove(dhtMsg->getKey());2186 }2187 BOOST_FOREACH( Data value, dhtMsg->getValues() ) {2188 logging_debug("DHT-Put: Stored value: " << value );2189 dht->put(dhtMsg->getKey(), value, dhtMsg->getTTL() );2190 }2191 break;2192 }2193 2194 // ----------------------------------------------------------------- get ---2195 case OverlayMsg::typeDHTGet: {2196 logging_info("DHT-Get: key=" << dhtMsg->getKey() );2197 vector<Data> vect = dht->get(dhtMsg->getKey());2198 BOOST_FOREACH(const Data& d, vect)2199 logging_info("DHT-Get: value=" << d);2200 OverlayMsg omsg(*msg);2201 omsg.swapRoles();2202 omsg.setType(OverlayMsg::typeDHTData);2203 DHTMessage dhtmsg(dhtMsg->getKey(), vect);2204 omsg.encapsulate(&dhtmsg);2205 2206 logging_info("Sending DHT response to " << omsg.getDestinationNode());2207 sendMessage(&omsg, omsg.getDestinationNode());2208 break;2209 }2210 2211 // -------------------------------------------------------------- remove ---2212 case OverlayMsg::typeDHTRemove: {2213 if (dhtMsg->hasValues()) {2214 BOOST_FOREACH( Data value, dhtMsg->getValues() )2215 dht->remove(dhtMsg->getKey(), value );2216 } else2217 dht->remove( dhtMsg->getKey() );2218 break;2219 }2220 2221 // -------------------------------------------------------------- default---2222 default:2223 logging_error("DHT Message type unknown.");2224 return false;2225 }2226 delete dhtMsg;2227 return true;2228 }2229 2230 /// put a value to the DHT with a ttl given in seconds2231 void BaseOverlay::dhtPut( const Data& key, const Data& value, int ttl, bool replace, bool no_local_refresh ) {2232 2233 // log2234 logging_info("DHT-Put:"2235 << " key=" << key << " value=" << value2236 << " ttl=" << ttl << " replace=" << replace2237 );2238 2239 if (!no_local_refresh) {2240 2241 // put into local data store (for refreshes)2242 if (replace) localDHT->remove(key);2243 localDHT->put(key, value, ttl);2244 }2245 2246 DHTMessage dhtmsg( key, value );2247 dhtmsg.setReplace( replace );2248 dhtmsg.setTTL(ttl);2249 2250 OverlayMsg msg( OverlayMsg::typeDHTPut );2251 msg.encapsulate( &dhtmsg );2252 msg.setSourceNode(this->nodeId);2253 dhtSend(&msg, dhtmsg.getHashedKey());2254 }2255 2256 /// removes a key value pair from the DHT2257 void BaseOverlay::dhtRemove( const Data& key, const Data& value ) {2258 // remove from local data store2259 localDHT->remove(key,value);2260 2261 DHTMessage dhtmsg(key,value);2262 2263 // send message2264 OverlayMsg msg(OverlayMsg::typeDHTRemove);2265 msg.encapsulate( &dhtmsg );2266 msg.setSourceNode(this->nodeId);2267 dhtSend(&msg, dhtmsg.getHashedKey());2268 }2269 2270 /// removes all data stored at the given key2271 void BaseOverlay::dhtRemove( const Data& key ) {2272 // log: remove key2273 logging_info("DHT-Remove: Removing key=" << key );2274 2275 DHTMessage dhtmsg(key);2276 2277 // send message2278 OverlayMsg msg(OverlayMsg::typeDHTRemove);2279 msg.encapsulate( &dhtmsg );2280 msg.setSourceNode(this->nodeId);2281 dhtSend(&msg, dhtmsg.getHashedKey());2282 }2283 2284 /// requests data stored using key2285 void BaseOverlay::dhtGet( const Data& key, const ServiceID& service ) {2286 // log: get2287 logging_info("DHT-Get: Trying to resolve key=" <<2288 key << " for service=" << service.toString() );2289 2290 DHTMessage dhtmsg(key);2291 2292 // send message2293 OverlayMsg msg(OverlayMsg::typeDHTGet);2294 msg.setService(service);2295 msg.encapsulate( &dhtmsg );2296 msg.setSourceNode(this->nodeId);2297 dhtSend(&msg, dhtmsg.getHashedKey());2298 }2299 2300 void BaseOverlay::dhtSend( OverlayMsg* msg, const NodeID& dest ) {2301 // log: dht send2302 logging_info("DHT-Send: Sending message with key=" << dest.toString() );2303 2304 // local storage? yes-> put into DHT directly2305 if (overlayInterface->isClosestNodeTo(dest)) {2306 // be compatible with old code so set destination to hashed key2307 msg->setDestinationNode(dest);2308 2309 Data d = data_serialize(msg);2310 Message m2(d);2311 OverlayMsg* m3 = m2.decapsulate<OverlayMsg>();2312 2313 handleDHTMessage(m3);2314 2315 delete m3;2316 return;2317 } else {2318 // need to route2319 NodeID next_hop = overlayInterface->getNextNodeId(dest);2320 msg->setDestinationNode(next_hop);2321 logging_info("DHT-Send: sending via node " << next_hop.toString());2322 2323 send(msg, next_hop);2324 2325 return;2326 }2327 }2328 2329 1924 std::string BaseOverlay::debugInformation() { 2330 1925 std::stringstream s; -
source/ariba/overlay/BaseOverlay.h
r7532 r10653 74 74 using std::vector; 75 75 using std::list; 76 using std::cout;77 76 using std::map; 78 77 using std::make_pair; … … 187 186 const ServiceID& service = OverlayInterface::OVERLAY_SERVICE_ID); 188 187 188 /** 189 * send a message to the closest directly known node to an address 190 * 191 * @return NodeID of the (closest) destination node; 192 */ 193 NodeID sendMessageCloserToNodeID(const Message* message, const NodeID& address, 194 const ServiceID& service = OverlayInterface::OVERLAY_SERVICE_ID); 195 189 196 /** 190 197 * Send out a message to all nodes that are known in the overlay structure. … … 287 294 */ 288 295 void leaveSpoVNet(); 289 290 /// put a value to the DHT with a ttl given in seconds291 void dhtPut( const Data& key, const Data& value, int ttl = 0, bool replace = false, bool no_local_refresh = false);292 293 /// removes a key value pair from the DHT294 void dhtRemove( const Data& key, const Data& value );295 296 /// removes all data stored at the given key297 void dhtRemove( const Data& key );298 299 /// requests data stored using key300 void dhtGet( const Data& key, const ServiceID& service );301 296 302 297 protected: … … 411 406 bool handleJoinRequest( OverlayMsg* msg, const LinkID& bcLink ); 412 407 bool handleJoinReply( OverlayMsg* msg, const LinkID& bcLink ); 413 414 // handle DHT messages415 bool handleDHTMessage( OverlayMsg* msg );416 408 417 409 // handle link messages … … 506 498 bool ignore_down = false ); 507 499 508 // distributed hashtable handling ------------------------------------------509 510 DHT* dht;511 DHT* localDHT;512 int republishCounter;513 514 void initDHT();515 void destroyDHT();516 void stabilizeDHT();517 void dhtSend( OverlayMsg* msg, const NodeID& dest );518 519 500 // misc -------------------------------------------------------------------- 520 501 -
source/ariba/overlay/messages/OverlayMsg.h
r6919 r10653 89 89 typeLinkAlive = 0x35, ///< keep-alive message 90 90 91 // DHT routed messages 91 /// DHT routed messages 92 /// @deprecated because the DHT has been moved into a separate service 92 93 maskDHT = 0x40, ///< bit mask for dht messages 93 94 typeDHTPut = 0x41, ///< DHT put operation … … 96 97 97 98 /// DHT response messages 99 /// @deprecated because the DHT has been moved into a separate service 98 100 maskDHTResponse = 0x50, ///< bit mask for dht responses 99 101 typeDHTData = 0x51, ///< DHT get data … … 197 199 } 198 200 199 bool isDHTMessage() const {200 return hasTypeMask(maskDHT);201 }202 203 201 /// number of hops and time to live ---------------------------------------- 204 202 -
source/ariba/utility/addressing
- Property svn:mergeinfo changed (with no actual effect on merging)
-
source/ariba/utility/addressing/endpoint_set.hpp
r6919 r10653 149 149 std::string sub = str.substr(pos, min(nend2,nend1)-pos); 150 150 trim(sub); 151 // cout << sub << endl;152 151 V obj( sub ); 153 152 set.insert(obj); -
source/ariba/utility/bootstrap/modules/bluetoothsdp
- Property svn:mergeinfo changed (with no actual effect on merging)
-
source/ariba/utility/bootstrap/modules/periodicbroadcast
- Property svn:mergeinfo changed (with no actual effect on merging)
-
source/ariba/utility/bootstrap/modules/periodicbroadcast/PeriodicBroadcast.h
r7532 r10653 55 55 #include "PeriodicBroadcastMessage.h" 56 56 57 //link-local 58 #include "ariba/utility/transport/tcpip/tcpip.hpp" 59 57 60 using std::map; 58 61 using std::string; 59 using std::cout;60 62 using boost::asio::ip::udp; 61 63 … … 296 298 { 297 299 udp::endpoint endp(udp::v6(), PeriodicBroadcast::serverport_v6); 298 endp.address( boost::asio::ip::address_v6::from_string("ff02::1") ); 299 socket_v6.send_to( boost::asio::buffer(pnt, len), endp, 0, err ); 300 if(err) logging_warn("failed sending message through ipv6 socket"); 300 boost::asio::ip::address_v6 all_nodes = boost::asio::ip::address_v6::from_string("ff02::1"); 301 302 // include all link-local interfaces 303 vector<uint64_t> scope_ids = ariba::transport::tcpip::get_interface_scope_ids(); 304 305 BOOST_FOREACH ( uint64_t id, scope_ids ) 306 { 307 all_nodes.scope_id(id); 308 endp.address( all_nodes ); 309 310 socket_v6.send_to( boost::asio::buffer(pnt, len), endp, 0, err ); 311 if(err) logging_warn("failed sending message through ipv6 socket"); 312 } 301 313 } 302 314 } -
source/ariba/utility/misc/Helper.h
r9770 r10653 67 67 using std::setfill; 68 68 using std::setw; 69 using std::cout;70 69 using std::string; 71 70 using std::ostream; -
source/ariba/utility/serialization/Data.hpp
r9695 r10653 41 41 42 42 //== library includes == 43 #include <string.h> 43 44 #include <stdlib.h> 44 45 #include <iostream> -
source/ariba/utility/system/Timer.cpp
r7821 r10653 104 104 105 105 void Timer::eventFunction() { 106 //std::cout << "unimplemented eventFunction Timer(" << millis << ")" << std::endl;106 logging_warn("unimplemented eventFunction Timer(" << millis << ")"); 107 107 } 108 108 -
source/ariba/utility/transport
- Property svn:mergeinfo changed (with no actual effect on merging)
-
source/ariba/utility/transport/tcpip/tcpip.cpp
r10075 r10653 1 1 #include "tcpip.hpp" 2 2 3 #define _NO_LOGGING 4 5 // std includes 6 #include <unistd.h> 7 #include <iostream> 8 #include <string> 9 #include <sstream> 10 #include <boost/foreach.hpp> 11 12 // protlib includes 13 #include "protlib/network_message.h" 14 #include "protlib/tp_over_tcp.h" 15 #include "protlib/tperror.h" 16 #include "protlib/logfile.h" 17 #include "protlib/queuemanager.h" 18 #include "protlib/threadsafe_db.h" 19 #include "protlib/setuid.h" 20 21 // protlib namespaces 22 using namespace protlib; 23 using namespace protlib::log; 24 25 logfile commonlog; 26 protlib::log::logfile& protlib::log::DefaultLog(commonlog); 3 #include <boost/array.hpp> 4 5 // interface discovery for link-local destinations 6 #include <ifaddrs.h> 27 7 28 8 namespace ariba { 29 9 namespace transport { 30 10 11 use_logging_cpp(tcpip) 12 31 13 using namespace ariba::addressing; 32 14 33 34 tcpip_endpoint convert( const appladdress* addr ) { 35 const char* ip_str = addr->get_ip_str(); 36 tcpip_endpoint endpoint( std::string(ip_str), addr->get_port() ); 37 return endpoint; 38 } 39 40 appladdress convert( const tcpip_endpoint& endpoint ) { 41 tcpip_endpoint* e = const_cast<tcpip_endpoint*>(&endpoint); 42 appladdress 43 peer(e->address().to_string().c_str(), "tcp", e->port().asio() ); 44 // cout << endpoint.to_string() << " to " << peer.get_ip_str() << ":" << peer.get_port() << endl; 45 return peer; 46 } 47 48 tcpip::tcpip( uint16_t port ) : 49 done ( false ), 50 running ( false ), 51 port( port ), 52 tpreceivethread ( NULL ), 53 tpthread ( NULL ), 54 listener ( NULL ) { 55 } 56 57 tcpip::~tcpip() { 58 if (running) stop(); 59 } 60 61 bool get_message_length( NetMsg& m, uint32& clen_bytes ) { 62 clen_bytes = m.decode32(); 63 m.set_pos_r(-4); 64 return true; 65 } 66 67 void tcpip::start() { 68 done = false; 69 running = false; 70 71 // initalize netdb and setuid 72 protlib::tsdb::init(); 73 protlib::setuid::init(); 74 75 // set tcp parameters 76 port_t port = this->port; // port 77 TPoverTCPParam tppar(4, get_message_length, port); 78 79 // create receiver thread 80 FastQueue* tpchecker_fq = new FastQueue("TCPTransport", true); 81 QueueManager::instance()->register_queue(tpchecker_fq, 82 message::qaddr_signaling); 83 84 // start thread 85 pthread_create( &tpreceivethread, NULL, tcpip::receiverThread, this ); 86 tpthread = new ThreadStarter<TPoverTCP, TPoverTCPParam> ( 1, tppar ); 87 tpthread->start_processing(); 88 } 89 90 void tcpip::stop() { 91 // stop receiver thread 92 done = true; 93 94 // stop TPoverTCP 95 tpthread->stop_processing(); 96 tpthread->abort_processing(true); 97 tpthread->wait_until_stopped(); 98 99 // unregister TPoverTCP 100 delete QueueManager::instance()->get_queue( message::qaddr_signaling ); 101 QueueManager::instance()->unregister_queue( message::qaddr_signaling ); 102 103 // destroy QueueManager 104 QueueManager::clear(); 105 106 // de-initalize netdb and setuid 107 protlib::setuid::end(); 108 protlib::tsdb::end(); 109 110 // wait for thread to finish and delete 111 pthread_join(tpreceivethread, NULL); 112 } 113 114 void tcpip::send( const address_v* remote, const uint8_t* data, size_t size ) { 115 116 // prepare netmsg with length and and port 117 NetMsg* datamsg = new NetMsg(size + 6); 118 datamsg->encode32( size + 2, true ); 119 datamsg->encode16( this->port,true ); 120 121 for (size_t i=0; i<size; i++) 122 datamsg->encode8( data[i],true ); 123 124 // send message 125 tcpip_endpoint endpoint = *remote; 126 appladdress peer = convert(endpoint); 127 128 // add to output queue 129 tpthread->get_thread_object()->send( datamsg, peer, false ); 130 } 131 132 void tcpip::send( const endpoint_set& endpoints, const uint8_t* data, size_t size ) { 133 // send a message to each combination of ip-address and port 134 BOOST_FOREACH( const ip_address ip, endpoints.ip ) { 135 BOOST_FOREACH( const tcp_port_address port, endpoints.tcp ) { 136 tcpip_endpoint endpoint(ip,port); 137 address_vf vf = endpoint; 138 send(vf,data,size); 139 } 140 } 141 } 142 143 void tcpip::terminate( const address_v* remote) { 144 tcpip_endpoint endpoint = *remote; 145 appladdress peer = convert(endpoint); 146 peer.convert_to_ipv6(); 147 tpthread->get_thread_object()->terminate( peer ); 148 } 149 150 void tcpip::register_listener( transport_listener* listener ) { 151 this->listener = listener; 152 } 153 154 void* tcpip::receiverThread( void* ptp ) { 155 // get reference to transport object 156 tcpip& tp = *((tcpip*)ptp); 157 158 // get queue 159 FastQueue* fq = 160 QueueManager::instance()->get_queue(message::qaddr_signaling); 161 162 // main processing loop 163 tp.running = true; 164 while (!tp.done) { 165 166 // wait for new message to approach 167 message* msg = fq->dequeue_timedwait(300); 168 169 // message has arrived? no-> continue 170 if (!msg) continue; 171 172 // handle transport message 173 TPMsg* tpmsg = dynamic_cast<TPMsg*> (msg); 174 if (!tpmsg) { 175 delete msg; 176 continue; 177 } 178 179 // get address & message 180 const appladdress* remote_peer = static_cast<const appladdress*>( tpmsg->get_peeraddress() ); 181 const appladdress* local_peer = static_cast<const appladdress*>( tpmsg->get_ownaddress() ); 182 NetMsg* datamsg = tpmsg->get_message(); 183 184 // not a data message? -> continue! 185 if (!datamsg) { 186 delete tpmsg; 187 continue; 188 } 189 190 // get length and remote endpoint port 191 datamsg->set_pos(0); 192 uint32_t message_size = datamsg->decode32(true)-2; 193 //uint16_t remote_port = datamsg->decode16(true); 194 195 // inform listener 196 if (tp.listener != NULL) { 197 tcpip_endpoint remote = convert(remote_peer); 198 tcpip_endpoint local = convert(local_peer); 199 tp.listener->receive_message( 200 &tp, local, remote, datamsg->get_buffer()+6, message_size ); 201 } 202 203 tpmsg->set_message(NULL); 204 delete datamsg; 205 delete tpmsg; 206 } 207 // clean queue & stop 208 fq->cleanup(); 209 tp.running = false; 210 return NULL; 15 typedef boost::mutex::scoped_lock unique_lock; 16 17 tcpip::tcpip( const tcp::endpoint& endp ) : 18 listener(NULL), 19 acceptor(u_io_service.get_asio_io_service(), endp) 20 { 21 } 22 23 tcpip::~tcpip(){} 24 25 void tcpip::start() 26 { 27 // open server socket 28 accept(); 29 30 u_io_service.start(); 31 } 32 33 34 void tcpip::stop() 35 { 36 acceptor.close(); 37 38 u_io_service.stop(); 39 } 40 41 42 /* see header file for comments */ 43 void tcpip::send( 44 const tcp::endpoint& dest_addr, 45 reboost::message_t message, 46 uint8_t priority) 47 { 48 ConnPtr conn; 49 bool need_to_connect = false; 50 51 { 52 unique_lock lock(connections_lock); 53 54 ConnectionMap::iterator it = connections.find(dest_addr); 55 if (it == connections.end()) 56 { 57 ConnPtr tmp_ptr( 58 new tcpip_connection( 59 u_io_service.get_asio_io_service(), 60 shared_from_this() ) 61 ); 62 conn = tmp_ptr; 63 64 conn->partner = dest_addr; 65 conn->remote = convert_address(dest_addr); 66 67 // Note: starting the send is the obligation of the connect_handler 68 // (avoids trying to send while not connected yet) 69 conn->sending = true; 70 need_to_connect = true; 71 72 ConnectionMap::value_type item(dest_addr, conn); 73 connections.insert(item); 74 75 } else { 76 conn = it->second; 77 } 78 } 79 80 81 // * the actual send * 82 conn->enqueue_for_sending(message, priority); 83 84 // if new connection connect to the other party 85 if ( need_to_connect ) 86 { 87 conn->sock.async_connect( 88 dest_addr, 89 boost::bind( 90 &tcpip_connection::async_connect_handler, 91 conn, 92 boost::asio::placeholders::error)); 93 } 94 } 95 96 97 /* see header file for comments */ 98 void tcpip::send( 99 const address_v* remote, 100 reboost::message_t message, 101 uint8_t priority) 102 { 103 send(convert_address(remote), message, priority); 104 } 105 106 107 /* see header file for comments */ 108 void tcpip::send( 109 const endpoint_set& endpoints, 110 reboost::message_t message, 111 uint8_t priority ) 112 { 113 // network interfaces scope_ids, for link-local connections (lazy initialization) 114 vector<uint64_t> scope_ids; 115 116 // send a message to each combination of address-address and port 117 BOOST_FOREACH( const ip_address address, endpoints.ip ) { 118 BOOST_FOREACH( const tcp_port_address port, endpoints.tcp ) { 119 tcp::endpoint endp(address.asio(), port.asio()); 120 121 // special treatment for link local addresses 122 // ---> send over all (suitable) interfaces 123 if ( endp.address().is_v6() ) 124 { 125 boost::asio::ip::address_v6 v6_addr = endp.address().to_v6(); 126 127 if ( v6_addr.is_link_local() ) 128 { 129 // initialize scope_ids 130 if ( scope_ids.size() == 0 ) 131 scope_ids = get_interface_scope_ids(); 132 133 BOOST_FOREACH ( uint64_t id, scope_ids ) 134 { 135 v6_addr.scope_id(id); 136 endp.address(v6_addr); 137 138 logging_debug("------> SEND TO (link-local): " << endp); 139 // * send * 140 send(endp, message, priority); 141 } 142 } 143 144 continue; 145 } 146 147 // * send * 148 send(endp, message, priority); 149 } 150 } 151 } 152 153 154 void tcpip::register_listener( transport_listener* listener ) 155 { 156 this->listener = listener; 157 } 158 159 160 void tcpip::terminate( const address_v* remote ) 161 { 162 terminate(convert_address(remote)); 163 } 164 165 void tcpip::terminate( const tcp::endpoint& remote ) 166 { 167 ConnPtr conn; 168 169 // find and forget connection 170 { 171 unique_lock lock(connections_lock); 172 173 ConnectionMap::iterator it = connections.find(remote); 174 if (it == connections.end()) 175 { 176 return; 177 } 178 179 conn = it->second; 180 181 connections.erase(it); 182 } 183 184 // close connection 185 boost::system::error_code ec; 186 conn->sock.shutdown(tcp::socket::shutdown_both, ec); 187 conn->sock.close(ec); 188 } 189 190 191 /* private */ 192 void tcpip::accept() 193 { 194 // create new connection object 195 ConnPtr conn( 196 new tcpip_connection( 197 u_io_service.get_asio_io_service(), 198 shared_from_this() 199 ) 200 ); 201 202 // wait for incoming connection 203 acceptor.async_accept( 204 conn->sock, 205 boost::bind(&self::async_accept_handler, 206 this->shared_from_this(), 207 conn, 208 boost::asio::placeholders::error) 209 ); 210 } 211 212 void tcpip::async_accept_handler(ConnPtr conn, const error_code& error) 213 { 214 if ( ! error ) 215 { 216 conn->partner = conn->sock.remote_endpoint(); 217 conn->remote = convert_address(conn->partner); 218 conn->local = convert_address(conn->sock.local_endpoint()); 219 220 { 221 unique_lock lock(connections_lock); 222 223 ConnectionMap::value_type item(conn->sock.remote_endpoint(), conn); 224 connections.insert(item); 225 } 226 227 // read 228 conn->listen(); 229 } 230 231 // accept further connections 232 accept(); 233 } 234 235 inline tcp::endpoint tcpip::convert_address( const address_v* address ) 236 { 237 tcpip_endpoint endpoint = *address; 238 239 return tcp::endpoint( 240 endpoint.address().asio(), endpoint.port().value() 241 ); 242 } 243 244 245 inline tcpip_endpoint tcpip::convert_address(const tcp::endpoint& endpoint) 246 { 247 ip_address address; 248 address.asio(endpoint.address()); 249 tcp_port_address port; 250 port.value(endpoint.port()); 251 return tcpip_endpoint(address, port); 252 } 253 254 255 vector<uint64_t> tcpip::get_interface_scope_ids() 256 { 257 vector<uint64_t> ret; 258 259 struct ifaddrs* ifaceBuffer = NULL; 260 void* tmpAddrPtr = NULL; 261 262 int ok = getifaddrs( &ifaceBuffer ); 263 if( ok != 0 ) return ret; 264 265 for( struct ifaddrs* i=ifaceBuffer; i != NULL; i=i->ifa_next ) { 266 267 // ignore devices that are disabled or have no ip 268 if(i == NULL) continue; 269 struct sockaddr* addr = i->ifa_addr; 270 if (addr==NULL) continue; 271 272 // only use ethX and wlanX devices 273 string device = string(i->ifa_name); 274 if ( (device.find("eth") == string::npos) && 275 (device.find("wlan") == string::npos) /* && 276 (device.find("lo") == string::npos) XXX */ ) 277 { 278 continue; 279 } 280 281 // only use interfaces with ipv6 link-local addresses 282 if (addr->sa_family == AF_INET6) 283 { 284 // convert address 285 // TODO should be possible without detour over strings 286 char straddr[INET6_ADDRSTRLEN]; 287 tmpAddrPtr= &((struct sockaddr_in6*)addr)->sin6_addr; 288 inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) ); 289 290 address_v6 v6addr = address_v6::from_string(straddr); 291 if ( v6addr.is_link_local() ) 292 { 293 // * append the scope_id to the return vector * 294 ret.push_back(if_nametoindex(i->ifa_name)); 295 } 296 297 } 298 } 299 300 freeifaddrs(ifaceBuffer); 301 302 return ret; 303 } 304 305 306 /***************** 307 ** inner class ** 308 *****************/ 309 310 tcpip::tcpip_connection::tcpip_connection(boost::asio::io_service & io_service, TcpIpPtr parent) : 311 sock(io_service), 312 valid(true), 313 parent(parent), 314 out_queues(8), //TODO How much priorities shall we have? 315 sending(false) 316 { 317 header.length = 0; 318 header.prot = 0; 319 } 320 321 /*------------------------------------------- 322 | implement transport_connection interface | 323 -------------------------------------------*/ 324 void tcpip::tcpip_connection::send( 325 reboost::message_t message, 326 uint8_t priority) 327 { 328 enqueue_for_sending(message, priority); 329 } 330 331 332 address_vf tcpip::tcpip_connection::getLocalEndpoint() 333 { 334 return local; 335 } 336 337 338 address_vf tcpip::tcpip_connection::getRemoteEndpoint() 339 { 340 return remote; 341 } 342 343 344 void tcpip::tcpip_connection::terminate() 345 { 346 parent->terminate(partner); 347 } 348 349 350 /*------------------------------ 351 | things we defined ourselves | 352 ------------------------------*/ 353 void tcpip::tcpip_connection::async_connect_handler(const error_code& error) 354 { 355 if (error) 356 { 357 parent->terminate(partner); 358 359 return; 360 } 361 362 // save address in ariba format 363 local = parent->convert_address(sock.local_endpoint()); 364 365 // Note: sending has to be true at this point 366 send_next_package(); 367 368 listen(); 369 } 370 371 372 void tcpip::tcpip_connection::listen() 373 { 374 boost::asio::async_read( 375 this->sock, 376 boost::asio::mutable_buffers_1(&this->header, sizeof(header_t)), 377 boost::bind( 378 &tcpip::tcpip_connection::async_read_header_handler, 379 this->shared_from_this(), 380 boost::asio::placeholders::error, 381 boost::asio::placeholders::bytes_transferred 382 ) 383 ); 384 } 385 386 387 void tcpip::tcpip_connection::async_read_header_handler(const error_code& error, size_t bytes_transferred) 388 { 389 if (error) 390 { 391 parent->terminate(partner); 392 393 return; 394 } 395 396 // convert byte order 397 header.length = ntohl(header.length); 398 header.length -= 2; // XXX protlib 399 400 assert(header.length > 0); 401 402 // new buffer for the new packet 403 buffy = shared_buffer_t(header.length); 404 405 // * read data * 406 boost::asio::async_read( 407 this->sock, 408 boost::asio::buffer(buffy.mutable_data(), buffy.size()), 409 boost::bind( 410 &tcpip::tcpip_connection::async_read_data_handler, 411 this->shared_from_this(), 412 boost::asio::placeholders::error, 413 boost::asio::placeholders::bytes_transferred 414 ) 415 ); 416 } 417 418 void tcpip::tcpip_connection::async_read_data_handler( 419 const error_code& error, size_t bytes_transferred) 420 { 421 if (error) 422 { 423 parent->terminate(partner); 424 425 return; 426 } 427 428 message_t msg; 429 msg.push_back(buffy); 430 buffy = shared_buffer_t(); 431 432 if ( parent->listener ) 433 parent->listener->receive_message(shared_from_this(), msg); 434 435 listen(); 436 } 437 438 /* see header file for comments */ 439 void tcpip::tcpip_connection::async_write_handler(reboost::shared_buffer_t packet, const error_code& error, size_t bytes_transferred) 440 { 441 if ( error ) 442 { 443 // remove this connection 444 parent->terminate(partner); 445 446 return; 447 } 448 449 send_next_package(); 450 } 451 452 453 454 void tcpip::tcpip_connection::enqueue_for_sending(Packet packet, uint8_t priority) 455 { 456 bool restart_sending = false; 457 458 // enqueue packet [locked] 459 { 460 unique_lock(out_queues_lock); 461 462 assert( priority < out_queues.size() ); 463 out_queues[priority].push(packet); 464 465 if ( ! sending ) 466 { 467 restart_sending = true; 468 sending = true; 469 } 470 } 471 472 // if sending was stopped, we have to restart it here 473 if ( restart_sending ) 474 { 475 send_next_package(); 476 } 477 } 478 479 /* see header file for comments */ 480 void tcpip::tcpip_connection::send_next_package() 481 { 482 Packet packet; 483 bool found = false; 484 485 // find packet with highest priority [locked] 486 { 487 unique_lock(out_queues_lock); 488 489 for ( vector<OutQueue>::iterator it = out_queues.begin(); 490 it != out_queues.end(); it++ ) 491 { 492 if ( !it->empty() ) 493 { 494 packet = it->front(); 495 it->pop(); 496 found = true; 497 498 break; 499 } 500 } 501 502 // no packets waiting --> stop sending 503 if ( ! found ) 504 { 505 sending = false; 506 } 507 } 508 509 // * send * 510 if ( found ) 511 { 512 reboost::shared_buffer_t header_buf(sizeof(header_t)); 513 header_t* header = (header_t*)(header_buf.mutable_data()); 514 header->length = htonl(packet.size()+2); // XXX protlib 515 516 packet.push_front(header_buf); 517 518 // "convert" message to asio buffer sequence 519 vector<boost::asio::const_buffer> send_sequence(packet.length()); 520 for ( int i=0; i < packet.length(); i++ ) 521 { 522 shared_buffer_t b = packet.at(i); 523 send_sequence.push_back(boost::asio::buffer(b.data(), b.size())); 524 } 525 526 // * async write * 527 boost::asio::async_write( 528 this->sock, 529 send_sequence, 530 boost::bind( 531 &tcpip::tcpip_connection::async_write_handler, 532 this->shared_from_this(), 533 packet, // makes sure our shared pointer lives long enough ;-) 534 boost::asio::placeholders::error, 535 boost::asio::placeholders::bytes_transferred) 536 ); 537 } 211 538 } 212 539 -
source/ariba/utility/transport/tcpip/tcpip.hpp
r5993 r10653 3 3 4 4 #include "ariba/utility/transport/transport.hpp" 5 #include <pthread.h>6 7 // forward declaration 8 namespace protlib { 9 template<class X, class Y>10 class ThreadStarter; 11 class TPoverTCP; 12 class TPoverTCPParam; 13 } 5 #include "ariba/utility/transport/asio/unique_io_service.h" 6 #include "ariba/utility/transport/transport_connection.hpp" 7 #include "ariba/utility/addressing/tcpip_endpoint.hpp" 8 #include <boost/asio.hpp> 9 #include <boost/shared_ptr.hpp> 10 #include <boost/enable_shared_from_this.hpp> 11 #include <queue> 12 #include "ariba/utility/transport/messages/buffers.hpp" 13 #include "ariba/utility/logging/Logging.h" 14 14 15 15 namespace ariba { 16 16 namespace transport { 17 17 18 using namespace protlib; 18 using namespace std; 19 using ariba::transport::detail::unique_io_service; 20 using ariba::addressing::tcpip_endpoint; 21 using boost::asio::ip::tcp; 22 using boost::asio::ip::address_v6; 23 using boost::system::error_code; 24 using reboost::shared_buffer_t; 25 using reboost::message_t; 19 26 20 /** 21 * TODO: Doc 22 * 23 * @author Sebastian Mies <mies@tm.uka.de> 24 */ 25 class tcpip : public transport_protocol { 27 class tcpip; 28 typedef boost::shared_ptr<tcpip> TcpIpPtr; 29 30 class tcpip : 31 public transport_protocol, 32 public boost::enable_shared_from_this<tcpip> 33 { 34 typedef tcpip self; 35 use_logging_h(tcpip) 36 37 private: 38 class tcpip_connection : 39 public transport_connection, 40 public boost::enable_shared_from_this<tcpip_connection> 41 { 42 public: 43 typedef reboost::message_t Packet; 44 typedef std::queue<Packet> OutQueue; 45 46 struct header_t 47 { 48 uint32_t length; 49 uint16_t prot; // XXX protlib 50 } __attribute__((packed)); 51 52 tcpip_connection(boost::asio::io_service& io_service, TcpIpPtr parent); 53 54 /// Inherited from transport_connection 55 virtual void send(reboost::message_t message, uint8_t priority = 0); 56 virtual address_vf getLocalEndpoint(); 57 virtual address_vf getRemoteEndpoint(); 58 virtual void terminate(); 59 60 void listen(); 61 62 void async_connect_handler(const error_code& error); 63 64 void async_read_header_handler(const error_code& error, size_t bytes_transferred); 65 void async_read_data_handler(const error_code& error, size_t bytes_transferred); 66 67 /* 68 * is called from asio when write operation "returns", 69 * calls private function `send_next_package()` 70 */ 71 void async_write_handler( 72 reboost::shared_buffer_t packet, 73 const error_code& error, 74 size_t bytes_transferred); 75 76 77 void enqueue_for_sending(Packet packet, uint8_t priority); 78 79 private: 80 /* 81 * is called from `send` or `async_write_handler` to begin/keep sending 82 * sends the next message with the highest priority in this connection 83 */ 84 void send_next_package(); 85 86 87 public: 88 tcp::socket sock; 89 bool valid; 90 TcpIpPtr parent; 91 92 tcp::endpoint partner; 93 tcpip_endpoint remote; 94 tcpip_endpoint local; 95 96 vector<OutQueue> out_queues; // to be locked with out_queues_lock 97 boost::mutex out_queues_lock; 98 99 bool sending; // to be locked with out_queues_lock 100 101 header_t header; 102 shared_buffer_t buffy; 103 }; 104 typedef boost::shared_ptr<tcpip_connection> ConnPtr; 105 typedef std::map<tcp::endpoint, ConnPtr> ConnectionMap; 106 26 107 public: 27 tcpip( uint16_t port);108 tcpip( const tcp::endpoint& endp ); 28 109 virtual ~tcpip(); 29 110 virtual void start(); 30 111 virtual void stop(); 31 virtual void send( const address_v* remote, const uint8_t* data, size_t size ); 32 virtual void send( const endpoint_set& endpoints, const uint8_t* data, size_t size ); 112 113 /** 114 * enqueues message for sending 115 * create new connection if necessary 116 * starts sending mechanism (if not already running) 117 */ 118 void send( 119 const tcp::endpoint&, 120 reboost::message_t message, 121 uint8_t priority = 0 ); 122 123 /** 124 * Converts address_v to tcp::endpoint and calls the real send() function 125 */ 126 virtual void send( 127 const address_v* remote, 128 reboost::message_t message, 129 uint8_t priority = 0 ); 130 131 /** 132 * calls send for each destination endpoint in `endpoint_set& endpoints` 133 */ 134 virtual void send( 135 const endpoint_set& endpoints, 136 reboost::message_t message, 137 uint8_t priority = 0 ); 138 33 139 virtual void terminate( const address_v* remote ); 140 virtual void terminate( const tcp::endpoint& remote ); 34 141 virtual void register_listener( transport_listener* listener ); 35 142 143 144 /** 145 * returns a vector of (interesting) network interfaces 146 * 147 * [NOTE: The current implementation returns the scope_ids of 148 * all ethX and wlanX network interfaces, to be used for 149 * connections to link-local ipv6 addresses.] 150 * 151 * TODO move to ariba/communication/networkinfo/AddressDiscovery ?? 152 * 153 */ 154 static vector<uint64_t> get_interface_scope_ids(); 155 36 156 private: 37 volatile bool done, running; 38 uint16_t port; 39 pthread_t tpreceivethread; 40 ThreadStarter<TPoverTCP, TPoverTCPParam>* tpthread; 41 static void* receiverThread( void* ptp ); 157 void accept(); 158 void async_accept_handler(ConnPtr conn, const error_code& error); 159 tcp::endpoint convert_address(const address_v* endpoint); 160 tcpip_endpoint convert_address(const tcp::endpoint& endpoint); 161 162 private: 42 163 transport_listener* listener; 164 unique_io_service u_io_service; 165 tcp::acceptor acceptor; 166 167 ConnectionMap connections; 168 boost::mutex connections_lock; 43 169 }; 44 170 -
source/ariba/utility/transport/transport.hpp
r5284 r10653 8 8 // transport protocol implementations 9 9 #include "tcpip/tcpip.hpp" 10 #include "rfcomm/rfcomm .hpp"10 #include "rfcomm/rfcomm_transport.hpp" 11 11 12 12 // common transport peer using all known protocols -
source/ariba/utility/transport/transport_listener.hpp
r5993 r10653 5 5 6 6 #include "ariba/utility/addressing/addressing.hpp" 7 #include "ariba/utility/transport/messages/buffers.hpp" 8 #include "ariba/utility/transport/transport_connection.hpp" 7 9 8 10 // namespace ariba::transport … … 11 13 12 14 using namespace ariba::addressing; 13 14 class transport_protocol;15 15 16 16 /** … … 21 21 class transport_listener { 22 22 public: 23 /// Allow deleting implementing classes by pointer 24 virtual ~transport_listener() {} 25 23 26 /// called when a message is received 24 27 virtual void receive_message( 25 transport_protocol* transport, 26 const address_vf local, const address_vf remote, 27 const uint8_t* data, size_t size 28 transport_connection::sptr connection, 29 reboost::message_t msg 28 30 ) { 29 31 std::cout << "transport_listener: not implemented" << std::endl; -
source/ariba/utility/transport/transport_peer.cpp
r7834 r10653 3 3 #include "transport_peer.hpp" 4 4 #include "transport.hpp" 5 #include "ariba/utility/logging/Logging.h" 6 #include <boost/asio/ip/tcp.hpp> 7 #include <boost/asio/error.hpp> 8 #include <boost/foreach.hpp> 9 10 #ifdef ECLIPSE_PARSER 11 #define foreach(a, b) for(a : b) 12 #else 13 #define foreach(a, b) BOOST_FOREACH(a, b) 14 #endif 5 15 6 16 // namespace ariba::transport … … 9 19 10 20 using namespace ariba::addressing; 21 using boost::asio::ip::tcp; 22 23 #ifdef HAVE_LIBBLUETOOTH 24 using boost::asio::bluetooth::rfcomm; 25 #endif 26 27 use_logging_cpp(transport_peer); 11 28 12 29 transport_peer::transport_peer( endpoint_set& local_set ) : local(local_set) { 13 // setup tcp transports 14 tcp = NULL; 15 //cout << "#tcpip_transports = " << local.tcp.size() << endl; 16 if (local.tcp.size()==1) { 17 tcp = new tcpip(local.tcp.begin()->value()); 18 //cout << "Started tcpip_transport on port " << local.tcp.begin()->value() << endl; 19 } 20 30 31 // setup tcp transports 32 foreach(tcp_port_address port, local.tcp) { 33 34 if (local.ip.size() > 0) { 35 foreach(ip_address ip_addr, local.ip) { 36 37 tcp::endpoint endp(ip_addr.asio(), port.asio()); 38 create_service(endp); 39 } 40 } else { 41 tcp::endpoint endp_v6(tcp::v6(), port.asio()); 42 tcp::endpoint endp_v4(tcp::v4(), port.asio()); 43 44 create_service(endp_v6); 45 create_service(endp_v4); 46 } 47 48 } 49 21 50 #ifdef HAVE_LIBBLUETOOTH 22 // setup rfcomm transports 23 rfc = NULL; 24 //cout << "#rfcomm_transports = " << local.rfcomm.size() << endl; 25 if ( local.rfcomm.size() == 1 ) { 26 rfc = new rfcomm( local.rfcomm.begin()->value() ); 27 //cout << "Started rfcomm_transport on port " << local.rfcomm.begin()->value() << endl; 28 } 51 foreach(rfcomm_channel_address channel, local.rfcomm) { 52 if (local.bluetooth.size() > 0) { 53 foreach(mac_address mac, local.bluetooth) { 54 rfcomm::endpoint endp(mac.bluetooth(), channel.value()); 55 create_service(endp); 56 } 57 } else { 58 rfcomm::endpoint endp(channel.value()); 59 create_service(endp); 60 } 61 } 29 62 #endif 30 63 } 31 64 65 void transport_peer::create_service(tcp::endpoint endp) { 66 try { 67 TcpIpPtr tmp_ptr(new tcpip(endp)); 68 tcps.push_back(tmp_ptr); 69 logging_info("Listening on IP/TCP " << endp); 70 71 } catch (boost::system::system_error& e) { 72 if (e.code() == boost::asio::error::address_in_use) { 73 logging_warn("[WARN] Address already in use: " 74 << endp << ". Endpoint will be ignored!"); 75 } else { 76 // Rethrow 77 throw; 78 } 79 } 80 } 81 82 #ifdef HAVE_LIBBLUETOOTH 83 void transport_peer::create_service(rfcomm::endpoint endp) { 84 try { 85 rfcomm_transport::sptr tmp_ptr(new rfcomm_transport(endp)); 86 rfcomms.push_back(tmp_ptr); 87 logging_info("Listening on bluetooth/RFCOMM " << endp); 88 89 } catch (boost::system::system_error& e) { 90 if (e.code() == boost::asio::error::address_in_use) { 91 logging_warn("[WARN] Address already in use: " 92 << endp << ". Endpoint will be ignored!"); 93 } else { 94 // Rethrow 95 throw; 96 } 97 } 98 } 99 #endif 100 32 101 transport_peer::~transport_peer() { 33 if (tcp !=NULL ) delete tcp;34 #ifdef HAVE_LIBBLUETOOTH35 if (rfc !=NULL ) delete rfc;36 #endif37 102 } 38 103 39 104 void transport_peer::start() { 40 if (tcp!=NULL) tcp->start(); 105 foreach(TcpIpPtr tcp, tcps) { 106 tcp->start(); 107 } 108 41 109 #ifdef HAVE_LIBBLUETOOTH 42 if (rfc!=NULL) rfc->start(); 110 foreach(rfcomm_transport::sptr x, rfcomms) { 111 x->start(); 112 } 43 113 #endif 44 114 } 45 115 46 116 void transport_peer::stop() { 47 if (tcp!=NULL) tcp->stop(); 117 foreach(TcpIpPtr tcp, tcps) { 118 tcp->stop(); 119 } 120 48 121 #ifdef HAVE_LIBBLUETOOTH 49 if (rfc!=NULL) rfc->stop(); 122 foreach(rfcomm_transport::sptr x, rfcomms) { 123 x->stop(); 124 } 50 125 #endif 51 126 } 52 127 53 void transport_peer::send( const address_v* remote, const uint8_t* data, size_t size ) { 54 if (remote->instanceof<tcpip_endpoint>() && tcp!=NULL) { 55 tcp->send(remote,data,size); 56 } else 128 129 void transport_peer::send( 130 const endpoint_set& endpoints, 131 reboost::message_t message, 132 uint8_t priority) 133 { 134 foreach(TcpIpPtr tcp, tcps) { 135 tcp->send(endpoints, message, priority); 136 } 137 57 138 #ifdef HAVE_LIBBLUETOOTH 58 if (remote->instanceof<rfcomm_endpoint>() && rfc!=NULL) { 59 rfc->send(remote,data,size); 60 } else 61 #endif 62 cerr << "Could not send message to " << remote->to_string() << endl; 63 } 64 65 void transport_peer::send( const endpoint_set& endpoints, const uint8_t* data, size_t size ) { 66 if (tcp!=NULL) tcp->send(endpoints,data,size); 67 #ifdef HAVE_LIBBLUETOOTH 68 if (rfc!=NULL) rfc->send(endpoints,data,size); 139 foreach(rfcomm_transport::sptr x, rfcomms) { 140 x->send(endpoints, message, priority); 141 } 69 142 #endif 70 143 } 71 144 72 145 void transport_peer::terminate( const address_v* remote ) { 73 if (remote->instanceof<tcpip_endpoint>() && tcp!=NULL) 74 tcp->terminate(remote); 146 if (remote->instanceof<tcpip_endpoint>())// TODO direkt auf der richtigen verbindung 147 { 148 foreach(TcpIpPtr tcp, tcps) { 149 tcp->terminate(remote); 150 } 151 } 75 152 #ifdef HAVE_LIBBLUETOOTH 76 if (remote->instanceof<rfcomm_endpoint>() && rfc!=NULL) 77 rfc->terminate(remote); 153 if (remote->instanceof<rfcomm_endpoint>()) { 154 foreach(rfcomm_transport::sptr x, rfcomms) { 155 x->terminate(remote); 156 } 157 } 78 158 #endif 79 159 } 80 160 81 161 void transport_peer::register_listener( transport_listener* listener ) { 82 if (tcp!=NULL) tcp->register_listener(listener); 162 foreach(TcpIpPtr tcp, tcps) { 163 tcp->register_listener(listener); 164 } 165 83 166 #ifdef HAVE_LIBBLUETOOTH 84 if (rfc!=NULL) rfc->register_listener(listener); 167 foreach(rfcomm_transport::sptr x, rfcomms) { 168 x->register_listener(listener); 169 } 85 170 #endif 86 171 } -
source/ariba/utility/transport/transport_peer.hpp
r9324 r10653 5 5 #include "transport_protocol.hpp" 6 6 #include "ariba/utility/addressing/endpoint_set.hpp" 7 #include <boost/shared_ptr.hpp> 8 #include "rfcomm/bluetooth_rfcomm.hpp" 9 7 10 8 11 // namespace ariba::transport … … 13 16 14 17 class tcpip; 18 15 19 #ifdef HAVE_LIBBLUETOOTH 16 class rfcomm ;20 class rfcomm_transport; 17 21 #endif 18 22 … … 30 34 virtual void start(); 31 35 virtual void stop(); 32 virtual void send( const address_v* remote, const uint8_t* data, size_t size ); 33 virtual void send( const endpoint_set& endpoints, const uint8_t* data, size_t size ); 36 37 virtual void send( 38 const endpoint_set& endpoints, 39 reboost::message_t message, 40 uint8_t priority = 0); 41 42 /// @deprecated: Use terminate() from transport_connection instead 34 43 virtual void terminate( const address_v* remote ); 44 35 45 virtual void register_listener( transport_listener* listener ); 36 46 37 47 private: 48 void create_service(tcp::endpoint endp); 49 #ifdef HAVE_LIBBLUETOOTH 50 void create_service(boost::asio::bluetooth::rfcomm::endpoint endp); 51 #endif 52 38 53 endpoint_set& local; 39 tcpip* tcp;54 std::vector< boost::shared_ptr<tcpip> > tcps; 40 55 #ifdef HAVE_LIBBLUETOOTH 41 rfcomm* rfc;56 std::vector< boost::shared_ptr<rfcomm_transport> > rfcomms; 42 57 #endif 43 58 }; -
source/ariba/utility/transport/transport_protocol.hpp
r5993 r10653 3 3 4 4 #include "ariba/utility/addressing/addressing.hpp" 5 #include "transport_listener.hpp" 5 #include "ariba/utility/transport/transport_listener.hpp" 6 #include "ariba/utility/transport/messages/message.hpp" 6 7 7 8 // namespace ariba::transport … … 18 19 class transport_protocol { 19 20 public: 21 /// Allow deleting implementing classes by pointer 22 virtual ~transport_protocol() {} 23 20 24 virtual void start() = 0; 21 25 virtual void stop() = 0; 22 virtual void send( const address_v* remote, const uint8_t* data, size_t size ) = 0; 23 virtual void send( const endpoint_set& endpoints, const uint8_t* data, size_t size ) = 0; 26 27 virtual void send( 28 const endpoint_set& endpoints, 29 reboost::message_t message, 30 uint8_t priority = 0) = 0; 31 32 /// @deprecated: Use terminate() from transport_connection instead 24 33 virtual void terminate( const address_v* remote ) = 0; 34 25 35 virtual void register_listener( transport_listener* listener ) = 0; 26 36 }; -
source/ariba/utility/visual/DddVis.h
r6954 r10653 59 59 using std::pair; 60 60 using std::make_pair; 61 using std::cout;62 61 using std::ostringstream; 63 62 using ariba::utility::NodeID; -
source/ariba/utility/visual/OvlVis.h
r6822 r10653 54 54 using std::pair; 55 55 using std::make_pair; 56 using std::cout;57 56 using std::ostringstream; 58 57 using ariba::utility::KeyMapping;
Note:
See TracChangeset
for help on using the changeset viewer.