source: source/ariba/utility/system/SystemQueue.cpp@ 12766

Last change on this file since 12766 was 12766, checked in by hock@…, 11 years ago

..

File size: 11.5 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
5//
6// Institute of Telematics
7// Universität Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37
38#include "SystemQueue.h"
39#include <ariba/utility/misc/sha1.h>
40#include <stdexcept>
41
42// TODO Mario:
43// check if there is any debug out left to remove
44
45namespace ariba {
46namespace utility {
47
48typedef boost::mutex::scoped_lock scoped_lock;
49
50using boost::posix_time::microsec_clock;
51using boost::posix_time::time_duration;
52using boost::date_time::not_a_date_time;
53using boost::scoped_ptr;
54
55
56use_logging_cpp(SystemQueue);
57
58SystemQueue::SystemQueue() :
59 SysQ( new QueueThread() )
60{
61 logging_debug("Creating SystemQueue at: " << this);
62}
63
64SystemQueue::~SystemQueue()
65{
66}
67
68
69SystemQueue& SystemQueue::instance()
70{
71 static SystemQueue _inst;
72 return _inst;
73}
74
75
76void SystemQueue::scheduleEvent( const SystemEvent& event, uint32_t delay )
77{
78// assert ( SysQ->running ); // should we really enforce this?
79 if ( ! SysQ->running )
80 {
81 logging_debug("/// WARNING: The SystemQueue is NOT RUNNING!");
82 }
83
84 // copy
85 SystemEvent ev(event);
86
87 SysQ->insert(ev, delay);
88}
89
90// maps to function call internally to the Event-system
91void SystemQueue::scheduleCall( const boost::function0<void>& function, uint32_t delay)
92{
93 // copy function object
94 boost::function0<void>* function_ptr = new boost::function0<void>();
95 (*function_ptr) = function;
96
97 // schedule special call-event
98 scheduleEvent( SystemEvent(&internal_function_caller, SystemEventType::DEFAULT, function_ptr), delay );
99}
100
101
102
103
104void SystemQueue::run()
105{
106 // TODO should these be assertion or exceptions..? (exceptions => unit test)
107 assert ( ! SysQ->running );
108 assert ( ! SysQ->unclean );
109
110 SysQ->running = true;
111
112 // start thread
113 sysq_thread.reset( new boost::thread(boost::ref(*SysQ)) );
114}
115
116void SystemQueue::cancel()
117{
118 // CHECK: this function must not be called from within a SystemQueue-Event
119 if ( sysq_thread && boost::this_thread::get_id() == sysq_thread->get_id() )
120 {
121 logging_warn("SystemQueue::cancel() was called from within a SystemQueue-Event. This is not allowed!");
122 throw std::logic_error("SystemQueue::cancel() was called from within a SystemQueue-Event. This is not allowed!");
123 }
124
125
126 // signal SysQ to quit (and abort queued events)
127 SysQ->cancel();
128
129 // wait till actually completes
130 // (should be fast, but the current event is allowed to finish)
131 if ( sysq_thread )
132 {
133 logging_debug("/// ... joining SysQ thread");
134 sysq_thread->join();
135 }
136
137 // delete thread object
138 sysq_thread.reset();
139
140 assert ( ! SysQ->isRunning() );
141
142
143 // clean up and respawn
144 logging_debug("/// respawning SysQ");
145 SysQ.reset( new QueueThread() );
146}
147
148
149void SystemQueue::leave()
150{
151 // signal SysQ to quit (and abort queued events)
152 SysQ->cancel();
153}
154
155void SystemQueue::join()
156{
157 if ( sysq_thread )
158 {
159 logging_debug("/// ... joining SysQ thread");
160 sysq_thread->join();
161 }
162}
163
164
165
166void SystemQueue::dropAll( const SystemEventListener* mlistener)
167{
168// TODO
169// directScheduler.dropAll(mlistener);
170// delayScheduler.dropAll(mlistener);
171}
172
173bool SystemQueue::isEmpty()
174{
175 return SysQ->isEmpty();
176}
177
178bool SystemQueue::isRunning()
179{
180 return SysQ->isRunning();
181}
182
183
184//********************************************************
185
186
187/// constructor
188SystemQueue::QueueThread::QueueThread() :
189 processing_event( false ),
190 running( false ),
191 aborted( false ),
192 unclean( false )
193{
194}
195
196SystemQueue::QueueThread::~QueueThread(){
197}
198
199
200void SystemQueue::QueueThread::operator()()
201{
202 logging_debug( "/// SysQ thread is alive." );
203
204 assert( running ); // this is set before the thread starts
205
206 // main loop
207 while ( ! aborted )
208 {
209 // run next immediate event (only one)
210 run_immediate_event();
211
212 // maintain timed events (move to immediateEventsQ, when deadline expired)
213 check_timed_queue();
214
215 // wait for next deadline (if no immediate events pending)
216 wait_for_next_deadline();
217 }
218
219 logging_debug( "/// SysQ thread is quitting." );
220
221 unclean = true;
222 running = false;
223}
224
225
226
227
228
229/// main loop functions
230
231void SystemQueue::QueueThread::run_immediate_event()
232{
233 // get next event and process it
234 if ( ! immediateEventsQ.empty() )
235 {
236 scoped_ptr<SystemEvent> currently_processed_event;
237
238 /* dequeue event */
239 // SYNCHRONIZED
240 {
241 scoped_lock lock( queue_mutex );
242
243 this->processing_event = true;
244
245 // * dequeue first event *
246 currently_processed_event.reset( new SystemEvent(immediateEventsQ.front()) ); // copy
247 immediateEventsQ.pop_front();
248 }
249
250 /* dispatch event */
251// logging_debug("/// SysQ: dispatching event");
252
253 // measure execution time (1/2)
254 ptime start_time = get_clock();
255
256 // * dispatch event *
257 currently_processed_event->getListener()->handleSystemEvent( *currently_processed_event );
258
259 // measure execution time (2/2)
260 time_duration execution_time = get_clock() - start_time;
261
262 // DEBUG OUTPUT: warning when execution takes too much time
263 // [ TODOx how long is "too much time"? ]
264 if ( execution_time.total_milliseconds() > 50 )
265 {
266 logging_info("WARNING: Last event took " << execution_time.total_milliseconds() << " to complete.");
267 }
268
269 /* [ TODOx ]
270 *
271 * - we could also measure how long an event has been waiting in the queue before it's dispatched
272 * (in order to detect overload)
273 *
274 * - especially for timed events, the displacement could be calculated
275 * (and, e.g., put in relation with the actual intended sleep time)
276 */
277 }
278
279 this->processing_event = false;
280}
281
282void SystemQueue::QueueThread::check_timed_queue()
283{
284 // this whole function is SYNCHRONIZED
285 scoped_lock lock( queue_mutex );
286
287 ptime now = get_clock();
288 bool not_expired_events_reached = false;
289
290 // move all expired events into the immediateEventsQ
291 while ( ! timedEventsQ.empty() && ! not_expired_events_reached )
292 {
293 const SystemEvent& ev = timedEventsQ.top();
294
295 time_duration remaining_sleep_time = ev.deadline - now;
296
297 // BRANCH: deadline reached
298 if ( remaining_sleep_time.is_negative() )
299 {
300 // move to immediateEventsQ
301 immediateEventsQ.push_back(ev);
302 timedEventsQ.pop();
303 }
304 // BRANCH: deadline not reached
305 else
306 {
307 // okay, that's all for now.
308 not_expired_events_reached = true;
309 }
310 } // while
311}
312
313void SystemQueue::QueueThread::wait_for_next_deadline()
314{
315 // SYNCHRONIZED
316 boost::mutex::scoped_lock lock(queue_mutex);
317
318 if ( immediateEventsQ.empty() )
319 {
320 // don't sleep when the SystemQueue is not already canceled
321 if ( aborted )
322 return;
323
324
325 // BRANCH: no timed events: sleep "forever" (until new events are scheduled)
326 if ( timedEventsQ.empty() )
327 {
328// logging_debug("/// SysQ is going to sleep.");
329
330 this->system_queue_idle.wait( lock );
331 }
332 // BRANCH: sleep till next timed event
333 else
334 {
335// logging_debug( "/// SysQ is going to sleep for "
336// << ( timedEventsQ.top().deadline - get_clock() ).total_milliseconds()
337// << "ms. Deadline: "
338// << timedEventsQ.top().deadline
339// << ", Clock: "
340// << get_clock() );
341
342 this->system_queue_idle.timed_wait( lock, timedEventsQ.top().deadline );
343 }
344 }
345}
346
347
348/// uniform clock interface
349ptime SystemQueue::QueueThread::get_clock()
350{
351 return microsec_clock::universal_time();
352}
353
354
355
356/// external interface
357
358bool SystemQueue::QueueThread::isRunning()
359{
360 return running;
361}
362
363
364void SystemQueue::QueueThread::cancel()
365{
366 logging_debug("/// Cancelling system queue... ");
367
368 // SYNCHRONIZED
369 {
370 scoped_lock lock(queue_mutex);
371 aborted = true;
372 }
373
374 logging_debug("/// SysQ: " << immediateEventsQ.size() << " immediate event(s) + "
375 << timedEventsQ.size() << " timed event(s) left.");
376
377 system_queue_idle.notify_all();
378}
379
380
381void SystemQueue::QueueThread::insert( SystemEvent& event, uint32_t delay )
382{
383 event.scheduledTime = get_clock();
384
385 // SYNCHRONIZED
386 {
387 scoped_lock lock( queue_mutex );
388
389 // BRANCH: immediate event
390 if ( delay == 0 )
391 {
392 immediateEventsQ.push_back(event);
393 }
394 // BRANCH: timed event
395 else
396 {
397 event.deadline = event.scheduledTime + boost::posix_time::milliseconds(delay);
398 event.delayTime = delay; // ( I think this is no longer needed.. )
399
400// // debug output
401// logging_debug("/// inserting timed event, due at: " << event.deadline << " (in " << delay << " ms)");
402
403 timedEventsQ.push(event);
404 }
405 }
406
407 // wake SysQ thread
408 system_queue_idle.notify_one(); // NOTE: there is only one thread
409 // (so it doesn't matter whether to call notify_one, or notify_all)
410}
411
412
413bool SystemQueue::QueueThread::isEmpty()
414{
415 // SYNCHRONIZED
416 scoped_lock lock( queue_mutex );
417
418 return immediateEventsQ.empty() && timedEventsQ.empty() && ! processing_event;
419}
420
421
422
423// FIXME
424void SystemQueue::enterMethod()
425{
426 assert( false );
427}
428void SystemQueue::leaveMethod()
429{
430 assert( false );
431}
432
433
434
435// XXX code from old system queue
436// void SystemQueue::QueueThread::enter(){
437// queueMutex.lock();
438// }
439//
440// void SystemQueue::QueueThread::leave(){
441// queueMutex.unlock();
442// }
443
444}} // namespace utility, namespace ariba
Note: See TracBrowser for help on using the repository browser.