Changeset 12060 for source/ariba/utility/transport
- Timestamp:
- Jun 19, 2013, 11:05:49 AM (11 years ago)
- Location:
- source/ariba/utility/transport
- Files:
-
- 9 added
- 8 deleted
- 8 edited
Legend:
- Unmodified
- Added
- Removed
-
source/ariba/utility/transport
- Property svn:mergeinfo changed (with no actual effect on merging)
-
source/ariba/utility/transport/CMakeLists.txt
r10700 r12060 38 38 39 39 add_headers( 40 test_transport.hpp41 transport_connection.hpp42 transport.hpp43 transport_listener.hpp44 40 transport_peer.cpp 45 41 transport_peer.hpp 46 transport_protocol.hpp47 42 ) 48 43 49 add_subdir_sources(asio messages rfcomm tcpip) 44 add_subdir_sources(asio messages rfcomm StreamTransport) 45 -
source/ariba/utility/transport/messages/message.cpp
r10653 r12060 24 24 os << "message({size=" << m.size() << ",buffers=" << (int) m.length() 25 25 << ",hash=" << m.hash() << "},"; 26 m. foreach(ts);26 m.msg_foreach(ts); 27 27 os << ")"; 28 28 return os; -
source/ariba/utility/transport/messages/message.hpp
r10653 r12060 17 17 18 18 /// message size type 19 typedef signed char mlength_t; 19 //typedef signed char mlength_t; // <--- don't do this!! 20 //typedef size_t mlength_t; 21 typedef int mlength_t; // signed int seems necessary 20 22 21 23 /// maximum number of buffers per message (default is 8) 22 24 const mlength_t message_max_buffers = (1L << 3); 25 //const mlength_t message_max_buffers = (1L << 4); 23 26 24 27 //! A Copy-on-Write Message with Shared Buffers. … … 70 73 /// Copy message 71 74 inline message_t(const message_t& msg) : 72 imsg(msg.imsg) { 73 imsg->owner = NULL; 75 imsg(msg.imsg) 76 { 77 if ( imsg ) 78 imsg->owner = NULL; 74 79 } 75 80 … … 142 147 /// Returns the number of buffers inside this message. 143 148 inline mlength_t length() const { 149 if ( ! imsg ) 150 return 0; 151 144 152 return (imsg->length); 145 153 } … … 167 175 /// Iterates over a partial set of buffers. 168 176 template<typename T> 169 inline void foreach(const T& work, size_t index_ = 0, size_t size_ = 0) const {177 inline void msg_foreach(const T& work, size_t index_ = 0, size_t size_ = 0) const { 170 178 T op = work; 171 179 if (size_ == 0) size_ = size() - index_; … … 192 200 inline void read(boctet_t* mem, size_t idx = 0, size_t size_ = 0) const { 193 201 struct read_buffer rb = { mem }; 194 foreach(rb, idx, size_);202 msg_foreach(rb, idx, size_); 195 203 } 196 204 … … 198 206 inline void write(const boctet_t* mem, size_t idx = 0, size_t size_ = 0) { 199 207 struct write_buffer wb = { mem }; 200 foreach(wb, idx, size_);208 msg_foreach(wb, idx, size_); 201 209 } 202 210 … … 227 235 message_t m; 228 236 struct sub_message sm = { &m }; 229 foreach(sm, index, size);237 msg_foreach(sm, index, size); 230 238 return m; 231 239 } -
source/ariba/utility/transport/messages/shared_buffer.hpp
r10700 r12060 9 9 10 10 #include <cstring> 11 #include <string> 11 12 #include <boost/shared_ptr.hpp> 12 13 … … 18 19 #include "buffer.hpp" 19 20 21 #include <stdexcept> 22 20 23 namespace reboost { 24 25 class illegal_sub_buffer: public std::runtime_error 26 { 27 public: 28 /** Takes a character string describing the error. */ 29 explicit illegal_sub_buffer(const std::string& __arg) : 30 std::runtime_error(__arg) 31 { 32 } 33 34 virtual ~illegal_sub_buffer() throw() {} 35 }; 21 36 22 37 /** … … 104 119 parent(new deleteable_buffer(buffer, size)) 105 120 { 121 } 122 123 // /// XXX debug... copy! 124 // /// create shared buffer from buffer 125 // inline shared_buffer_t(const char* buffer, bsize_t size) : 126 // buffer_t(), parent(new deleteable_buffer(size)) { 106 127 // memcpy(parent->mutable_data(), buffer, parent->size()); 107 // data(parent->mutable_data()); 108 // this->size(parent->size()); 109 } 128 // data(parent->mutable_data()); this->size(parent->size()); 129 // } 110 130 111 131 /// clone data from a normal buffer … … 129 149 130 150 /// return sub-buffer. 131 inline self operator()(bsize_t index, bsize_t size = 0) const { 151 inline self operator()(bsize_t index, bsize_t size = 0) const 152 { 153 // special cases 154 if ( index + size > size_ ) 155 { 156 // empty sub-buffer 157 if ( index == size_ ) 158 { 159 self n; 160 return n; 161 } 162 163 // ERROR: index out of bounds 164 throw illegal_sub_buffer("Index or size out of bounds in shared_buffer"); 165 } 166 167 // regular case 132 168 self n(*this); 133 169 n.data_ += index; -
source/ariba/utility/transport/rfcomm/CMakeLists.txt
r10700 r12060 40 40 bluetooth_endpoint.hpp 41 41 bluetooth_rfcomm.hpp 42 rfcomm_transport.hpp43 42 ) 44 43 45 add_sources(rfcomm_transport.cpp) 44 #add_sources() 45 -
source/ariba/utility/transport/transport_peer.cpp
r10700 r12060 1 2 #include "ariba/config.h"3 1 #include "transport_peer.hpp" 4 #include "transport.hpp" 5 #include <boost/asio/ip/tcp.hpp> 2 3 // ariba 4 #include "StreamTransport/StreamTransport.hpp" 5 #include "ariba/utility/addressing2/tcpip_endpoint.hpp" 6 7 // boost 6 8 #include <boost/asio/error.hpp> 7 9 #include <boost/foreach.hpp> 8 9 #ifdef ECLIPSE_PARSER10 #define foreach(a, b) for(a : b)11 #else12 #define foreach(a, b) BOOST_FOREACH(a, b)13 #endif14 10 15 11 // namespace ariba::transport … … 17 13 namespace transport { 18 14 19 using namespace a riba::addressing;15 using namespace addressing2; 20 16 using boost::asio::ip::tcp; 21 17 … … 26 22 use_logging_cpp(transport_peer); 27 23 28 transport_peer::transport_peer( endpoint_set& local_set ) : local(local_set) { 29 30 // setup tcp transports 31 foreach(tcp_port_address port, local.tcp) { 24 transport_peer::transport_peer() : 25 local(new addressing2::endpoint_set()) 26 { 27 } 28 29 EndpointSetPtr transport_peer::add_listenOn_endpoints(EndpointSetPtr endpoints) 30 { 31 // TCP Endpoints 32 BOOST_FOREACH( shared_ptr<tcpip_endpoint> endp, endpoints->get_tcpip_endpoints() ) 33 { 34 // automatic port detection 35 bool port_detection = false; 36 uint16_t try_port = 41322; 32 37 33 if (local.ip.size() > 0) { 34 foreach(ip_address ip_addr, local.ip) { 35 36 tcp::endpoint endp(ip_addr.asio(), port.asio()); 37 create_service(endp); 38 } 39 } else { 40 tcp::endpoint endp_v6(tcp::v6(), port.asio()); 41 tcp::endpoint endp_v4(tcp::v4(), port.asio()); 42 43 create_service(endp_v6); 44 create_service(endp_v4); 38 tcp::endpoint asio_endp = endp->to_asio(); 39 if ( asio_endp.port() == 0 ) 40 { 41 port_detection = true; 45 42 } 46 43 47 } 48 44 45 // create new server socket 46 do 47 { 48 try 49 { 50 // automatic port detection 51 if ( port_detection ) 52 { 53 asio_endp.port(try_port); 54 endp = tcpip_endpoint::create_TcpIP_Endpoint(asio_endp); 55 } 56 57 TransportProtocolPtr tmp_ptr(new StreamTransport<tcp>(endp->to_asio())); 58 transport_streams.push_back(tmp_ptr); 59 logging_info("Listening on IP/TCP " << endp->to_string()); 60 61 local->add_endpoint(endp); 62 port_detection = false; 63 } 64 65 catch (boost::system::system_error& e) 66 { 67 // address in use 68 if (e.code() == boost::asio::error::address_in_use) 69 { 70 // BRANCH: automatic port detection 71 if ( port_detection ) 72 { 73 // give up ? 74 if ( try_port > 41422 ) 75 { 76 logging_warn("[WARN] Unable to find free port. Giving up. :-( Last try was: " 77 << endp->to_string() << ". Endpoint will be ignored!"); 78 79 port_detection = false; 80 } 81 else 82 { 83 // try next port 84 try_port++; 85 } 86 } 87 // BRANCH: explicit given port --> error 88 else 89 { 90 logging_warn("[WARN] Address already in use: " 91 << endp->to_string() << ". Endpoint will be ignored!"); 92 } 93 } 94 95 // Rethrow 96 else 97 { 98 throw; 99 } 100 } 101 } while ( port_detection ); 102 } 103 104 // TODO Bluetooth Endpoints 49 105 #ifdef HAVE_LIBBLUETOOTH 50 foreach(rfcomm_channel_address channel, local.rfcomm) {51 if (local.bluetooth.size() > 0) {52 foreach(mac_address mac, local.bluetooth) {53 rfcomm::endpoint endp(mac.bluetooth(), channel.value());54 create_service(endp);55 }56 } else {57 rfcomm::endpoint endp(channel.value());58 create_service(endp);59 }60 }106 // foreach(rfcomm_channel_address channel, local.rfcomm) { 107 // if (local.bluetooth.size() > 0) { 108 // foreach(mac_address mac, local.bluetooth) { 109 // rfcomm::endpoint endp(mac.bluetooth(), channel.value()); 110 // create_service(endp); 111 // } 112 // } else { 113 // rfcomm::endpoint endp(channel.value()); 114 // create_service(endp); 115 // } 116 // } 61 117 #endif 62 } 63 64 void transport_peer::create_service(tcp::endpoint endp) { 65 try { 66 TcpIpPtr tmp_ptr(new tcpip(endp)); 67 tcps.push_back(tmp_ptr); 68 logging_info("Listening on IP/TCP " << endp); 69 70 } catch (boost::system::system_error& e) { 71 if (e.code() == boost::asio::error::address_in_use) { 72 logging_warn("[WARN] Address already in use: " 73 << endp << ". Endpoint will be ignored!"); 74 } else { 75 // Rethrow 76 throw; 77 } 78 } 79 } 118 119 return local; 120 } 121 122 //void transport_peer::create_service(tcp::endpoint endp) { 123 // try { 124 // TransportProtocolPtr tmp_ptr(new StreamTransport<tcp>(endp)); 125 // tcps.push_back(tmp_ptr); 126 // logging_info("Listening on IP/TCP " << endp); 127 // 128 // } catch (boost::system::system_error& e) { 129 // if (e.code() == boost::asio::error::address_in_use) { 130 // logging_warn("[WARN] Address already in use: " 131 // << endp << ". Endpoint will be ignored!"); 132 // } else { 133 // // Rethrow 134 // throw; 135 // } 136 // } 137 //} 80 138 81 139 #ifdef HAVE_LIBBLUETOOTH 82 void transport_peer::create_service(rfcomm::endpoint endp) {83 try {84 rfcomm_transport::sptr tmp_ptr(new rfcomm_transport(endp));85 rfcomms.push_back(tmp_ptr);86 logging_info("Listening on bluetooth/RFCOMM " << endp);87 88 } catch (boost::system::system_error& e) {89 if (e.code() == boost::asio::error::address_in_use) {90 logging_warn("[WARN] Address already in use: "91 << endp << ". Endpoint will be ignored!");92 } else {93 // Rethrow94 throw;95 }96 }97 }140 //void transport_peer::create_service(rfcomm::endpoint endp) { 141 // try { 142 // TransportProtocolPtr tmp_ptr(new StreamTransport<rfcomm>(endp)); 143 // rfcomms.push_back(tmp_ptr); 144 // logging_info("Listening on bluetooth/RFCOMM " << endp); 145 // 146 // } catch (boost::system::system_error& e) { 147 // if (e.code() == boost::asio::error::address_in_use) { 148 // logging_warn("[WARN] Address already in use: " 149 // << endp << ". Endpoint will be ignored!"); 150 // } else { 151 // // Rethrow 152 // throw; 153 // } 154 // } 155 //} 98 156 #endif 99 157 … … 101 159 } 102 160 103 void transport_peer::start() { 104 foreach(TcpIpPtr tcp, tcps) { 105 tcp->start(); 106 } 107 108 #ifdef HAVE_LIBBLUETOOTH 109 foreach(rfcomm_transport::sptr x, rfcomms) { 110 x->start(); 111 } 112 #endif 113 } 114 115 void transport_peer::stop() { 116 foreach(TcpIpPtr tcp, tcps) { 117 tcp->stop(); 118 } 119 120 #ifdef HAVE_LIBBLUETOOTH 121 foreach(rfcomm_transport::sptr x, rfcomms) { 122 x->stop(); 123 } 124 #endif 161 void transport_peer::start() 162 { 163 BOOST_FOREACH(TransportProtocolPtr stream, transport_streams) 164 { 165 stream->start(); 166 } 167 } 168 169 void transport_peer::stop() 170 { 171 BOOST_FOREACH(TransportProtocolPtr stream, transport_streams) 172 { 173 stream->stop(); 174 } 125 175 } 126 176 127 177 128 178 void transport_peer::send( 129 const endpoint_set&endpoints,179 const const_EndpointSetPtr endpoints, 130 180 reboost::message_t message, 131 181 uint8_t priority) 132 182 { 133 foreach(TcpIpPtr tcp, tcps) { 134 tcp->send(endpoints, message, priority); 135 } 136 137 #ifdef HAVE_LIBBLUETOOTH 138 foreach(rfcomm_transport::sptr x, rfcomms) { 139 x->send(endpoints, message, priority); 140 } 141 #endif 142 } 143 144 void transport_peer::terminate( const address_v* remote ) { 145 if (remote->instanceof<tcpip_endpoint>())// TODO direkt auf der richtigen verbindung 146 { 147 foreach(TcpIpPtr tcp, tcps) { 148 tcp->terminate(remote); 149 } 150 } 151 #ifdef HAVE_LIBBLUETOOTH 152 if (remote->instanceof<rfcomm_endpoint>()) { 153 foreach(rfcomm_transport::sptr x, rfcomms) { 154 x->terminate(remote); 155 } 156 } 157 #endif 158 } 159 160 void transport_peer::register_listener( transport_listener* listener ) { 161 foreach(TcpIpPtr tcp, tcps) { 162 tcp->register_listener(listener); 163 } 164 165 #ifdef HAVE_LIBBLUETOOTH 166 foreach(rfcomm_transport::sptr x, rfcomms) { 167 x->register_listener(listener); 168 } 169 #endif 183 BOOST_FOREACH(TransportProtocolPtr stream, transport_streams) 184 { 185 stream->send(endpoints, message, priority); 186 } 187 } 188 189 // XXX DEPRECATED 190 //void transport_peer::terminate( const address_v* remote ) { 191 // if (remote->instanceof<tcpip_endpoint>())// TODO direkt auf der richtigen verbindung 192 // { 193 // foreach(TransportProtocolPtr tcp, tcps) { 194 // tcp->terminate(remote); 195 // } 196 // } 197 //#ifdef HAVE_LIBBLUETOOTH 198 // if (remote->instanceof<rfcomm_endpoint>()) { 199 // foreach(TransportProtocolPtr x, rfcomms) { 200 // x->terminate(remote); 201 // } 202 // } 203 //#endif 204 //} 205 206 void transport_peer::register_listener( transport_listener* listener ) 207 { 208 BOOST_FOREACH(TransportProtocolPtr stream, transport_streams) 209 { 210 stream->register_listener(listener); 211 } 170 212 } 171 213 -
source/ariba/utility/transport/transport_peer.hpp
r10700 r12060 2 2 #define TRANSPORT_PEER_HPP_ 3 3 4 // ariba 4 5 #include "ariba/config.h" 5 6 #include "ariba/utility/logging/Logging.h" 6 #include "transport_protocol.hpp" 7 #include "ariba/utility/addressing/endpoint_set.hpp" 7 #include "ariba/utility/addressing2/endpoint_set.hpp" 8 9 // ariba interfaces 10 #include "interfaces/transport_protocol.hpp" 11 12 // boost 8 13 #include <boost/shared_ptr.hpp> 9 #include "rfcomm/bluetooth_rfcomm.hpp" 14 15 // boost-adaption 16 //#include "rfcomm/bluetooth_rfcomm.hpp" 10 17 11 18 … … 14 21 namespace transport { 15 22 16 using namespace ariba::addressing;17 18 class tcpip;19 20 #ifdef HAVE_LIBBLUETOOTH21 class rfcomm_transport;22 #endif23 24 23 /** 25 * TODO: Doc 24 * This class allocates implementations of various transport 25 * protocols and can send messages to an entire set of endpoints 26 26 * 27 * @author Sebastian Mies <mies@tm.uka.de> 27 * @author Sebastian Mies <mies@tm.uka.de>, Mario Hock 28 28 */ 29 /// this transport peer allocates implementations of various transport 30 /// protocols and can send messages to an entire set of endpoints 31 class transport_peer : public transport_protocol{29 class transport_peer : 30 public transport_protocol 31 { 32 32 use_logging_h(transport_peer); 33 typedef boost::shared_ptr<transport_protocol> TransportProtocolPtr; 34 33 35 public: 34 transport_peer( endpoint_set& local_set ); 36 transport_peer(); 37 38 /** 39 * Adds endpoints on which ariba should listen ("server"-sockets) 40 * 41 * @return An endpoint_set holding all active endpoints ariba is listening on. 42 */ 43 addressing2::EndpointSetPtr add_listenOn_endpoints(addressing2::EndpointSetPtr endpoints); 44 35 45 virtual ~transport_peer(); 36 46 virtual void start(); … … 38 48 39 49 virtual void send( 40 const endpoint_set&endpoints,50 const addressing2::const_EndpointSetPtr endpoints, 41 51 reboost::message_t message, 42 uint8_t priority = 0); 43 44 /// @deprecated: Use terminate() from transport_connection instead 45 virtual void terminate( const address_v* remote ); 46 52 uint8_t priority = system_priority::OVERLAY); 53 47 54 virtual void register_listener( transport_listener* listener ); 48 55 56 49 57 private: 50 void create_service(tcp::endpoint endp); 51 #ifdef HAVE_LIBBLUETOOTH 52 void create_service(boost::asio::bluetooth::rfcomm::endpoint endp); 53 #endif 54 55 endpoint_set& local; 56 std::vector< boost::shared_ptr<tcpip> > tcps; 57 #ifdef HAVE_LIBBLUETOOTH 58 std::vector< boost::shared_ptr<rfcomm_transport> > rfcomms; 59 #endif 58 addressing2::EndpointSetPtr local; 59 std::vector<TransportProtocolPtr> transport_streams; 60 60 }; 61 61
Note:
See TracChangeset
for help on using the changeset viewer.