source: source/ariba/utility/transport/messages/message.hpp@ 12773

Last change on this file since 12773 was 12773, checked in by hock@…, 10 years ago

WARNING! This revision is not intended for productive use!

!! DEBUGGING ONLY !!

Extreme debugging of StreamTransport.
"Typo" in the locking-code, was very hard to track down!

This revision is stored since a lot dabug code was written and should not just be deleted.

File size: 10.0 KB
Line 
1//-----------------------------------------------------------------------------
2// Part of reboost (http://reboost.org). Released under the
3// BSD 2-clause license (http://www.opensource.org/licenses/bsd-license.php).
4// Copyright 2012, Sebastian Mies <mies@reboost.org> --- All rights reserved.
5//-----------------------------------------------------------------------------
6
7#ifndef REBOOST_MESSAGE_HPP_
8#define REBOOST_MESSAGE_HPP_
9
10#include<boost/thread.hpp>
11#include<boost/shared_ptr.hpp>
12#include<cstring>
13#include <cassert>
14
15#include "shared_buffer.hpp"
16
17namespace reboost {
18
19/// message size type
20//typedef signed char mlength_t; // <--- don't do this!!
21//typedef size_t mlength_t;
22typedef int mlength_t; // signed int seems necessary
23
24/// maximum number of buffers per message (default is 8)
25const mlength_t message_max_buffers = (1L << 3);
26//const mlength_t message_max_buffers = (1L << 4);
27
28//! A Copy-on-Write Message with Shared Buffers.
29/**
30 * A Copy-on-Write Message with Shared Buffers.
31 *
32 * A message holds a limited (defined by <code>message_max_buffers</code>)
33 * number of shared buffers. One can add new buffers and messages in front and
34 * at the end of a message. If the no. of buffers exceed
35 * <code>message_max_buffers</code>, then the two smallest successive buffers
36 * are compacted to one buffer.
37 *
38 * @author Sebastian Mies <mies@reboost.org>
39 */
40class message_t {
41private:
42 // read sub-message
43 struct sub_message {
44 message_t* msg;
45 inline void operator()(shared_buffer_t buf) {
46 msg->push_back(buf);
47 }
48 };
49
50 // read from buffer
51 struct read_buffer {
52 boctet_t* buffer;
53 inline void operator()(buffer_t buf) {
54 memcpy((void*) buffer, (void*) buf.data(), buf.size());
55 buffer += buf.size();
56 }
57 };
58
59 // write to buffer
60 struct write_buffer {
61 const boctet_t* buffer;
62 inline void operator()(buffer_t buf) {
63 memcpy((void*) buf.data(), (void*) buffer, buf.size());
64 buffer += buf.size();
65 }
66 };
67
68public:
69 /// Create a new message
70 inline message_t() :
71 imsg(),
72 MAGIC_IDENTIFIER("!!message_t-MAGIC_IDENTIFIER!!"), // XXX Mario: Debugging
73 MAGIC_NUMBER(421337) // XXX Mario: Debugging
74 {
75 }
76
77 /// Copy message
78 inline message_t(const message_t& msg) :
79 imsg(msg.imsg),
80 MAGIC_IDENTIFIER(msg.MAGIC_IDENTIFIER), // XXX
81 MAGIC_NUMBER(msg.MAGIC_NUMBER) // XXX
82
83 {
84 assert ( msg.MAGIC_NUMBER == 421337 ); // XXX
85
86 if ( imsg )
87 imsg->owner = NULL;
88 }
89
90 /// Linearize message
91 inline operator shared_buffer_t() const {
92 return linearize();
93 }
94
95 /// Assign another message
96 inline message_t& operator=(const message_t& msg)
97 {
98 assert ( msg.MAGIC_NUMBER == 421337 ); // XXX
99
100 if ( msg.imsg )
101 {
102 msg.imsg->owner = NULL;
103 imsg = msg.imsg;
104 }
105 else
106 {
107 // TODO: is this a valid state? (since it can definitely been reached...)
108 imsg.reset();
109 }
110
111 return *this;
112 }
113
114 /// Adds a shared buffer of given site at the end
115 inline shared_buffer_t& push_back( bsize_t size ) {
116 shared_buffer_t b(size); push_back(b);
117 return imsg->at(-1);
118 }
119
120 /// Adds a buffer at the end of the message
121 inline void push_back(const shared_buffer_t& buf) {
122 own().push_back(buf);
123 }
124
125 /// Adds a message at the end of the message
126 inline void push_back(const message_t& msg) {
127 own();
128 for (mlength_t i = 0; i < msg.length(); i++)
129 push_back(msg[i]);
130 }
131
132 /// Adds a shared buffer of given size at the front
133 inline shared_buffer_t& push_front( bsize_t size ) {
134 shared_buffer_t b(size); push_front(b);
135 return imsg->at(0);
136 }
137
138 /// Adds a buffer at the front of the messsage
139 inline void push_front(const shared_buffer_t& buf) {
140 own().push_front(buf);
141 }
142
143 /// Adds a message at the end of the message
144 inline void push_front(const message_t& msg) {
145 own();
146 for (mlength_t i = msg.length() - 1; i != 0; i--)
147 push_front(msg[i]);
148 }
149
150 /// Removes a buffer from the end of the message
151 inline shared_buffer_t pop_back() {
152 return own().pop_back();
153 }
154
155 /// Removes a buffer from the beginning of this message.
156 inline shared_buffer_t pop_front() {
157 return own().pop_front();
158 }
159
160 /// Returns the size of the message in bytes (or octets).
161 inline size_t size() const {
162 size_t s = 0;
163 for (mlength_t i = 0; i < length(); i++)
164 s += operator[](i).size();
165 return (s);
166 }
167
168 /// Returns the number of buffers inside this message.
169 inline mlength_t length() const {
170 if ( ! imsg )
171 return 0;
172
173 return (imsg->length);
174 }
175
176 /// Returns the buffer at the given index.
177 inline shared_buffer_t& operator[](mlength_t idx) {
178 return at(idx);
179 }
180
181 /// Returns the buffer at the given index.
182 inline shared_buffer_t& at(mlength_t idx) {
183 return imsg->at(idx);
184 }
185
186 /// Returns the constant buffer at the given index.
187 inline const shared_buffer_t& operator[](mlength_t idx) const {
188 return at(idx);
189 }
190
191 /// Returns the buffer at the given index
192 inline const shared_buffer_t& at(mlength_t idx) const {
193 return imsg->at(idx);
194 }
195
196 /// Iterates over a partial set of buffers.
197 template<typename T>
198 inline void msg_foreach(const T& work, size_t index_ = 0, size_t size_ = 0) const {
199 T op = work;
200 if (size_ == 0) size_ = size() - index_;
201
202 // get first buffer
203 mlength_t f = 0, pf = 0;
204 for (; (pf + at(f).size()) <= index_ && f < imsg->length;
205 pf += at(f).size(), f++);
206 // get last buffer
207 mlength_t l = f, pl = pf;
208 for (; (pl + at(l).size()) < (index_ + size_) && l < imsg->length;
209 pl += at(l).size(), l++);
210
211 // same buffer? yes-> get sub-buffer
212 if (l == f) op(at(l)(index_ - pf, size_));
213 else { // no-> get sub-buffers :)
214 op(at(f)(index_ - pf));
215 for (mlength_t i = f + 1; i < l; i++) op(at(i));
216 op(at(l)(0, index_ + size_ - pl));
217 }
218 }
219
220 /// Read bytes (gather).
221 inline void read(boctet_t* mem, size_t idx = 0, size_t size_ = 0) const {
222 struct read_buffer rb = { mem };
223 msg_foreach(rb, idx, size_);
224 }
225
226 /// write bytes
227 inline void write(const boctet_t* mem, size_t idx = 0, size_t size_ = 0) {
228 struct write_buffer wb = { mem };
229 msg_foreach(wb, idx, size_);
230 }
231
232 /// Read an arbitrary, binary object.
233 template<class T>
234 inline T read(size_t index) {
235 T obj;
236 read((boctet_t*) &obj, index, sizeof(T));
237 return obj;
238 }
239
240 /// Write an arbitrary, binary object.
241 template<class T>
242 inline void write(const T& value, size_t index) {
243 write((boctet_t*) &value, index, sizeof(T));
244 }
245
246 /// Calculate a (ELF-like) hash.
247 inline size_t hash() const {
248 size_t h = 0;
249 for (mlength_t i = 0; i < length(); i++)
250 h += at(i).hash() * (i + 1);
251 return h;
252 }
253
254 /// Returns a sub-message.
255 message_t operator()(size_t index, size_t size = 0) const {
256 message_t m;
257 struct sub_message sm = { &m };
258 msg_foreach(sm, index, size);
259 return m;
260 }
261
262 /// Linearizes the complete/partial message into one shared buffer.
263 inline shared_buffer_t linearize(size_t index = 0, size_t size_ = 0) const {
264 shared_buffer_t b(size_ == 0 ? size() : size_);
265 read(b.mutable_data(), index, size_);
266 return b;
267 }
268
269private:
270 class imsg_t {
271 public:
272 volatile message_t* owner;
273 shared_buffer_t buffers[message_max_buffers];
274 mlength_t index, length;
275
276 // XXX Mario: Debugging
277 const std::string MAGIC_IDENTIFIER;
278 const int MAGIC_NUMBER;
279 public:
280 inline imsg_t() :
281 owner(NULL),
282 index(0),
283 length(0),
284 MAGIC_IDENTIFIER("!!imsg_t-MAGIC_IDENTIFIER!!"), // XXX Mario: Debugging
285 MAGIC_NUMBER(133742)
286 {
287 }
288 inline imsg_t(const imsg_t& imsg) :
289 index(imsg.index), length(imsg.length),
290 MAGIC_IDENTIFIER(imsg.MAGIC_IDENTIFIER), // XXX
291 MAGIC_NUMBER(imsg.MAGIC_NUMBER) // XXX
292 {
293 for (mlength_t i = 0; i < length; i++)
294 at(index + i) = imsg.at(index + i);
295 }
296 inline shared_buffer_t& at(mlength_t idx) {
297 if (idx < 0) idx += length;
298 return buffers[(idx + index) & (message_max_buffers - 1)];
299 }
300 inline const shared_buffer_t& at(mlength_t idx) const {
301 if (idx < 0) idx += length;
302 return buffers[(idx + index) & (message_max_buffers - 1)];
303 }
304
305 inline void push_back(const shared_buffer_t& buf) {
306 if (buf.size() == 0) return;
307 if (length == message_max_buffers) compact();
308 at(length) = buf;
309 length++;
310 }
311
312 inline void push_front(const shared_buffer_t& buf) {
313 if (buf.size() == 0) return;
314 if (length == message_max_buffers) compact();
315 index--;
316 length++;
317 at(0) = buf;
318 }
319
320 inline shared_buffer_t pop_back() {
321 shared_buffer_t& buf = at(-1);
322 shared_buffer_t ret = buf;
323 buf.reset();
324 length--;
325 return ret;
326 }
327
328 inline shared_buffer_t pop_front() {
329 shared_buffer_t& buf = at(0);
330 shared_buffer_t ret = buf;
331 buf.reset();
332 length--;
333 index++;
334 return ret;
335 }
336
337 /// compacts the buffers, so one more buffer is available
338 inline void compact() {
339
340 // find compacting candidate
341 bsize_t min_size=~0, min_pos=0;
342 for (mlength_t i=0; i<length; i++) {
343 bsize_t c = at(i).size() + at(i+1).size();
344 if (c < min_size || min_size == ~(bsize_t)0 ) {
345 min_size = c;
346 min_pos = i;
347 }
348 }
349
350 // compact buffers
351 shared_buffer_t nb(min_size);
352 at(min_pos).copy_to( nb, 0 );
353 at(min_pos+1).copy_to( nb, at(min_pos).size() );
354
355 // move buffers and assign new buffer
356 for (mlength_t i=min_pos+1; i<length; i++) at(i) = at(i+1);
357 at(min_pos) = nb;
358
359 length--;
360 }
361 };
362 /// own a new message
363 inline imsg_t& own() {
364 if (imsg.get() != NULL && imsg->owner == this) return *imsg;
365 if (imsg.get() == NULL) imsg = boost::shared_ptr<imsg_t>(new imsg_t());
366 else imsg = boost::shared_ptr<imsg_t>(new imsg_t(*imsg));
367 imsg->owner = this;
368 return *imsg;
369 }
370 boost::shared_ptr<imsg_t> imsg;
371
372 // XXX Mario: Debugging
373public:
374 const std::string MAGIC_IDENTIFIER;
375 const int MAGIC_NUMBER;
376};
377
378inline message_t operator+(const message_t& lhs, const message_t& rhs) {
379 message_t m = lhs;
380 m.push_back(rhs);
381 return m;
382}
383
384inline message_t operator+(const message_t& lhs, const buffer_t& rhs) {
385 message_t m = lhs;
386 m.push_back(rhs);
387 return m;
388}
389
390inline message_t operator+(const shared_buffer_t& lhs, const message_t& rhs) {
391 message_t m = rhs;
392 m.push_front(lhs);
393 return m;
394}
395
396inline message_t operator+(const shared_buffer_t& lhs,
397 const shared_buffer_t& rhs) {
398 message_t m;
399 m.push_back(lhs);
400 m.push_back(rhs);
401 return m;
402}
403
404std::ostream& operator<<(std::ostream&, const message_t);
405
406} /* namespace reboost */
407
408#endif /* REBOOST_MESSAGE_HPP_ */
Note: See TracBrowser for help on using the repository browser.