source: source/ariba/utility/system/SystemQueue.cpp@ 12771

Last change on this file since 12771 was 12771, checked in by hock@…, 10 years ago

added new interface: SystemQueue::am_I_in_the_SysQ_thread

for debugging and asserts..

File size: 11.6 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37
38#include "SystemQueue.h"
39#include <ariba/utility/misc/sha1.h>
40#include <stdexcept>
41
42// TODO Mario:
43// check if there is any debug out left to remove
44
45namespace ariba {
46namespace utility {
47
48typedef boost::mutex::scoped_lock scoped_lock;
49
50using boost::posix_time::microsec_clock;
51using boost::posix_time::time_duration;
52using boost::date_time::not_a_date_time;
53using boost::scoped_ptr;
54
55
56use_logging_cpp(SystemQueue);
57
58SystemQueue::SystemQueue() :
59 SysQ( new QueueThread() )
60{
61 logging_debug("Creating SystemQueue at: " << this);
62}
63
64SystemQueue::~SystemQueue()
65{
66}
67
68
69SystemQueue& SystemQueue::instance()
70{
71 static SystemQueue _inst;
72 return _inst;
73}
74
75
76void SystemQueue::scheduleEvent( const SystemEvent& event, uint32_t delay )
77{
78// assert ( SysQ->running ); // should we really enforce this?
79 if ( ! SysQ->running )
80 {
81 logging_debug("/// WARNING: The SystemQueue is NOT RUNNING!");
82 }
83
84 // copy
85 SystemEvent ev(event);
86
87 SysQ->insert(ev, delay);
88}
89
90// maps to function call internally to the Event-system
91void SystemQueue::scheduleCall( const boost::function0<void>& function, uint32_t delay)
92{
93 // copy function object
94 boost::function0<void>* function_ptr = new boost::function0<void>();
95 (*function_ptr) = function;
96
97 // schedule special call-event
98 scheduleEvent( SystemEvent(&internal_function_caller, SystemEventType::DEFAULT, function_ptr), delay );
99}
100
101
102
103
104void SystemQueue::run()
105{
106 // TODO should these be assertion or exceptions..? (exceptions => unit test)
107 assert ( ! SysQ->running );
108 assert ( ! SysQ->unclean );
109
110 SysQ->running = true;
111
112 // start thread
113 sysq_thread.reset( new boost::thread(boost::ref(*SysQ)) );
114}
115
116void SystemQueue::cancel()
117{
118 // CHECK: this function must not be called from within a SystemQueue-Event
119 if ( am_I_in_the_SysQ_thread() )
120// if ( sysq_thread && boost::this_thread::get_id() == sysq_thread->get_id() )
121 {
122 logging_warn("SystemQueue::cancel() was called from within a SystemQueue-Event. This is not allowed!");
123 throw std::logic_error("SystemQueue::cancel() was called from within a SystemQueue-Event. This is not allowed!");
124 }
125
126
127 // signal SysQ to quit (and abort queued events)
128 SysQ->cancel();
129
130 // wait till actually completes
131 // (should be fast, but the current event is allowed to finish)
132 if ( sysq_thread )
133 {
134 logging_debug("/// ... joining SysQ thread");
135 sysq_thread->join();
136 }
137
138 // delete thread object
139 sysq_thread.reset();
140
141 assert ( ! SysQ->isRunning() );
142
143
144 // clean up and respawn
145 logging_debug("/// respawning SysQ");
146 SysQ.reset( new QueueThread() );
147}
148
149
150void SystemQueue::leave()
151{
152 // signal SysQ to quit (and abort queued events)
153 SysQ->cancel();
154}
155
156void SystemQueue::join()
157{
158 if ( sysq_thread )
159 {
160 logging_debug("/// ... joining SysQ thread");
161 sysq_thread->join();
162 }
163}
164
165
166
167void SystemQueue::dropAll( const SystemEventListener* mlistener)
168{
169// TODO
170// directScheduler.dropAll(mlistener);
171// delayScheduler.dropAll(mlistener);
172}
173
174bool SystemQueue::isEmpty()
175{
176 return SysQ->isEmpty();
177}
178
179bool SystemQueue::isRunning()
180{
181 return SysQ->isRunning();
182}
183
184bool SystemQueue::am_I_in_the_SysQ_thread()
185{
186 return sysq_thread && boost::this_thread::get_id() == sysq_thread->get_id();
187}
188
189
190//********************************************************
191
192
193/// constructor
194SystemQueue::QueueThread::QueueThread() :
195 processing_event( false ),
196 running( false ),
197 aborted( false ),
198 unclean( false )
199{
200}
201
202SystemQueue::QueueThread::~QueueThread(){
203}
204
205
206void SystemQueue::QueueThread::operator()()
207{
208 logging_debug( "/// SysQ thread is alive." );
209
210 assert( running ); // this is set before the thread starts
211
212 // main loop
213 while ( ! aborted )
214 {
215 // run next immediate event (only one)
216 run_immediate_event();
217
218 // maintain timed events (move to immediateEventsQ, when deadline expired)
219 check_timed_queue();
220
221 // wait for next deadline (if no immediate events pending)
222 wait_for_next_deadline();
223 }
224
225 logging_debug( "/// SysQ thread is quitting." );
226
227 unclean = true;
228 running = false;
229}
230
231
232
233
234
235/// main loop functions
236
237void SystemQueue::QueueThread::run_immediate_event()
238{
239 // get next event and process it
240 if ( ! immediateEventsQ.empty() )
241 {
242 scoped_ptr<SystemEvent> currently_processed_event;
243
244 /* dequeue event */
245 // SYNCHRONIZED
246 {
247 scoped_lock lock( queue_mutex );
248
249 this->processing_event = true;
250
251 // * dequeue first event *
252 currently_processed_event.reset( new SystemEvent(immediateEventsQ.front()) ); // copy
253 immediateEventsQ.pop_front();
254 }
255
256 /* dispatch event */
257// logging_debug("/// SysQ: dispatching event");
258
259 // measure execution time (1/2)
260 ptime start_time = get_clock();
261
262 // * dispatch event *
263 currently_processed_event->getListener()->handleSystemEvent( *currently_processed_event );
264
265 // measure execution time (2/2)
266 time_duration execution_time = get_clock() - start_time;
267
268 // DEBUG OUTPUT: warning when execution takes too much time
269 // [ TODOx how long is "too much time"? ]
270 if ( execution_time.total_milliseconds() > 50 )
271 {
272 logging_info("WARNING: Last event took " << execution_time.total_milliseconds() << " ms to complete.");
273 }
274
275 /* [ TODOx ]
276 *
277 * - we could also measure how long an event has been waiting in the queue before it's dispatched
278 * (in order to detect overload)
279 *
280 * - especially for timed events, the displacement could be calculated
281 * (and, e.g., put in relation with the actual intended sleep time)
282 */
283 }
284
285 this->processing_event = false;
286}
287
288void SystemQueue::QueueThread::check_timed_queue()
289{
290 // this whole function is SYNCHRONIZED
291 scoped_lock lock( queue_mutex );
292
293 ptime now = get_clock();
294 bool not_expired_events_reached = false;
295
296 // move all expired events into the immediateEventsQ
297 while ( ! timedEventsQ.empty() && ! not_expired_events_reached )
298 {
299 const SystemEvent& ev = timedEventsQ.top();
300
301 time_duration remaining_sleep_time = ev.deadline - now;
302
303 // BRANCH: deadline reached
304 if ( remaining_sleep_time.is_negative() )
305 {
306 // move to immediateEventsQ
307 immediateEventsQ.push_back(ev);
308 timedEventsQ.pop();
309 }
310 // BRANCH: deadline not reached
311 else
312 {
313 // okay, that's all for now.
314 not_expired_events_reached = true;
315 }
316 } // while
317}
318
319void SystemQueue::QueueThread::wait_for_next_deadline()
320{
321 // SYNCHRONIZED
322 boost::mutex::scoped_lock lock(queue_mutex);
323
324 if ( immediateEventsQ.empty() )
325 {
326 // don't sleep when the SystemQueue is not already canceled
327 if ( aborted )
328 return;
329
330
331 // BRANCH: no timed events: sleep "forever" (until new events are scheduled)
332 if ( timedEventsQ.empty() )
333 {
334// logging_debug("/// SysQ is going to sleep.");
335
336 this->system_queue_idle.wait( lock );
337 }
338 // BRANCH: sleep till next timed event
339 else
340 {
341// logging_debug( "/// SysQ is going to sleep for "
342// << ( timedEventsQ.top().deadline - get_clock() ).total_milliseconds()
343// << "ms. Deadline: "
344// << timedEventsQ.top().deadline
345// << ", Clock: "
346// << get_clock() );
347
348 this->system_queue_idle.timed_wait( lock, timedEventsQ.top().deadline );
349 }
350 }
351}
352
353
354/// uniform clock interface
355ptime SystemQueue::QueueThread::get_clock()
356{
357 return microsec_clock::universal_time();
358}
359
360
361
362/// external interface
363
364bool SystemQueue::QueueThread::isRunning()
365{
366 return running;
367}
368
369
370void SystemQueue::QueueThread::cancel()
371{
372 logging_debug("/// Cancelling system queue... ");
373
374 // SYNCHRONIZED
375 {
376 scoped_lock lock(queue_mutex);
377 aborted = true;
378 }
379
380 logging_debug("/// SysQ: " << immediateEventsQ.size() << " immediate event(s) + "
381 << timedEventsQ.size() << " timed event(s) left.");
382
383 system_queue_idle.notify_all();
384}
385
386
387void SystemQueue::QueueThread::insert( SystemEvent& event, uint32_t delay )
388{
389 event.scheduledTime = get_clock();
390
391 // SYNCHRONIZED
392 {
393 scoped_lock lock( queue_mutex );
394
395 // BRANCH: immediate event
396 if ( delay == 0 )
397 {
398 immediateEventsQ.push_back(event);
399 }
400 // BRANCH: timed event
401 else
402 {
403 event.deadline = event.scheduledTime + boost::posix_time::milliseconds(delay);
404 event.delayTime = delay; // ( I think this is no longer needed.. )
405
406// // debug output
407// logging_debug("/// inserting timed event, due at: " << event.deadline << " (in " << delay << " ms)");
408
409 timedEventsQ.push(event);
410 }
411 }
412
413 // wake SysQ thread
414 system_queue_idle.notify_one(); // NOTE: there is only one thread
415 // (so it doesn't matter whether to call notify_one, or notify_all)
416}
417
418
419bool SystemQueue::QueueThread::isEmpty()
420{
421 // SYNCHRONIZED
422 scoped_lock lock( queue_mutex );
423
424 return immediateEventsQ.empty() && timedEventsQ.empty() && ! processing_event;
425}
426
427
428
429// FIXME
430void SystemQueue::enterMethod()
431{
432 assert( false );
433}
434void SystemQueue::leaveMethod()
435{
436 assert( false );
437}
438
439
440
441// XXX code from old system queue
442// void SystemQueue::QueueThread::enter(){
443// queueMutex.lock();
444// }
445//
446// void SystemQueue::QueueThread::leave(){
447// queueMutex.unlock();
448// }
449
450}} // spovnet, common
Note: See TracBrowser for help on using the repository browser.