source: source/ariba/utility/system/SystemQueue.h@ 12766

Last change on this file since 12766 was 12765, checked in by hock@…, 11 years ago

new functionality: SystemQueue::leave() and SystemQueue::join()

( + unit tests)

File size: 7.5 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#ifndef SYSTEMQUEUE_H_
40#define SYSTEMQUEUE_H_
41
42#include "SystemEvent.h"
43#include "SystemEventListener.h"
44#include "ariba/utility/logging/Logging.h"
45
46#include <cassert>
47#include <list>
48#include <vector>
49#include <queue> // std::priority_queue
50#include <functional> // std::greater
51
52#include <boost/date_time.hpp>
53#include <boost/cstdint.hpp>
54#include <boost/scoped_ptr.hpp>
55
56#include <boost/thread/mutex.hpp>
57#include <boost/thread/thread.hpp>
58#include <boost/thread/condition_variable.hpp>
59#include <boost/utility.hpp>
60#include <boost/bind.hpp>
61
62#include <boost/function.hpp>
63
64namespace ariba {
65namespace utility {
66
67using std::list;
68using boost::posix_time::ptime;
69
70
71/**
72 * This class implements a simple system event queue to allow
73 * a simulation of cooperative multitasking. It also allows
74 * events to be scheduled from other tasks. This allows
75 * dispatching asynchronous tasks.
76 *
77 * @author Christoph Mayer, Sebastian Mies
78 */
79/** XXX Mario Hock -- reworking the entire module **/
80
81
82class SystemQueue : private boost::noncopyable
83{
84 use_logging_h(SystemQueue);
85 friend class EnterMethod;
86public:
87 /**
88 * Get the SystemQueue singleton instance.
89 */
90 static SystemQueue& instance();
91
92 /**
93 * This methods schedules a given event.
94 *
95 * @param The event to be scheduled
96 * @param The delay in milli-seconds
97 */
98 void scheduleEvent( const SystemEvent& event, uint32_t delay = 0 );
99
100 /**
101 * This method schedules a function call in the SystemQueue.
102 * (Like scheduleEvent, but to be used with boost::bind.)
103 *
104 * @param function: The function to be called [void function()]
105 * @param The delay in milli-seconds
106 */
107 void scheduleCall( const boost::function0<void>& function, uint32_t delay = 0 );
108
109 /**
110 * Starts the processing and waiting for events.
111 * Use <code>cancel()</code> to end system queue processing and
112 * <code>isEmpty()</code> to check wheter the queue is empty.
113 */
114 void run();
115
116 /**
117 * Cancels the system queue and ends the processing after the
118 * currently processed event is processed.
119 *
120 * NOTE: Do not call this function from within a SystemQueue-Event.
121 * Use SystemQueue::leave() instead.
122 *
123 * This method is thread-safe.
124 */
125 void cancel();
126
127 /**
128 * Like SystemQueue::cancel(), but may only be called from within a SystemQueue-Event.
129 *
130 * NOTE: In this case some cleanup can not be made. -- If the SystemQueue is
131 * restarted, SystemQueue::cancel() must be called before SystemQueue::run()
132 * can be called again.
133 */
134 void leave();
135
136 /**
137 * Join the SystemQueue thread -- the current thread is blocked until the
138 * SystemQueue finishes.
139 *
140 * NOTE: Use this only in combination with SystemQueue::leave()
141 *
142 * [ There is a possible race condition with SystemQueue::cancel(), but
143 * SystemQueue::join() should not be used at the same time as
144 * SystemQueue::cancel() anyway. (SystemQueue::leave() is fine, though.)
145 */
146 void join();
147
148 /**
149 * Drop all queued events for that listener
150 */
151 void dropAll( const SystemEventListener* mlistener);
152
153 /**
154 * Check wheter this queue has items or not.
155 *
156 * @return True, if this queue is empty.
157 */
158 bool isEmpty();
159
160 /**
161 * Is the system queue already started and running?
162 *
163 * @return True, if the system queue is running.
164 */
165 bool isRunning();
166
167protected:
168
169 /**
170 * Aqcuire the mutex
171 */
172 void enterMethod();
173
174 /**
175 * Leave the mutex
176 */
177 void leaveMethod();
178
179 /**
180 * Constructs a system queue.
181 */
182 SystemQueue();
183
184 /**
185 * Destroys the system queue. Beware that all events
186 * are canceled
187 */
188 ~SystemQueue();
189
190
191/**
192 * inner class of class SystemQueue:
193 *
194 * QueueThread -- the class the does the actual work
195 */
196private:
197
198typedef list<SystemEvent> EventQueue;
199typedef std::priority_queue<SystemEvent,
200 std::vector<SystemEvent>, // [ TODO is vector the best underlay? ]
201 std::greater<SystemEvent> > PriorityEventQueue;
202
203 //********************************************************
204
205 class QueueThread
206 {
207 friend class SystemQueue;
208
209 public:
210 QueueThread();
211 virtual ~QueueThread();
212
213 /// main loop -- called from boost::thread
214 void operator()();
215
216// void run();
217 void cancel();
218 bool isEmpty();
219 void insert( SystemEvent& event, uint32_t delay );
220 void enter();
221 void leave();
222 void dropAll( const SystemEventListener* mlistener);
223 bool isRunning();
224
225 private:
226
227 /// main loop functions
228 void run_immediate_event();
229 void check_timed_queue();
230 void wait_for_next_deadline();
231
232 /// makes sure that always the same clock is used
233 ptime get_clock();
234
235
236 private:
237 EventQueue immediateEventsQ;
238 PriorityEventQueue timedEventsQ;
239
240 boost::condition_variable system_queue_idle;
241 boost::mutex queue_mutex;
242
243 bool processing_event;
244
245 volatile bool running;
246 volatile bool aborted;
247 volatile bool unclean;
248 }; // class QueueThread
249
250
251/// inner class of class SystemQueue
252private:
253 /**
254 * This inner class handles the function-call events.
255 * @see SystemQueue::scheduleCall
256 */
257 class FunctionCaller : public SystemEventListener
258 {
259 void handleSystemEvent(const SystemEvent& event)
260 {
261 boost::function0<void>* function_ptr = event.getData< boost::function0<void> >();
262 (*function_ptr)();
263 delete function_ptr;
264 }
265 };
266
267 FunctionCaller internal_function_caller;
268
269
270
271 /// member variables of class SystemQueue
272private:
273 boost::scoped_ptr<QueueThread> SysQ;
274 boost::scoped_ptr<boost::thread> sysq_thread;
275
276// volatile bool systemQueueRunning;
277
278}; // class SystemQueue
279
280}} // spovnet, common
281
282#endif /* SYSTEMQUEUE_H_ */
Note: See TracBrowser for help on using the repository browser.