source: source/ariba/utility/transport/messages/message.hpp@ 12774

Last change on this file since 12774 was 12774, checked in by hock@…, 10 years ago

StreamTransport bug fixed!!

[ Back to normal. :-) ]

File size: 9.4 KB
Line 
1//-----------------------------------------------------------------------------
2// Part of reboost (http://reboost.org). Released under the
3// BSD 2-clause license (http://www.opensource.org/licenses/bsd-license.php).
4// Copyright 2012, Sebastian Mies <mies@reboost.org> --- All rights reserved.
5//-----------------------------------------------------------------------------
6
7#ifndef REBOOST_MESSAGE_HPP_
8#define REBOOST_MESSAGE_HPP_
9
10#include<boost/thread.hpp>
11#include<boost/shared_ptr.hpp>
12#include<cstring>
13#include <cassert>
14
15#include "shared_buffer.hpp"
16
17namespace reboost {
18
/// Message length type: counts and indexes the buffers of a message.
/// It has to be a signed type, because negative indices are used to address
/// buffers relative to the end of a message (e.g. <code>at(-1)</code>).
//typedef signed char mlength_t; // <--- don't do this!!
//typedef size_t mlength_t;
typedef int mlength_t;

/// Maximum number of buffers per message (default is 8).
/// Must be a power of two: the position of a buffer inside the internal
/// buffer ring is computed by masking with (message_max_buffers - 1).
const mlength_t message_max_buffers = (1 << 3);
//const mlength_t message_max_buffers = (1 << 4);

// Compile-time guard (C++03 compatible): the array size becomes -1, i.e. a
// compile error, if message_max_buffers is not a positive power of two.
typedef char message_max_buffers_must_be_a_power_of_two[
    (message_max_buffers > 0 &&
     (message_max_buffers & (message_max_buffers - 1)) == 0) ? 1 : -1];
27
28//! A Copy-on-Write Message with Shared Buffers.
29/**
30 * A Copy-on-Write Message with Shared Buffers.
31 *
32 * A message holds a limited (defined by <code>message_max_buffers</code>)
33 * number of shared buffers. One can add new buffers and messages in front and
34 * at the end of a message. If the no. of buffers exceed
35 * <code>message_max_buffers</code>, then the two smallest successive buffers
36 * are compacted to one buffer.
37 *
38 * @author Sebastian Mies <mies@reboost.org>
39 */
40class message_t {
41private:
42 // read sub-message
43 struct sub_message {
44 message_t* msg;
45 inline void operator()(shared_buffer_t buf) {
46 msg->push_back(buf);
47 }
48 };
49
50 // read from buffer
51 struct read_buffer {
52 boctet_t* buffer;
53 inline void operator()(buffer_t buf) {
54 memcpy((void*) buffer, (void*) buf.data(), buf.size());
55 buffer += buf.size();
56 }
57 };
58
59 // write to buffer
60 struct write_buffer {
61 const boctet_t* buffer;
62 inline void operator()(buffer_t buf) {
63 memcpy((void*) buf.data(), (void*) buffer, buf.size());
64 buffer += buf.size();
65 }
66 };
67
68public:
69 /// Create a new message
70 inline message_t() :
71 imsg()
72 {
73 }
74
75 /// Copy message
76 inline message_t(const message_t& msg) :
77 imsg(msg.imsg)
78
79 {
80 if ( imsg )
81 imsg->owner = NULL;
82 }
83
84 /// Linearize message
85 inline operator shared_buffer_t() const {
86 return linearize();
87 }
88
89 /// Assign another message
90 inline message_t& operator=(const message_t& msg)
91 {
92 if ( msg.imsg )
93 {
94 msg.imsg->owner = NULL;
95 imsg = msg.imsg;
96 }
97 else
98 {
99 imsg.reset();
100 }
101
102 return *this;
103 }
104
105 /// Adds a shared buffer of given site at the end
106 inline shared_buffer_t& push_back( bsize_t size ) {
107 shared_buffer_t b(size); push_back(b);
108 return imsg->at(-1);
109 }
110
111 /// Adds a buffer at the end of the message
112 inline void push_back(const shared_buffer_t& buf) {
113 own().push_back(buf);
114 }
115
116 /// Adds a message at the end of the message
117 inline void push_back(const message_t& msg) {
118 own();
119 for (mlength_t i = 0; i < msg.length(); i++)
120 push_back(msg[i]);
121 }
122
123 /// Adds a shared buffer of given size at the front
124 inline shared_buffer_t& push_front( bsize_t size ) {
125 shared_buffer_t b(size); push_front(b);
126 return imsg->at(0);
127 }
128
129 /// Adds a buffer at the front of the messsage
130 inline void push_front(const shared_buffer_t& buf) {
131 own().push_front(buf);
132 }
133
134 /// Adds a message at the end of the message
135 inline void push_front(const message_t& msg) {
136 own();
137 for (mlength_t i = msg.length() - 1; i != 0; i--)
138 push_front(msg[i]);
139 }
140
141 /// Removes a buffer from the end of the message
142 inline shared_buffer_t pop_back() {
143 return own().pop_back();
144 }
145
146 /// Removes a buffer from the beginning of this message.
147 inline shared_buffer_t pop_front() {
148 return own().pop_front();
149 }
150
151 /// Returns the size of the message in bytes (or octets).
152 inline size_t size() const {
153 size_t s = 0;
154 for (mlength_t i = 0; i < length(); i++)
155 s += operator[](i).size();
156 return (s);
157 }
158
159 /// Returns the number of buffers inside this message.
160 inline mlength_t length() const {
161 if ( ! imsg )
162 return 0;
163
164 return (imsg->length);
165 }
166
167 /// Returns the buffer at the given index.
168 inline shared_buffer_t& operator[](mlength_t idx) {
169 return at(idx);
170 }
171
172 /// Returns the buffer at the given index.
173 inline shared_buffer_t& at(mlength_t idx) {
174 return imsg->at(idx);
175 }
176
177 /// Returns the constant buffer at the given index.
178 inline const shared_buffer_t& operator[](mlength_t idx) const {
179 return at(idx);
180 }
181
182 /// Returns the buffer at the given index
183 inline const shared_buffer_t& at(mlength_t idx) const {
184 return imsg->at(idx);
185 }
186
187 /// Iterates over a partial set of buffers.
188 template<typename T>
189 inline void msg_foreach(const T& work, size_t index_ = 0, size_t size_ = 0) const {
190 T op = work;
191 if (size_ == 0) size_ = size() - index_;
192
193 // get first buffer
194 mlength_t f = 0, pf = 0;
195 for (; (pf + at(f).size()) <= index_ && f < imsg->length;
196 pf += at(f).size(), f++);
197 // get last buffer
198 mlength_t l = f, pl = pf;
199 for (; (pl + at(l).size()) < (index_ + size_) && l < imsg->length;
200 pl += at(l).size(), l++);
201
202 // same buffer? yes-> get sub-buffer
203 if (l == f) op(at(l)(index_ - pf, size_));
204 else { // no-> get sub-buffers :)
205 op(at(f)(index_ - pf));
206 for (mlength_t i = f + 1; i < l; i++) op(at(i));
207 op(at(l)(0, index_ + size_ - pl));
208 }
209 }
210
211 /// Read bytes (gather).
212 inline void read(boctet_t* mem, size_t idx = 0, size_t size_ = 0) const {
213 struct read_buffer rb = { mem };
214 msg_foreach(rb, idx, size_);
215 }
216
217 /// write bytes
218 inline void write(const boctet_t* mem, size_t idx = 0, size_t size_ = 0) {
219 struct write_buffer wb = { mem };
220 msg_foreach(wb, idx, size_);
221 }
222
223 /// Read an arbitrary, binary object.
224 template<class T>
225 inline T read(size_t index) {
226 T obj;
227 read((boctet_t*) &obj, index, sizeof(T));
228 return obj;
229 }
230
231 /// Write an arbitrary, binary object.
232 template<class T>
233 inline void write(const T& value, size_t index) {
234 write((boctet_t*) &value, index, sizeof(T));
235 }
236
237 /// Calculate a (ELF-like) hash.
238 inline size_t hash() const {
239 size_t h = 0;
240 for (mlength_t i = 0; i < length(); i++)
241 h += at(i).hash() * (i + 1);
242 return h;
243 }
244
245 /// Returns a sub-message.
246 message_t operator()(size_t index, size_t size = 0) const {
247 message_t m;
248 struct sub_message sm = { &m };
249 msg_foreach(sm, index, size);
250 return m;
251 }
252
253 /// Linearizes the complete/partial message into one shared buffer.
254 inline shared_buffer_t linearize(size_t index = 0, size_t size_ = 0) const {
255 shared_buffer_t b(size_ == 0 ? size() : size_);
256 read(b.mutable_data(), index, size_);
257 return b;
258 }
259
260private:
261 class imsg_t {
262 public:
263 volatile message_t* owner;
264 shared_buffer_t buffers[message_max_buffers];
265 mlength_t index, length;
266
267 public:
268 inline imsg_t() :
269 owner(NULL),
270 index(0),
271 length(0)
272 {
273 }
274 inline imsg_t(const imsg_t& imsg) :
275 index(imsg.index), length(imsg.length)
276 {
277 for (mlength_t i = 0; i < length; i++)
278 at(index + i) = imsg.at(index + i);
279 }
280 inline shared_buffer_t& at(mlength_t idx) {
281 if (idx < 0) idx += length;
282 return buffers[(idx + index) & (message_max_buffers - 1)];
283 }
284 inline const shared_buffer_t& at(mlength_t idx) const {
285 if (idx < 0) idx += length;
286 return buffers[(idx + index) & (message_max_buffers - 1)];
287 }
288
289 inline void push_back(const shared_buffer_t& buf) {
290 if (buf.size() == 0) return;
291 if (length == message_max_buffers) compact();
292 at(length) = buf;
293 length++;
294 }
295
296 inline void push_front(const shared_buffer_t& buf) {
297 if (buf.size() == 0) return;
298 if (length == message_max_buffers) compact();
299 index--;
300 length++;
301 at(0) = buf;
302 }
303
304 inline shared_buffer_t pop_back() {
305 shared_buffer_t& buf = at(-1);
306 shared_buffer_t ret = buf;
307 buf.reset();
308 length--;
309 return ret;
310 }
311
312 inline shared_buffer_t pop_front() {
313 shared_buffer_t& buf = at(0);
314 shared_buffer_t ret = buf;
315 buf.reset();
316 length--;
317 index++;
318 return ret;
319 }
320
321 /// compacts the buffers, so one more buffer is available
322 inline void compact() {
323
324 // find compacting candidate
325 bsize_t min_size=~0, min_pos=0;
326 for (mlength_t i=0; i<length; i++) {
327 bsize_t c = at(i).size() + at(i+1).size();
328 if (c < min_size || min_size == ~(bsize_t)0 ) {
329 min_size = c;
330 min_pos = i;
331 }
332 }
333
334 // compact buffers
335 shared_buffer_t nb(min_size);
336 at(min_pos).copy_to( nb, 0 );
337 at(min_pos+1).copy_to( nb, at(min_pos).size() );
338
339 // move buffers and assign new buffer
340 for (mlength_t i=min_pos+1; i<length; i++) at(i) = at(i+1);
341 at(min_pos) = nb;
342
343 length--;
344 }
345 }; // [ class imsg_t ] -- inner class
346
347 /// own a new message
348 inline imsg_t& own() {
349 if (imsg.get() != NULL && imsg->owner == this) return *imsg;
350 if (imsg.get() == NULL) imsg = boost::shared_ptr<imsg_t>(new imsg_t());
351 else imsg = boost::shared_ptr<imsg_t>(new imsg_t(*imsg));
352 imsg->owner = this;
353 return *imsg;
354 }
355 boost::shared_ptr<imsg_t> imsg;
356}; // [ class message_t ] -- outer class
357
358inline message_t operator+(const message_t& lhs, const message_t& rhs) {
359 message_t m = lhs;
360 m.push_back(rhs);
361 return m;
362}
363
364inline message_t operator+(const message_t& lhs, const buffer_t& rhs) {
365 message_t m = lhs;
366 m.push_back(rhs);
367 return m;
368}
369
370inline message_t operator+(const shared_buffer_t& lhs, const message_t& rhs) {
371 message_t m = rhs;
372 m.push_front(lhs);
373 return m;
374}
375
376inline message_t operator+(const shared_buffer_t& lhs,
377 const shared_buffer_t& rhs) {
378 message_t m;
379 m.push_back(lhs);
380 m.push_back(rhs);
381 return m;
382}
383
384std::ostream& operator<<(std::ostream&, const message_t);
385
386} /* namespace reboost */
387
388#endif /* REBOOST_MESSAGE_HPP_ */
Note: See TracBrowser for help on using the repository browser.