source: source/ariba/utility/system/SystemQueue.cpp@ 12764

Last change on this file since 12764 was 12764, checked in by hock@…, 10 years ago

New SystemQueue...

... is passing all unit tests, now. :-)

File size: 11.0 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37
38#include "SystemQueue.h"
39#include <ariba/utility/misc/sha1.h>
40#include <stdexcept>
41
42// TODO Mario:
43// check if there is any debug out left to remove
44
45namespace ariba {
46namespace utility {
47
48typedef boost::mutex::scoped_lock scoped_lock;
49
50using boost::posix_time::microsec_clock;
51using boost::posix_time::time_duration;
52using boost::date_time::not_a_date_time;
53using boost::scoped_ptr;
54
55
56use_logging_cpp(SystemQueue);
57
58SystemQueue::SystemQueue() :
59 SysQ( new QueueThread() )
60{
61 logging_debug("Creating SystemQueue at: " << this);
62}
63
64SystemQueue::~SystemQueue()
65{
66}
67
68
69SystemQueue& SystemQueue::instance()
70{
71 static SystemQueue _inst;
72 return _inst;
73}
74
75
76void SystemQueue::scheduleEvent( const SystemEvent& event, uint32_t delay )
77{
78 assert ( SysQ->running ); // should we really enforce this?
79
80 // copy
81 SystemEvent ev(event);
82
83 SysQ->insert(ev, delay);
84}
85
86// maps to function call internally to the Event-system
87void SystemQueue::scheduleCall( const boost::function0<void>& function, uint32_t delay)
88{
89 // copy function object
90 boost::function0<void>* function_ptr = new boost::function0<void>();
91 (*function_ptr) = function;
92
93 // schedule special call-event
94 scheduleEvent( SystemEvent(&internal_function_caller, SystemEventType::DEFAULT, function_ptr), delay );
95}
96
97
98
99
100void SystemQueue::run()
101{
102 assert ( ! SysQ->running );
103 assert ( ! SysQ->unclean );
104
105 SysQ->running = true;
106
107 // start thread
108 sysq_thread.reset( new boost::thread(boost::ref(*SysQ)) );
109}
110
111void SystemQueue::cancel()
112{
113 // CHECK: this function must not be called from within a SystemQueue-Event
114 if ( sysq_thread && boost::this_thread::get_id() == sysq_thread->get_id() )
115 {
116 logging_warn("SystemQueue::cancel() was called from within a SystemQueue-Event. This is not allowed!");
117 throw std::logic_error("SystemQueue::cancel() was called from within a SystemQueue-Event. This is not allowed!");
118 }
119
120
121 // signal SysQ to quit (and abort queued events)
122 SysQ->cancel();
123
124 // wait till actually completes
125 // (should be fast, but the current event is allowed to finish)
126 if ( sysq_thread )
127 {
128 logging_debug("/// ... joining SysQ thread");
129 sysq_thread->join();
130 }
131
132 // delete thread object
133 sysq_thread.reset();
134
135 assert ( ! SysQ->isRunning() );
136
137
138 // clean up and respawn
139 logging_debug("/// respawning SysQ");
140 SysQ.reset( new QueueThread() );
141}
142
143void SystemQueue::dropAll( const SystemEventListener* mlistener)
144{
145// TODO
146// directScheduler.dropAll(mlistener);
147// delayScheduler.dropAll(mlistener);
148}
149
150bool SystemQueue::isEmpty()
151{
152 return SysQ->isEmpty();
153}
154
155bool SystemQueue::isRunning()
156{
157 return SysQ->isRunning();
158}
159
160
161//********************************************************
162
163
164/// constructor
165SystemQueue::QueueThread::QueueThread() :
166 processing_event( false ),
167 running( false ),
168 aborted( false ),
169 unclean( false )
170{
171}
172
173SystemQueue::QueueThread::~QueueThread(){
174}
175
176
177void SystemQueue::QueueThread::operator()()
178{
179 logging_debug( "/// SysQ thread is alive." );
180
181 assert( running ); // this is set before the thread starts
182
183 // main loop
184 while ( ! aborted )
185 {
186 // run next immediate event (only one)
187 run_immediate_event();
188
189 // maintain timed events (move to immediateEventsQ, when deadline expired)
190 check_timed_queue();
191
192 // wait for next deadline (if no immediate events pending)
193 wait_for_next_deadline();
194 }
195
196 logging_debug( "/// SysQ thread is quitting." );
197
198 unclean = true;
199 running = false;
200}
201
202
203
204
205
206/// main loop functions
207
208void SystemQueue::QueueThread::run_immediate_event()
209{
210 // get next event and process it
211 if ( ! immediateEventsQ.empty() )
212 {
213 scoped_ptr<SystemEvent> currently_processed_event;
214
215 /* dequeue event */
216 // SYNCHRONIZED
217 {
218 scoped_lock lock( queue_mutex );
219
220 this->processing_event = true;
221
222 // * dequeue first event *
223 currently_processed_event.reset( new SystemEvent(immediateEventsQ.front()) ); // copy
224 immediateEventsQ.pop_front();
225 }
226
227 /* dispatch event */
228// logging_debug("/// SysQ: dispatching event");
229
230 // measure execution time (1/2)
231 ptime start_time = get_clock();
232
233 // * dispatch event *
234 currently_processed_event->getListener()->handleSystemEvent( *currently_processed_event );
235
236 // measure execution time (2/2)
237 time_duration execution_time = get_clock() - start_time;
238
239 // DEBUG OUTPUT: warning when execution takes too much time
240 // [ TODO how long is "too much time"? ]
241 if ( execution_time.total_milliseconds() > 50 )
242 {
243 logging_info("WARNING: Last event took " << execution_time.total_milliseconds() << " to complete.");
244 }
245
246 /* [ TODO ]
247 *
248 * - we could also measure how long an event has been waiting in the queue before it's dispatched
249 * (in order to detect overload)
250 *
251 * - especially for timed events, the displacement could be calculated
252 * (and, e.g., put in relation with the actual intended sleep time)
253 */
254 }
255
256 this->processing_event = false;
257}
258
259void SystemQueue::QueueThread::check_timed_queue()
260{
261 // this whole function is SYNCHRONIZED
262 scoped_lock lock( queue_mutex );
263
264 ptime now = get_clock();
265 bool not_expired_events_reached = false;
266
267 // move all expired events into the immediateEventsQ
268 while ( ! timedEventsQ.empty() && ! not_expired_events_reached )
269 {
270 const SystemEvent& ev = timedEventsQ.top();
271
272 time_duration remaining_sleep_time = ev.deadline - now;
273
274 // BRANCH: deadline reached
275 if ( remaining_sleep_time.is_negative() )
276 {
277 // move to immediateEventsQ
278 immediateEventsQ.push_back(ev);
279 timedEventsQ.pop();
280 }
281 // BRANCH: deadline not reached
282 else
283 {
284 // okay, that's all for now.
285 not_expired_events_reached = true;
286 }
287 } // while
288}
289
290void SystemQueue::QueueThread::wait_for_next_deadline()
291{
292 // SYNCHRONIZED
293 boost::mutex::scoped_lock lock(queue_mutex);
294
295 if ( immediateEventsQ.empty() )
296 {
297 // don't sleep when the SystemQueue is not already canceled
298 if ( aborted )
299 return;
300
301
302 // BRANCH: no timed events: sleep "forever" (until new events are scheduled)
303 if ( timedEventsQ.empty() )
304 {
305 logging_debug("/// SysQ is going to sleep.");
306
307 this->system_queue_idle.wait( lock );
308 }
309 // BRANCH: sleep till next timed event
310 else
311 {
312 logging_debug( "/// SysQ is going to sleep for "
313 << ( timedEventsQ.top().deadline - get_clock() ).total_milliseconds()
314 << "ms. Deadline: "
315 << timedEventsQ.top().deadline
316 << ", Clock: "
317 << get_clock() );
318
319 this->system_queue_idle.timed_wait( lock, timedEventsQ.top().deadline );
320 }
321 }
322}
323
324
325/// uniform clock interface
326ptime SystemQueue::QueueThread::get_clock()
327{
328 return microsec_clock::universal_time();
329}
330
331
332
333/// external interface
334
335bool SystemQueue::QueueThread::isRunning()
336{
337 return running;
338}
339
340
341void SystemQueue::QueueThread::cancel()
342{
343 logging_debug("/// Cancelling system queue... ");
344
345 // SYNCHRONIZED
346 {
347 scoped_lock lock(queue_mutex);
348 aborted = true;
349 }
350
351 logging_debug("/// SysQ: " << immediateEventsQ.size() << " immediate event(s) + "
352 << timedEventsQ.size() << " timed event(s) left.");
353
354 system_queue_idle.notify_all();
355}
356
357
358void SystemQueue::QueueThread::insert( SystemEvent& event, uint32_t delay )
359{
360 event.scheduledTime = get_clock();
361
362 // SYNCHRONIZED
363 {
364 scoped_lock lock( queue_mutex );
365
366 // BRANCH: immediate event
367 if ( delay == 0 )
368 {
369 immediateEventsQ.push_back(event);
370 }
371 // BRANCH: timed event
372 else
373 {
374 event.deadline = event.scheduledTime + boost::posix_time::milliseconds(delay);
375 event.delayTime = delay; // ( I think this is no longer needed.. )
376
377// // debug output
378// logging_debug("/// inserting timed event, due at: " << event.deadline << " (in " << delay << " ms)");
379
380 timedEventsQ.push(event);
381 }
382 }
383
384 // wake SysQ thread
385 system_queue_idle.notify_one(); // NOTE: there is only one thread
386 // (so it doesn't matter whether to call notify_one, or notify_all)
387}
388
389
390bool SystemQueue::QueueThread::isEmpty()
391{
392 // SYNCHRONIZED
393 scoped_lock lock( queue_mutex );
394
395 return immediateEventsQ.empty() && timedEventsQ.empty() && ! processing_event;
396}
397
398
399
400// FIXME
401void SystemQueue::enterMethod()
402{
403 assert( false );
404}
405void SystemQueue::leaveMethod()
406{
407 assert( false );
408}
409
410
411
412// XXX code from old system queue
413// void SystemQueue::QueueThread::enter(){
414// queueMutex.lock();
415// }
416//
417// void SystemQueue::QueueThread::leave(){
418// queueMutex.unlock();
419// }
420
421}} // spovnet, common
Note: See TracBrowser for help on using the repository browser.