source: source/ariba/utility/system/SystemQueue.cpp@ 12765

Last change on this file since 12765 was 12765, checked in by hock@…, 11 years ago

new functionality: SystemQueue::leave() and SystemQueue::join()

( + unit tests)

File size: 11.4 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
5//
6// Institute of Telematics
7// Universität Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37
38#include "SystemQueue.h"
39#include <ariba/utility/misc/sha1.h>
40#include <stdexcept>
41
42// TODO Mario:
43// check if there is any debug out left to remove
44
45namespace ariba {
46namespace utility {
47
48typedef boost::mutex::scoped_lock scoped_lock;
49
50using boost::posix_time::microsec_clock;
51using boost::posix_time::time_duration;
52using boost::date_time::not_a_date_time;
53using boost::scoped_ptr;
54
55
56use_logging_cpp(SystemQueue);
57
58SystemQueue::SystemQueue() :
59 SysQ( new QueueThread() )
60{
61 logging_debug("Creating SystemQueue at: " << this);
62}
63
64SystemQueue::~SystemQueue()
65{
66}
67
68
69SystemQueue& SystemQueue::instance()
70{
71 static SystemQueue _inst;
72 return _inst;
73}
74
75
76void SystemQueue::scheduleEvent( const SystemEvent& event, uint32_t delay )
77{
78// assert ( SysQ->running ); // should we really enforce this?
79 if ( ! SysQ->running )
80 {
81 logging_debug("/// WARNING: The SystemQueue is NOT RUNNING!");
82 }
83
84 // copy
85 SystemEvent ev(event);
86
87 SysQ->insert(ev, delay);
88}
89
90// maps to function call internally to the Event-system
91void SystemQueue::scheduleCall( const boost::function0<void>& function, uint32_t delay)
92{
93 // copy function object
94 boost::function0<void>* function_ptr = new boost::function0<void>();
95 (*function_ptr) = function;
96
97 // schedule special call-event
98 scheduleEvent( SystemEvent(&internal_function_caller, SystemEventType::DEFAULT, function_ptr), delay );
99}
100
101
102
103
104void SystemQueue::run()
105{
106 assert ( ! SysQ->running );
107 assert ( ! SysQ->unclean );
108
109 SysQ->running = true;
110
111 // start thread
112 sysq_thread.reset( new boost::thread(boost::ref(*SysQ)) );
113}
114
115void SystemQueue::cancel()
116{
117 // CHECK: this function must not be called from within a SystemQueue-Event
118 if ( sysq_thread && boost::this_thread::get_id() == sysq_thread->get_id() )
119 {
120 logging_warn("SystemQueue::cancel() was called from within a SystemQueue-Event. This is not allowed!");
121 throw std::logic_error("SystemQueue::cancel() was called from within a SystemQueue-Event. This is not allowed!");
122 }
123
124
125 // signal SysQ to quit (and abort queued events)
126 SysQ->cancel();
127
128 // wait till actually completes
129 // (should be fast, but the current event is allowed to finish)
130 if ( sysq_thread )
131 {
132 logging_debug("/// ... joining SysQ thread");
133 sysq_thread->join();
134 }
135
136 // delete thread object
137 sysq_thread.reset();
138
139 assert ( ! SysQ->isRunning() );
140
141
142 // clean up and respawn
143 logging_debug("/// respawning SysQ");
144 SysQ.reset( new QueueThread() );
145}
146
147
148void SystemQueue::leave()
149{
150 // signal SysQ to quit (and abort queued events)
151 SysQ->cancel();
152}
153
154void SystemQueue::join()
155{
156 if ( sysq_thread )
157 {
158 logging_debug("/// ... joining SysQ thread");
159 sysq_thread->join();
160 }
161}
162
163
164
165void SystemQueue::dropAll( const SystemEventListener* mlistener)
166{
167// TODO
168// directScheduler.dropAll(mlistener);
169// delayScheduler.dropAll(mlistener);
170}
171
172bool SystemQueue::isEmpty()
173{
174 return SysQ->isEmpty();
175}
176
177bool SystemQueue::isRunning()
178{
179 return SysQ->isRunning();
180}
181
182
183//********************************************************
184
185
186/// constructor
187SystemQueue::QueueThread::QueueThread() :
188 processing_event( false ),
189 running( false ),
190 aborted( false ),
191 unclean( false )
192{
193}
194
195SystemQueue::QueueThread::~QueueThread(){
196}
197
198
199void SystemQueue::QueueThread::operator()()
200{
201 logging_debug( "/// SysQ thread is alive." );
202
203 assert( running ); // this is set before the thread starts
204
205 // main loop
206 while ( ! aborted )
207 {
208 // run next immediate event (only one)
209 run_immediate_event();
210
211 // maintain timed events (move to immediateEventsQ, when deadline expired)
212 check_timed_queue();
213
214 // wait for next deadline (if no immediate events pending)
215 wait_for_next_deadline();
216 }
217
218 logging_debug( "/// SysQ thread is quitting." );
219
220 unclean = true;
221 running = false;
222}
223
224
225
226
227
228/// main loop functions
229
230void SystemQueue::QueueThread::run_immediate_event()
231{
232 // get next event and process it
233 if ( ! immediateEventsQ.empty() )
234 {
235 scoped_ptr<SystemEvent> currently_processed_event;
236
237 /* dequeue event */
238 // SYNCHRONIZED
239 {
240 scoped_lock lock( queue_mutex );
241
242 this->processing_event = true;
243
244 // * dequeue first event *
245 currently_processed_event.reset( new SystemEvent(immediateEventsQ.front()) ); // copy
246 immediateEventsQ.pop_front();
247 }
248
249 /* dispatch event */
250// logging_debug("/// SysQ: dispatching event");
251
252 // measure execution time (1/2)
253 ptime start_time = get_clock();
254
255 // * dispatch event *
256 currently_processed_event->getListener()->handleSystemEvent( *currently_processed_event );
257
258 // measure execution time (2/2)
259 time_duration execution_time = get_clock() - start_time;
260
261 // DEBUG OUTPUT: warning when execution takes too much time
262 // [ TODO how long is "too much time"? ]
263 if ( execution_time.total_milliseconds() > 50 )
264 {
265 logging_info("WARNING: Last event took " << execution_time.total_milliseconds() << " to complete.");
266 }
267
268 /* [ TODO ]
269 *
270 * - we could also measure how long an event has been waiting in the queue before it's dispatched
271 * (in order to detect overload)
272 *
273 * - especially for timed events, the displacement could be calculated
274 * (and, e.g., put in relation with the actual intended sleep time)
275 */
276 }
277
278 this->processing_event = false;
279}
280
281void SystemQueue::QueueThread::check_timed_queue()
282{
283 // this whole function is SYNCHRONIZED
284 scoped_lock lock( queue_mutex );
285
286 ptime now = get_clock();
287 bool not_expired_events_reached = false;
288
289 // move all expired events into the immediateEventsQ
290 while ( ! timedEventsQ.empty() && ! not_expired_events_reached )
291 {
292 const SystemEvent& ev = timedEventsQ.top();
293
294 time_duration remaining_sleep_time = ev.deadline - now;
295
296 // BRANCH: deadline reached
297 if ( remaining_sleep_time.is_negative() )
298 {
299 // move to immediateEventsQ
300 immediateEventsQ.push_back(ev);
301 timedEventsQ.pop();
302 }
303 // BRANCH: deadline not reached
304 else
305 {
306 // okay, that's all for now.
307 not_expired_events_reached = true;
308 }
309 } // while
310}
311
312void SystemQueue::QueueThread::wait_for_next_deadline()
313{
314 // SYNCHRONIZED
315 boost::mutex::scoped_lock lock(queue_mutex);
316
317 if ( immediateEventsQ.empty() )
318 {
319 // don't sleep when the SystemQueue is not already canceled
320 if ( aborted )
321 return;
322
323
324 // BRANCH: no timed events: sleep "forever" (until new events are scheduled)
325 if ( timedEventsQ.empty() )
326 {
327// logging_debug("/// SysQ is going to sleep.");
328
329 this->system_queue_idle.wait( lock );
330 }
331 // BRANCH: sleep till next timed event
332 else
333 {
334// logging_debug( "/// SysQ is going to sleep for "
335// << ( timedEventsQ.top().deadline - get_clock() ).total_milliseconds()
336// << "ms. Deadline: "
337// << timedEventsQ.top().deadline
338// << ", Clock: "
339// << get_clock() );
340
341 this->system_queue_idle.timed_wait( lock, timedEventsQ.top().deadline );
342 }
343 }
344}
345
346
347/// uniform clock interface
348ptime SystemQueue::QueueThread::get_clock()
349{
350 return microsec_clock::universal_time();
351}
352
353
354
355/// external interface
356
357bool SystemQueue::QueueThread::isRunning()
358{
359 return running;
360}
361
362
363void SystemQueue::QueueThread::cancel()
364{
365 logging_debug("/// Cancelling system queue... ");
366
367 // SYNCHRONIZED
368 {
369 scoped_lock lock(queue_mutex);
370 aborted = true;
371 }
372
373 logging_debug("/// SysQ: " << immediateEventsQ.size() << " immediate event(s) + "
374 << timedEventsQ.size() << " timed event(s) left.");
375
376 system_queue_idle.notify_all();
377}
378
379
380void SystemQueue::QueueThread::insert( SystemEvent& event, uint32_t delay )
381{
382 event.scheduledTime = get_clock();
383
384 // SYNCHRONIZED
385 {
386 scoped_lock lock( queue_mutex );
387
388 // BRANCH: immediate event
389 if ( delay == 0 )
390 {
391 immediateEventsQ.push_back(event);
392 }
393 // BRANCH: timed event
394 else
395 {
396 event.deadline = event.scheduledTime + boost::posix_time::milliseconds(delay);
397 event.delayTime = delay; // ( I think this is no longer needed.. )
398
399// // debug output
400// logging_debug("/// inserting timed event, due at: " << event.deadline << " (in " << delay << " ms)");
401
402 timedEventsQ.push(event);
403 }
404 }
405
406 // wake SysQ thread
407 system_queue_idle.notify_one(); // NOTE: there is only one thread
408 // (so it doesn't matter whether to call notify_one, or notify_all)
409}
410
411
412bool SystemQueue::QueueThread::isEmpty()
413{
414 // SYNCHRONIZED
415 scoped_lock lock( queue_mutex );
416
417 return immediateEventsQ.empty() && timedEventsQ.empty() && ! processing_event;
418}
419
420
421
422// FIXME
423void SystemQueue::enterMethod()
424{
425 assert( false );
426}
427void SystemQueue::leaveMethod()
428{
429 assert( false );
430}
431
432
433
434// XXX code from old system queue
435// void SystemQueue::QueueThread::enter(){
436// queueMutex.lock();
437// }
438//
439// void SystemQueue::QueueThread::leave(){
440// queueMutex.unlock();
441// }
442
443}} // namespace utility, namespace ariba
Note: See TracBrowser for help on using the repository browser.