source: source/ariba/utility/transport/messages/message.hpp@ 12770

Last change on this file since 12770 was 12060, checked in by hock@…, 11 years ago

Reintegrate branch: 20130111-hock-message_classes

improvements:

  • new message classes (reboost, zero-copy)
  • "fast path" for direct links (skip overlay layer)
  • link-properties accessible from the application
  • SystemQueue can call boost::bind functions
  • protlib compatibility removed (32bit overhead saved in every message)
  • addressing2
  • AddressDiscovery discovers only addresses on which we're actually listening
  • ariba serialization usage reduced (still used in OverlayMsg)
  • Node::connect, easier and cleaner interface to start-up ariba from the application
  • ariba configs via JSON, XML, etc (boost::property_tree)
  • keep-alive overhead greatly reduced
  • (relayed) overlay links can actually be closed now
  • lost messages are detected in most cases
  • notification to the application when link is transformed into direct-link
  • overlay routing: send message to second best hop if it would be dropped otherwise
  • SequenceNumbers (only the mechanisms so far, for upward compatibility)
  • various small fixes


regressions:

  • bluetooth is not yet working again
  • bootstrap modules deactivated
  • liblog4xx is not working (use cout-logging)

This patch brings great performance and stability improvements at cost of backward compatibility.
Also bluetooth and the bootstrap modules have not been ported to the new interfaces, yet.

