source: source/ariba/utility/transport/messages/message.hpp@ 10700

Last change on this file since 10700 was 10653, checked in by Michael Tänzer, 12 years ago

Merge the ASIO branch back into trunk

File size: 8.9 KB
Line 
1//-----------------------------------------------------------------------------
2// Part of reboost (http://reboost.org). Released under the
3// BSD 2-clause license (http://www.opensource.org/licenses/bsd-license.php).
4// Copyright 2012, Sebastian Mies <mies@reboost.org> --- All rights reserved.
5//-----------------------------------------------------------------------------
6
7#ifndef REBOOST_MESSAGE_HPP_
8#define REBOOST_MESSAGE_HPP_
9
10#include<boost/thread.hpp>
11#include<boost/shared_ptr.hpp>
12#include<cstring>
13
14#include "shared_buffer.hpp"
15
16namespace reboost {
17
/// Type used for the number of buffers in a message and for buffer indices.
/// It counts buffers, not bytes (byte sizes use size_t / bsize_t).
typedef signed char mlength_t;

/// maximum number of buffers per message (default is 8).
/// Buffer slots are addressed with "& (message_max_buffers - 1)", so this
/// value has to stay a power of two.
const mlength_t message_max_buffers = (1L << 3);
23
24//! A Copy-on-Write Message with Shared Buffers.
25/**
26 * A Copy-on-Write Message with Shared Buffers.
27 *
28 * A message holds a limited (defined by <code>message_max_buffers</code>)
29 * number of shared buffers. One can add new buffers and messages in front and
30 * at the end of a message. If the no. of buffers exceed
31 * <code>message_max_buffers</code>, then the two smallest successive buffers
32 * are compacted to one buffer.
33 *
34 * @author Sebastian Mies <mies@reboost.org>
35 */
36class message_t {
37private:
38 // read sub-message
39 struct sub_message {
40 message_t* msg;
41 inline void operator()(shared_buffer_t buf) {
42 msg->push_back(buf);
43 }
44 };
45
46 // read from buffer
47 struct read_buffer {
48 boctet_t* buffer;
49 inline void operator()(buffer_t buf) {
50 memcpy((void*) buffer, (void*) buf.data(), buf.size());
51 buffer += buf.size();
52 }
53 };
54
55 // write to buffer
56 struct write_buffer {
57 const boctet_t* buffer;
58 inline void operator()(buffer_t buf) {
59 memcpy((void*) buf.data(), (void*) buffer, buf.size());
60 buffer += buf.size();
61 }
62 };
63
64public:
65 /// Create a new message
66 inline message_t() :
67 imsg() {
68 }
69
70 /// Copy message
71 inline message_t(const message_t& msg) :
72 imsg(msg.imsg) {
73 imsg->owner = NULL;
74 }
75
76 /// Linearize message
77 inline operator shared_buffer_t() const {
78 return linearize();
79 }
80
81 /// Assign another message
82 inline message_t& operator=(const message_t& msg) {
83 msg.imsg->owner = NULL;
84 imsg = msg.imsg;
85 return *this;
86 }
87
88 /// Adds a shared buffer of given site at the end
89 inline shared_buffer_t& push_back( bsize_t size ) {
90 shared_buffer_t b(size); push_back(b);
91 return imsg->at(-1);
92 }
93
94 /// Adds a buffer at the end of the message
95 inline void push_back(const shared_buffer_t& buf) {
96 own().push_back(buf);
97 }
98
99 /// Adds a message at the end of the message
100 inline void push_back(const message_t& msg) {
101 own();
102 for (mlength_t i = 0; i < msg.length(); i++)
103 push_back(msg[i]);
104 }
105
106 /// Adds a shared buffer of given size at the front
107 inline shared_buffer_t& push_front( bsize_t size ) {
108 shared_buffer_t b(size); push_front(b);
109 return imsg->at(0);
110 }
111
112 /// Adds a buffer at the front of the messsage
113 inline void push_front(const shared_buffer_t& buf) {
114 own().push_front(buf);
115 }
116
117 /// Adds a message at the end of the message
118 inline void push_front(const message_t& msg) {
119 own();
120 for (mlength_t i = msg.length() - 1; i != 0; i--)
121 push_front(msg[i]);
122 }
123
124 /// Removes a buffer from the end of the message
125 inline shared_buffer_t pop_back() {
126 return own().pop_back();
127 }
128
129 /// Removes a buffer from the beginning of this message.
130 inline shared_buffer_t pop_front() {
131 return own().pop_front();
132 }
133
134 /// Returns the size of the message in bytes (or octets).
135 inline size_t size() const {
136 size_t s = 0;
137 for (mlength_t i = 0; i < length(); i++)
138 s += operator[](i).size();
139 return (s);
140 }
141
142 /// Returns the number of buffers inside this message.
143 inline mlength_t length() const {
144 return (imsg->length);
145 }
146
147 /// Returns the buffer at the given index.
148 inline shared_buffer_t& operator[](mlength_t idx) {
149 return at(idx);
150 }
151
152 /// Returns the buffer at the given index.
153 inline shared_buffer_t& at(mlength_t idx) {
154 return imsg->at(idx);
155 }
156
157 /// Returns the constant buffer at the given index.
158 inline const shared_buffer_t& operator[](mlength_t idx) const {
159 return at(idx);
160 }
161
162 /// Returns the buffer at the given index
163 inline const shared_buffer_t& at(mlength_t idx) const {
164 return imsg->at(idx);
165 }
166
167 /// Iterates over a partial set of buffers.
168 template<typename T>
169 inline void foreach(const T& work, size_t index_ = 0, size_t size_ = 0) const {
170 T op = work;
171 if (size_ == 0) size_ = size() - index_;
172
173 // get first buffer
174 mlength_t f = 0, pf = 0;
175 for (; (pf + at(f).size()) <= index_ && f < imsg->length;
176 pf += at(f).size(), f++);
177 // get last buffer
178 mlength_t l = f, pl = pf;
179 for (; (pl + at(l).size()) < (index_ + size_) && l < imsg->length;
180 pl += at(l).size(), l++);
181
182 // same buffer? yes-> get sub-buffer
183 if (l == f) op(at(l)(index_ - pf, size_));
184 else { // no-> get sub-buffers :)
185 op(at(f)(index_ - pf));
186 for (mlength_t i = f + 1; i < l; i++) op(at(i));
187 op(at(l)(0, index_ + size_ - pl));
188 }
189 }
190
191 /// Read bytes (gather).
192 inline void read(boctet_t* mem, size_t idx = 0, size_t size_ = 0) const {
193 struct read_buffer rb = { mem };
194 foreach(rb, idx, size_);
195 }
196
197 /// write bytes
198 inline void write(const boctet_t* mem, size_t idx = 0, size_t size_ = 0) {
199 struct write_buffer wb = { mem };
200 foreach(wb, idx, size_);
201 }
202
203 /// Read an arbitrary, binary object.
204 template<class T>
205 inline T read(size_t index) {
206 T obj;
207 read((boctet_t*) &obj, index, sizeof(T));
208 return obj;
209 }
210
211 /// Write an arbitrary, binary object.
212 template<class T>
213 inline void write(const T& value, size_t index) {
214 write((boctet_t*) &value, index, sizeof(T));
215 }
216
217 /// Calculate a (ELF-like) hash.
218 inline size_t hash() const {
219 size_t h = 0;
220 for (mlength_t i = 0; i < length(); i++)
221 h += at(i).hash() * (i + 1);
222 return h;
223 }
224
225 /// Returns a sub-message.
226 message_t operator()(size_t index, size_t size = 0) const {
227 message_t m;
228 struct sub_message sm = { &m };
229 foreach(sm, index, size);
230 return m;
231 }
232
233 /// Linearizes the complete/partial message into one shared buffer.
234 inline shared_buffer_t linearize(size_t index = 0, size_t size_ = 0) const {
235 shared_buffer_t b(size_ == 0 ? size() : size_);
236 read(b.mutable_data(), index, size_);
237 return b;
238 }
239
240private:
241 class imsg_t {
242 public:
243 volatile message_t* owner;
244 shared_buffer_t buffers[message_max_buffers];
245 mlength_t index, length;
246 public:
247 inline imsg_t() :
248 index(0), length(0) {
249 }
250 inline imsg_t(const imsg_t& imsg) :
251 index(imsg.index), length(imsg.length) {
252 for (mlength_t i = 0; i < length; i++)
253 at(index + i) = imsg.at(index + i);
254 }
255 inline shared_buffer_t& at(mlength_t idx) {
256 if (idx < 0) idx += length;
257 return buffers[(idx + index) & (message_max_buffers - 1)];
258 }
259 inline const shared_buffer_t& at(mlength_t idx) const {
260 if (idx < 0) idx += length;
261 return buffers[(idx + index) & (message_max_buffers - 1)];
262 }
263
264 inline void push_back(const shared_buffer_t& buf) {
265 if (buf.size() == 0) return;
266 if (length == message_max_buffers) compact();
267 at(length) = buf;
268 length++;
269 }
270
271 inline void push_front(const shared_buffer_t& buf) {
272 if (buf.size() == 0) return;
273 if (length == message_max_buffers) compact();
274 index--;
275 length++;
276 at(0) = buf;
277 }
278
279 inline shared_buffer_t pop_back() {
280 shared_buffer_t& buf = at(-1);
281 shared_buffer_t ret = buf;
282 buf.reset();
283 length--;
284 return ret;
285 }
286
287 inline shared_buffer_t pop_front() {
288 shared_buffer_t& buf = at(0);
289 shared_buffer_t ret = buf;
290 buf.reset();
291 length--;
292 index++;
293 return ret;
294 }
295
296 /// compacts the buffers, so one more buffer is available
297 inline void compact() {
298
299 // find compacting candidate
300 bsize_t min_size=~0, min_pos=0;
301 for (mlength_t i=0; i<length; i++) {
302 bsize_t c = at(i).size() + at(i+1).size();
303 if (c < min_size || min_size == ~(bsize_t)0 ) {
304 min_size = c;
305 min_pos = i;
306 }
307 }
308
309 // compact buffers
310 shared_buffer_t nb(min_size);
311 at(min_pos).copy_to( nb, 0 );
312 at(min_pos+1).copy_to( nb, at(min_pos).size() );
313
314 // move buffers and assign new buffer
315 for (mlength_t i=min_pos+1; i<length; i++) at(i) = at(i+1);
316 at(min_pos) = nb;
317
318 length--;
319 }
320 };
321 /// own a new message
322 inline imsg_t& own() {
323 if (imsg.get() != NULL && imsg->owner == this) return *imsg;
324 if (imsg.get() == NULL) imsg = boost::shared_ptr<imsg_t>(new imsg_t());
325 else imsg = boost::shared_ptr<imsg_t>(new imsg_t(*imsg));
326 imsg->owner = this;
327 return *imsg;
328 }
329 boost::shared_ptr<imsg_t> imsg;
330};
331
332inline message_t operator+(const message_t& lhs, const message_t& rhs) {
333 message_t m = lhs;
334 m.push_back(rhs);
335 return m;
336}
337
338inline message_t operator+(const message_t& lhs, const buffer_t& rhs) {
339 message_t m = lhs;
340 m.push_back(rhs);
341 return m;
342}
343
344inline message_t operator+(const shared_buffer_t& lhs, const message_t& rhs) {
345 message_t m = rhs;
346 m.push_front(lhs);
347 return m;
348}
349
350inline message_t operator+(const shared_buffer_t& lhs,
351 const shared_buffer_t& rhs) {
352 message_t m;
353 m.push_back(lhs);
354 m.push_back(rhs);
355 return m;
356}
357
/// Stream output operator for messages. Only the declaration is part of this
/// header; note that the message is taken by value.
std::ostream& operator<<(std::ostream&, const message_t);
359
360} /* namespace reboost */
361
362#endif /* REBOOST_MESSAGE_HPP_ */
Note: See TracBrowser for help on using the repository browser.