source: source/ariba/utility/system/SystemQueue.cpp@ 12765

Last change on this file since 12765 was 12765, checked in by hock@…, 10 years ago

new functionality: SystemQueue::leave() and SystemQueue::join()

( + unit tests)

File size: 11.4 KB
RevLine 
[12761]1// [License]
2// The Ariba-Underlay Copyright
3//
// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
5//
6// Institute of Telematics
// Universität Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37
38#include "SystemQueue.h"
39#include <ariba/utility/misc/sha1.h>
40#include <stdexcept>
41
[12764]42// TODO Mario:
43// check if there is any debug out left to remove
44
[12761]45namespace ariba {
46namespace utility {
47
[12764]48typedef boost::mutex::scoped_lock scoped_lock;
49
50using boost::posix_time::microsec_clock;
51using boost::posix_time::time_duration;
52using boost::date_time::not_a_date_time;
53using boost::scoped_ptr;
[12761]54
55
56use_logging_cpp(SystemQueue);
57
[12762]58SystemQueue::SystemQueue() :
59 SysQ( new QueueThread() )
[12761]60{
[12762]61 logging_debug("Creating SystemQueue at: " << this);
[12761]62}
63
64SystemQueue::~SystemQueue()
65{
66}
67
68
69SystemQueue& SystemQueue::instance()
70{
71 static SystemQueue _inst;
72 return _inst;
73}
74
75
76void SystemQueue::scheduleEvent( const SystemEvent& event, uint32_t delay )
77{
[12765]78// assert ( SysQ->running ); // should we really enforce this?
79 if ( ! SysQ->running )
80 {
81 logging_debug("/// WARNING: The SystemQueue is NOT RUNNING!");
82 }
[12764]83
[12761]84 // copy
85 SystemEvent ev(event);
86
[12762]87 SysQ->insert(ev, delay);
[12761]88}
89
90// maps to function call internally to the Event-system
91void SystemQueue::scheduleCall( const boost::function0<void>& function, uint32_t delay)
92{
93 // copy function object
94 boost::function0<void>* function_ptr = new boost::function0<void>();
95 (*function_ptr) = function;
96
97 // schedule special call-event
98 scheduleEvent( SystemEvent(&internal_function_caller, SystemEventType::DEFAULT, function_ptr), delay );
99}
100
101
102
103
104void SystemQueue::run()
105{
[12762]106 assert ( ! SysQ->running );
[12764]107 assert ( ! SysQ->unclean );
[12761]108
[12762]109 SysQ->running = true;
[12761]110
111 // start thread
[12762]112 sysq_thread.reset( new boost::thread(boost::ref(*SysQ)) );
[12761]113}
114
115void SystemQueue::cancel()
116{
[12764]117 // CHECK: this function must not be called from within a SystemQueue-Event
118 if ( sysq_thread && boost::this_thread::get_id() == sysq_thread->get_id() )
119 {
120 logging_warn("SystemQueue::cancel() was called from within a SystemQueue-Event. This is not allowed!");
121 throw std::logic_error("SystemQueue::cancel() was called from within a SystemQueue-Event. This is not allowed!");
122 }
123
124
[12761]125 // signal SysQ to quit (and abort queued events)
[12762]126 SysQ->cancel();
[12761]127
128 // wait till actually completes
129 // (should be fast, but the current event is allowed to finish)
[12763]130 if ( sysq_thread )
131 {
132 logging_debug("/// ... joining SysQ thread");
133 sysq_thread->join();
134 }
[12761]135
136 // delete thread object
137 sysq_thread.reset();
138
[12762]139 assert ( ! SysQ->isRunning() );
140
[12761]141
[12762]142 // clean up and respawn
143 logging_debug("/// respawning SysQ");
144 SysQ.reset( new QueueThread() );
[12761]145}
146
[12765]147
148void SystemQueue::leave()
149{
150 // signal SysQ to quit (and abort queued events)
151 SysQ->cancel();
152}
153
154void SystemQueue::join()
155{
156 if ( sysq_thread )
157 {
158 logging_debug("/// ... joining SysQ thread");
159 sysq_thread->join();
160 }
161}
162
163
164
[12761]165void SystemQueue::dropAll( const SystemEventListener* mlistener)
166{
[12764]167// TODO
[12761]168// directScheduler.dropAll(mlistener);
169// delayScheduler.dropAll(mlistener);
170}
171
172bool SystemQueue::isEmpty()
173{
[12762]174 return SysQ->isEmpty();
[12761]175}
176
177bool SystemQueue::isRunning()
178{
[12762]179 return SysQ->isRunning();
[12761]180}
181
182
[12764]183//********************************************************
[12761]184
[12764]185
186/// constructor
[12761]187SystemQueue::QueueThread::QueueThread() :
[12762]188 processing_event( false ),
[12761]189 running( false ),
[12764]190 aborted( false ),
191 unclean( false )
[12761]192{
193}
194
195SystemQueue::QueueThread::~QueueThread(){
196}
197
198
199void SystemQueue::QueueThread::operator()()
200{
[12762]201 logging_debug( "/// SysQ thread is alive." );
[12761]202
203 assert( running ); // this is set before the thread starts
204
205 // main loop
206 while ( ! aborted )
207 {
208 // run next immediate event (only one)
209 run_immediate_event();
210
211 // maintain timed events (move to immediateEventsQ, when deadline expired)
212 check_timed_queue();
213
214 // wait for next deadline (if no immediate events pending)
215 wait_for_next_deadline();
216 }
217
[12762]218 logging_debug( "/// SysQ thread is quitting." );
[12761]219
[12764]220 unclean = true;
[12761]221 running = false;
222}
223
224
225
226
227
228/// main loop functions
229
230void SystemQueue::QueueThread::run_immediate_event()
231{
232 // get next event and process it
233 if ( ! immediateEventsQ.empty() )
234 {
235 scoped_ptr<SystemEvent> currently_processed_event;
236
[12763]237 /* dequeue event */
[12761]238 // SYNCHRONIZED
239 {
240 scoped_lock lock( queue_mutex );
241
[12762]242 this->processing_event = true;
243
[12763]244 // * dequeue first event *
[12761]245 currently_processed_event.reset( new SystemEvent(immediateEventsQ.front()) ); // copy
246 immediateEventsQ.pop_front();
247 }
248
[12763]249 /* dispatch event */
[12764]250// logging_debug("/// SysQ: dispatching event");
[12761]251
[12763]252 // measure execution time (1/2)
253 ptime start_time = get_clock();
254
255 // * dispatch event *
[12761]256 currently_processed_event->getListener()->handleSystemEvent( *currently_processed_event );
[12763]257
258 // measure execution time (2/2)
259 time_duration execution_time = get_clock() - start_time;
260
261 // DEBUG OUTPUT: warning when execution takes too much time
262 // [ TODO how long is "too much time"? ]
263 if ( execution_time.total_milliseconds() > 50 )
264 {
265 logging_info("WARNING: Last event took " << execution_time.total_milliseconds() << " to complete.");
266 }
267
268 /* [ TODO ]
269 *
270 * - we could also measure how long an event has been waiting in the queue before it's dispatched
271 * (in order to detect overload)
272 *
273 * - especially for timed events, the displacement could be calculated
274 * (and, e.g., put in relation with the actual intended sleep time)
275 */
[12761]276 }
277
[12762]278 this->processing_event = false;
[12761]279}
280
281void SystemQueue::QueueThread::check_timed_queue()
282{
283 // this whole function is SYNCHRONIZED
284 scoped_lock lock( queue_mutex );
285
[12763]286 ptime now = get_clock();
[12761]287 bool not_expired_events_reached = false;
288
289 // move all expired events into the immediateEventsQ
290 while ( ! timedEventsQ.empty() && ! not_expired_events_reached )
291 {
[12763]292 const SystemEvent& ev = timedEventsQ.top();
[12761]293
[12763]294 time_duration remaining_sleep_time = ev.deadline - now;
[12761]295
296 // BRANCH: deadline reached
297 if ( remaining_sleep_time.is_negative() )
298 {
299 // move to immediateEventsQ
300 immediateEventsQ.push_back(ev);
[12763]301 timedEventsQ.pop();
[12761]302 }
303 // BRANCH: deadline not reached
304 else
305 {
306 // okay, that's all for now.
307 not_expired_events_reached = true;
308 }
309 } // while
310}
311
312void SystemQueue::QueueThread::wait_for_next_deadline()
313{
[12763]314 // SYNCHRONIZED
315 boost::mutex::scoped_lock lock(queue_mutex);
316
[12761]317 if ( immediateEventsQ.empty() )
318 {
319 // don't sleep when the SystemQueue is not already canceled
320 if ( aborted )
321 return;
322
323
324 // BRANCH: no timed events: sleep "forever" (until new events are scheduled)
[12763]325 if ( timedEventsQ.empty() )
[12761]326 {
[12765]327// logging_debug("/// SysQ is going to sleep.");
[12761]328
329 this->system_queue_idle.wait( lock );
330 }
331 // BRANCH: sleep till next timed event
332 else
333 {
[12765]334// logging_debug( "/// SysQ is going to sleep for "
335// << ( timedEventsQ.top().deadline - get_clock() ).total_milliseconds()
336// << "ms. Deadline: "
337// << timedEventsQ.top().deadline
338// << ", Clock: "
339// << get_clock() );
[12761]340
[12763]341 this->system_queue_idle.timed_wait( lock, timedEventsQ.top().deadline );
[12761]342 }
343 }
344}
345
346
347/// uniform clock interface
348ptime SystemQueue::QueueThread::get_clock()
349{
[12764]350 return microsec_clock::universal_time();
[12761]351}
352
353
354
355/// external interface
356
357bool SystemQueue::QueueThread::isRunning()
358{
359 return running;
360}
361
362
363void SystemQueue::QueueThread::cancel()
364{
[12762]365 logging_debug("/// Cancelling system queue... ");
[12761]366
367 // SYNCHRONIZED
368 {
369 scoped_lock lock(queue_mutex);
370 aborted = true;
371 }
372
[12762]373 logging_debug("/// SysQ: " << immediateEventsQ.size() << " immediate event(s) + "
[12761]374 << timedEventsQ.size() << " timed event(s) left.");
375
376 system_queue_idle.notify_all();
377}
378
379
380void SystemQueue::QueueThread::insert( SystemEvent& event, uint32_t delay )
381{
382 event.scheduledTime = get_clock();
383
384 // SYNCHRONIZED
385 {
386 scoped_lock lock( queue_mutex );
387
388 // BRANCH: immediate event
389 if ( delay == 0 )
390 {
391 immediateEventsQ.push_back(event);
392 }
[12763]393 // BRANCH: timed event
394 else
395 {
396 event.deadline = event.scheduledTime + boost::posix_time::milliseconds(delay);
[12764]397 event.delayTime = delay; // ( I think this is no longer needed.. )
[12763]398
[12764]399// // debug output
400// logging_debug("/// inserting timed event, due at: " << event.deadline << " (in " << delay << " ms)");
[12763]401
402 timedEventsQ.push(event);
403 }
[12761]404 }
405
406 // wake SysQ thread
407 system_queue_idle.notify_one(); // NOTE: there is only one thread
408 // (so it doesn't matter whether to call notify_one, or notify_all)
409}
410
411
[12762]412bool SystemQueue::QueueThread::isEmpty()
413{
414 // SYNCHRONIZED
415 scoped_lock lock( queue_mutex );
416
417 return immediateEventsQ.empty() && timedEventsQ.empty() && ! processing_event;
418}
[12761]419
[12762]420
421
[12761]422// FIXME
423void SystemQueue::enterMethod()
424{
425 assert( false );
426}
427void SystemQueue::leaveMethod()
428{
429 assert( false );
430}
431
432
433
[12764]434// XXX code from old system queue
435// void SystemQueue::QueueThread::enter(){
436// queueMutex.lock();
437// }
438//
439// void SystemQueue::QueueThread::leave(){
440// queueMutex.unlock();
441// }
[12761]442
}} // namespace utility, namespace ariba
Note: See TracBrowser for help on using the repository browser.