File size: 9.2 KB
Line 
1//-----------------------------------------------------------------------------
2// Part of reboost (http://reboost.org). Released under the
3// BSD 2-clause license (http://www.opensource.org/licenses/bsd-license.php).
4// Copyright 2012, Sebastian Mies <mies@reboost.org> --- All rights reserved.
5//-----------------------------------------------------------------------------
6
7#ifndef REBOOST_MESSAGE_HPP_
8#define REBOOST_MESSAGE_HPP_
9
10#include<boost/thread.hpp>
11#include<boost/shared_ptr.hpp>
12#include<cstring>
13
14#include "shared_buffer.hpp"
15
16namespace reboost {
17
/// Type used to count and index the buffers of a message.
/// A plain signed int appears to be required here; neither `signed char`
/// nor `size_t` should be substituted.
//typedef signed char mlength_t; // <--- don't do this!!
//typedef size_t mlength_t;
typedef int mlength_t;

/// Upper bound on the number of buffers one message may hold (8 by default).
/// The buffer table masks indices with (message_max_buffers - 1), so the
/// value has to stay a power of two.
const mlength_t message_max_buffers = 1 << 3;
//const mlength_t message_max_buffers = 1 << 4;
26
27//! A Copy-on-Write Message with Shared Buffers.
28/**
29 * A Copy-on-Write Message with Shared Buffers.
30 *
31 * A message holds a limited (defined by <code>message_max_buffers</code>)
32 * number of shared buffers. One can add new buffers and messages in front and
33 * at the end of a message. If the no. of buffers exceed
34 * <code>message_max_buffers</code>, then the two smallest successive buffers
35 * are compacted to one buffer.
36 *
37 * @author Sebastian Mies <mies@reboost.org>
38 */
39class message_t {
40private:
41 // read sub-message
42 struct sub_message {
43 message_t* msg;
44 inline void operator()(shared_buffer_t buf) {
45 msg->push_back(buf);
46 }
47 };
48
49 // read from buffer
50 struct read_buffer {
51 boctet_t* buffer;
52 inline void operator()(buffer_t buf) {
53 memcpy((void*) buffer, (void*) buf.data(), buf.size());
54 buffer += buf.size();
55 }
56 };
57
58 // write to buffer
59 struct write_buffer {
60 const boctet_t* buffer;
61 inline void operator()(buffer_t buf) {
62 memcpy((void*) buf.data(), (void*) buffer, buf.size());
63 buffer += buf.size();
64 }
65 };
66
67public:
68 /// Create a new message
69 inline message_t() :
70 imsg() {
71 }
72
73 /// Copy message
74 inline message_t(const message_t& msg) :
75 imsg(msg.imsg)
76 {
77 if ( imsg )
78 imsg->owner = NULL;
79 }
80
81 /// Linearize message
82 inline operator shared_buffer_t() const {
83 return linearize();
84 }
85
86 /// Assign another message
87 inline message_t& operator=(const message_t& msg) {
88 msg.imsg->owner = NULL;
89 imsg = msg.imsg;
90 return *this;
91 }
92
93 /// Adds a shared buffer of given site at the end
94 inline shared_buffer_t& push_back( bsize_t size ) {
95 shared_buffer_t b(size); push_back(b);
96 return imsg->at(-1);
97 }
98
99 /// Adds a buffer at the end of the message
100 inline void push_back(const shared_buffer_t& buf) {
101 own().push_back(buf);
102 }
103
104 /// Adds a message at the end of the message
105 inline void push_back(const message_t& msg) {
106 own();
107 for (mlength_t i = 0; i < msg.length(); i++)
108 push_back(msg[i]);
109 }
110
111 /// Adds a shared buffer of given size at the front
112 inline shared_buffer_t& push_front( bsize_t size ) {
113 shared_buffer_t b(size); push_front(b);
114 return imsg->at(0);
115 }
116
117 /// Adds a buffer at the front of the messsage
118 inline void push_front(const shared_buffer_t& buf) {
119 own().push_front(buf);
120 }
121
122 /// Adds a message at the end of the message
123 inline void push_front(const message_t& msg) {
124 own();
125 for (mlength_t i = msg.length() - 1; i != 0; i--)
126 push_front(msg[i]);
127 }
128
129 /// Removes a buffer from the end of the message
130 inline shared_buffer_t pop_back() {
131 return own().pop_back();
132 }
133
134 /// Removes a buffer from the beginning of this message.
135 inline shared_buffer_t pop_front() {
136 return own().pop_front();
137 }
138
139 /// Returns the size of the message in bytes (or octets).
140 inline size_t size() const {
141 size_t s = 0;
142 for (mlength_t i = 0; i < length(); i++)
143 s += operator[](i).size();
144 return (s);
145 }
146
147 /// Returns the number of buffers inside this message.
148 inline mlength_t length() const {
149 if ( ! imsg )
150 return 0;
151
152 return (imsg->length);
153 }
154
155 /// Returns the buffer at the given index.
156 inline shared_buffer_t& operator[](mlength_t idx) {
157 return at(idx);
158 }
159
160 /// Returns the buffer at the given index.
161 inline shared_buffer_t& at(mlength_t idx) {
162 return imsg->at(idx);
163 }
164
165 /// Returns the constant buffer at the given index.
166 inline const shared_buffer_t& operator[](mlength_t idx) const {
167 return at(idx);
168 }
169
170 /// Returns the buffer at the given index
171 inline const shared_buffer_t& at(mlength_t idx) const {
172 return imsg->at(idx);
173 }
174
175 /// Iterates over a partial set of buffers.
176 template<typename T>
177 inline void msg_foreach(const T& work, size_t index_ = 0, size_t size_ = 0) const {
178 T op = work;
179 if (size_ == 0) size_ = size() - index_;
180
181 // get first buffer
182 mlength_t f = 0, pf = 0;
183 for (; (pf + at(f).size()) <= index_ && f < imsg->length;
184 pf += at(f).size(), f++);
185 // get last buffer
186 mlength_t l = f, pl = pf;
187 for (; (pl + at(l).size()) < (index_ + size_) && l < imsg->length;
188 pl += at(l).size(), l++);
189
190 // same buffer? yes-> get sub-buffer
191 if (l == f) op(at(l)(index_ - pf, size_));
192 else { // no-> get sub-buffers :)
193 op(at(f)(index_ - pf));
194 for (mlength_t i = f + 1; i < l; i++) op(at(i));
195 op(at(l)(0, index_ + size_ - pl));
196 }
197 }
198
199 /// Read bytes (gather).
200 inline void read(boctet_t* mem, size_t idx = 0, size_t size_ = 0) const {
201 struct read_buffer rb = { mem };
202 msg_foreach(rb, idx, size_);
203 }
204
205 /// write bytes
206 inline void write(const boctet_t* mem, size_t idx = 0, size_t size_ = 0) {
207 struct write_buffer wb = { mem };
208 msg_foreach(wb, idx, size_);
209 }
210
211 /// Read an arbitrary, binary object.
212 template<class T>
213 inline T read(size_t index) {
214 T obj;
215 read((boctet_t*) &obj, index, sizeof(T));
216 return obj;
217 }
218
219 /// Write an arbitrary, binary object.
220 template<class T>
221 inline void write(const T& value, size_t index) {
222 write((boctet_t*) &value, index, sizeof(T));
223 }
224
225 /// Calculate a (ELF-like) hash.
226 inline size_t hash() const {
227 size_t h = 0;
228 for (mlength_t i = 0; i < length(); i++)
229 h += at(i).hash() * (i + 1);
230 return h;
231 }
232
233 /// Returns a sub-message.
234 message_t operator()(size_t index, size_t size = 0) const {
235 message_t m;
236 struct sub_message sm = { &m };
237 msg_foreach(sm, index, size);
238 return m;
239 }
240
241 /// Linearizes the complete/partial message into one shared buffer.
242 inline shared_buffer_t linearize(size_t index = 0, size_t size_ = 0) const {
243 shared_buffer_t b(size_ == 0 ? size() : size_);
244 read(b.mutable_data(), index, size_);
245 return b;
246 }
247
248private:
249 class imsg_t {
250 public:
251 volatile message_t* owner;
252 shared_buffer_t buffers[message_max_buffers];
253 mlength_t index, length;
254 public:
255 inline imsg_t() :
256 index(0), length(0) {
257 }
258 inline imsg_t(const imsg_t& imsg) :
259 index(imsg.index), length(imsg.length) {
260 for (mlength_t i = 0; i < length; i++)
261 at(index + i) = imsg.at(index + i);
262 }
263 inline shared_buffer_t& at(mlength_t idx) {
264 if (idx < 0) idx += length;
265 return buffers[(idx + index) & (message_max_buffers - 1)];
266 }
267 inline const shared_buffer_t& at(mlength_t idx) const {
268 if (idx < 0) idx += length;
269 return buffers[(idx + index) & (message_max_buffers - 1)];
270 }
271
272 inline void push_back(const shared_buffer_t& buf) {
273 if (buf.size() == 0) return;
274 if (length == message_max_buffers) compact();
275 at(length) = buf;
276 length++;
277 }
278
279 inline void push_front(const shared_buffer_t& buf) {
280 if (buf.size() == 0) return;
281 if (length == message_max_buffers) compact();
282 index--;
283 length++;
284 at(0) = buf;
285 }
286
287 inline shared_buffer_t pop_back() {
288 shared_buffer_t& buf = at(-1);
289 shared_buffer_t ret = buf;
290 buf.reset();
291 length--;
292 return ret;
293 }
294
295 inline shared_buffer_t pop_front() {
296 shared_buffer_t& buf = at(0);
297 shared_buffer_t ret = buf;
298 buf.reset();
299 length--;
300 index++;
301 return ret;
302 }
303
304 /// compacts the buffers, so one more buffer is available
305 inline void compact() {
306
307 // find compacting candidate
308 bsize_t min_size=~0, min_pos=0;
309 for (mlength_t i=0; i<length; i++) {
310 bsize_t c = at(i).size() + at(i+1).size();
311 if (c < min_size || min_size == ~(bsize_t)0 ) {
312 min_size = c;
313 min_pos = i;
314 }
315 }
316
317 // compact buffers
318 shared_buffer_t nb(min_size);
319 at(min_pos).copy_to( nb, 0 );
320 at(min_pos+1).copy_to( nb, at(min_pos).size() );
321
322 // move buffers and assign new buffer
323 for (mlength_t i=min_pos+1; i<length; i++) at(i) = at(i+1);
324 at(min_pos) = nb;
325
326 length--;
327 }
328 };
329 /// own a new message
330 inline imsg_t& own() {
331 if (imsg.get() != NULL && imsg->owner == this) return *imsg;
332 if (imsg.get() == NULL) imsg = boost::shared_ptr<imsg_t>(new imsg_t());
333 else imsg = boost::shared_ptr<imsg_t>(new imsg_t(*imsg));
334 imsg->owner = this;
335 return *imsg;
336 }
337 boost::shared_ptr<imsg_t> imsg;
338};
339
340inline message_t operator+(const message_t& lhs, const message_t& rhs) {
341 message_t m = lhs;
342 m.push_back(rhs);
343 return m;
344}
345
346inline message_t operator+(const message_t& lhs, const buffer_t& rhs) {
347 message_t m = lhs;
348 m.push_back(rhs);
349 return m;
350}
351
352inline message_t operator+(const shared_buffer_t& lhs, const message_t& rhs) {
353 message_t m = rhs;
354 m.push_front(lhs);
355 return m;
356}
357
358inline message_t operator+(const shared_buffer_t& lhs,
359 const shared_buffer_t& rhs) {
360 message_t m;
361 m.push_back(lhs);
362 m.push_back(rhs);
363 return m;
364}
365
366std::ostream& operator<<(std::ostream&, const message_t);
367
368} /* namespace reboost */
369
370#endif /* REBOOST_MESSAGE_HPP_ */
Note: See TracBrowser for help on using the repository browser.