source: source/ariba/utility/system/SystemQueue.cpp@ 12764

Last change on this file since 12764 was 12764, checked in by hock@…, 10 years ago

New SystemQueue...

... is passing all unit tests, now. :-)

File size: 11.0 KB
RevLine 
[12761]1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
5//
6// Institute of Telematics
7// Universität Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37
38#include "SystemQueue.h"
39#include <ariba/utility/misc/sha1.h>
40#include <stdexcept>
41
[12764]42// TODO Mario:
43// check if there is any debug out left to remove
44
[12761]45namespace ariba {
46namespace utility {
47
[12764]48typedef boost::mutex::scoped_lock scoped_lock;
49
50using boost::posix_time::microsec_clock;
51using boost::posix_time::time_duration;
52using boost::date_time::not_a_date_time;
53using boost::scoped_ptr;
[12761]54
55
56use_logging_cpp(SystemQueue);
57
[12762]58SystemQueue::SystemQueue() :
59 SysQ( new QueueThread() )
[12761]60{
[12762]61 logging_debug("Creating SystemQueue at: " << this);
[12761]62}
63
64SystemQueue::~SystemQueue()
65{
66}
67
68
69SystemQueue& SystemQueue::instance()
70{
71 static SystemQueue _inst;
72 return _inst;
73}
74
75
76void SystemQueue::scheduleEvent( const SystemEvent& event, uint32_t delay )
77{
[12764]78 assert ( SysQ->running ); // should we really enforce this?
79
[12761]80 // copy
81 SystemEvent ev(event);
82
[12762]83 SysQ->insert(ev, delay);
[12761]84}
85
86// maps to function call internally to the Event-system
87void SystemQueue::scheduleCall( const boost::function0<void>& function, uint32_t delay)
88{
89 // copy function object
90 boost::function0<void>* function_ptr = new boost::function0<void>();
91 (*function_ptr) = function;
92
93 // schedule special call-event
94 scheduleEvent( SystemEvent(&internal_function_caller, SystemEventType::DEFAULT, function_ptr), delay );
95}
96
97
98
99
100void SystemQueue::run()
101{
[12762]102 assert ( ! SysQ->running );
[12764]103 assert ( ! SysQ->unclean );
[12761]104
[12762]105 SysQ->running = true;
[12761]106
107 // start thread
[12762]108 sysq_thread.reset( new boost::thread(boost::ref(*SysQ)) );
[12761]109}
110
111void SystemQueue::cancel()
112{
[12764]113 // CHECK: this function must not be called from within a SystemQueue-Event
114 if ( sysq_thread && boost::this_thread::get_id() == sysq_thread->get_id() )
115 {
116 logging_warn("SystemQueue::cancel() was called from within a SystemQueue-Event. This is not allowed!");
117 throw std::logic_error("SystemQueue::cancel() was called from within a SystemQueue-Event. This is not allowed!");
118 }
119
120
[12761]121 // signal SysQ to quit (and abort queued events)
[12762]122 SysQ->cancel();
[12761]123
124 // wait till actually completes
125 // (should be fast, but the current event is allowed to finish)
[12763]126 if ( sysq_thread )
127 {
128 logging_debug("/// ... joining SysQ thread");
129 sysq_thread->join();
130 }
[12761]131
132 // delete thread object
133 sysq_thread.reset();
134
[12762]135 assert ( ! SysQ->isRunning() );
136
[12761]137
[12762]138 // clean up and respawn
139 logging_debug("/// respawning SysQ");
140 SysQ.reset( new QueueThread() );
[12761]141}
142
143void SystemQueue::dropAll( const SystemEventListener* mlistener)
144{
[12764]145// TODO
[12761]146// directScheduler.dropAll(mlistener);
147// delayScheduler.dropAll(mlistener);
148}
149
150bool SystemQueue::isEmpty()
151{
[12762]152 return SysQ->isEmpty();
[12761]153}
154
155bool SystemQueue::isRunning()
156{
[12762]157 return SysQ->isRunning();
[12761]158}
159
160
[12764]161//********************************************************
[12761]162
[12764]163
164/// constructor
[12761]165SystemQueue::QueueThread::QueueThread() :
[12762]166 processing_event( false ),
[12761]167 running( false ),
[12764]168 aborted( false ),
169 unclean( false )
[12761]170{
171}
172
173SystemQueue::QueueThread::~QueueThread(){
174}
175
176
177void SystemQueue::QueueThread::operator()()
178{
[12762]179 logging_debug( "/// SysQ thread is alive." );
[12761]180
181 assert( running ); // this is set before the thread starts
182
183 // main loop
184 while ( ! aborted )
185 {
186 // run next immediate event (only one)
187 run_immediate_event();
188
189 // maintain timed events (move to immediateEventsQ, when deadline expired)
190 check_timed_queue();
191
192 // wait for next deadline (if no immediate events pending)
193 wait_for_next_deadline();
194 }
195
[12762]196 logging_debug( "/// SysQ thread is quitting." );
[12761]197
[12764]198 unclean = true;
[12761]199 running = false;
200}
201
202
203
204
205
206/// main loop functions
207
208void SystemQueue::QueueThread::run_immediate_event()
209{
210 // get next event and process it
211 if ( ! immediateEventsQ.empty() )
212 {
213 scoped_ptr<SystemEvent> currently_processed_event;
214
[12763]215 /* dequeue event */
[12761]216 // SYNCHRONIZED
217 {
218 scoped_lock lock( queue_mutex );
219
[12762]220 this->processing_event = true;
221
[12763]222 // * dequeue first event *
[12761]223 currently_processed_event.reset( new SystemEvent(immediateEventsQ.front()) ); // copy
224 immediateEventsQ.pop_front();
225 }
226
[12763]227 /* dispatch event */
[12764]228// logging_debug("/// SysQ: dispatching event");
[12761]229
[12763]230 // measure execution time (1/2)
231 ptime start_time = get_clock();
232
233 // * dispatch event *
[12761]234 currently_processed_event->getListener()->handleSystemEvent( *currently_processed_event );
[12763]235
236 // measure execution time (2/2)
237 time_duration execution_time = get_clock() - start_time;
238
239 // DEBUG OUTPUT: warning when execution takes too much time
240 // [ TODO how long is "too much time"? ]
241 if ( execution_time.total_milliseconds() > 50 )
242 {
243 logging_info("WARNING: Last event took " << execution_time.total_milliseconds() << " to complete.");
244 }
245
246 /* [ TODO ]
247 *
248 * - we could also measure how long an event has been waiting in the queue before it's dispatched
249 * (in order to detect overload)
250 *
251 * - especially for timed events, the displacement could be calculated
252 * (and, e.g., put in relation with the actual intended sleep time)
253 */
[12761]254 }
255
[12762]256 this->processing_event = false;
[12761]257}
258
259void SystemQueue::QueueThread::check_timed_queue()
260{
261 // this whole function is SYNCHRONIZED
262 scoped_lock lock( queue_mutex );
263
[12763]264 ptime now = get_clock();
[12761]265 bool not_expired_events_reached = false;
266
267 // move all expired events into the immediateEventsQ
268 while ( ! timedEventsQ.empty() && ! not_expired_events_reached )
269 {
[12763]270 const SystemEvent& ev = timedEventsQ.top();
[12761]271
[12763]272 time_duration remaining_sleep_time = ev.deadline - now;
[12761]273
274 // BRANCH: deadline reached
275 if ( remaining_sleep_time.is_negative() )
276 {
277 // move to immediateEventsQ
278 immediateEventsQ.push_back(ev);
[12763]279 timedEventsQ.pop();
[12761]280 }
281 // BRANCH: deadline not reached
282 else
283 {
284 // okay, that's all for now.
285 not_expired_events_reached = true;
286 }
287 } // while
288}
289
290void SystemQueue::QueueThread::wait_for_next_deadline()
291{
[12763]292 // SYNCHRONIZED
293 boost::mutex::scoped_lock lock(queue_mutex);
294
[12761]295 if ( immediateEventsQ.empty() )
296 {
297 // don't sleep when the SystemQueue is not already canceled
298 if ( aborted )
299 return;
300
301
302 // BRANCH: no timed events: sleep "forever" (until new events are scheduled)
[12763]303 if ( timedEventsQ.empty() )
[12761]304 {
[12762]305 logging_debug("/// SysQ is going to sleep.");
[12761]306
307 this->system_queue_idle.wait( lock );
308 }
309 // BRANCH: sleep till next timed event
310 else
311 {
[12763]312 logging_debug( "/// SysQ is going to sleep for "
313 << ( timedEventsQ.top().deadline - get_clock() ).total_milliseconds()
314 << "ms. Deadline: "
315 << timedEventsQ.top().deadline
316 << ", Clock: "
317 << get_clock() );
[12761]318
[12763]319 this->system_queue_idle.timed_wait( lock, timedEventsQ.top().deadline );
[12761]320 }
321 }
322}
323
324
325/// uniform clock interface
326ptime SystemQueue::QueueThread::get_clock()
327{
[12764]328 return microsec_clock::universal_time();
[12761]329}
330
331
332
333/// external interface
334
335bool SystemQueue::QueueThread::isRunning()
336{
337 return running;
338}
339
340
341void SystemQueue::QueueThread::cancel()
342{
[12762]343 logging_debug("/// Cancelling system queue... ");
[12761]344
345 // SYNCHRONIZED
346 {
347 scoped_lock lock(queue_mutex);
348 aborted = true;
349 }
350
[12762]351 logging_debug("/// SysQ: " << immediateEventsQ.size() << " immediate event(s) + "
[12761]352 << timedEventsQ.size() << " timed event(s) left.");
353
354 system_queue_idle.notify_all();
355}
356
357
358void SystemQueue::QueueThread::insert( SystemEvent& event, uint32_t delay )
359{
360 event.scheduledTime = get_clock();
361
362 // SYNCHRONIZED
363 {
364 scoped_lock lock( queue_mutex );
365
366 // BRANCH: immediate event
367 if ( delay == 0 )
368 {
369 immediateEventsQ.push_back(event);
370 }
[12763]371 // BRANCH: timed event
372 else
373 {
374 event.deadline = event.scheduledTime + boost::posix_time::milliseconds(delay);
[12764]375 event.delayTime = delay; // ( I think this is no longer needed.. )
[12763]376
[12764]377// // debug output
378// logging_debug("/// inserting timed event, due at: " << event.deadline << " (in " << delay << " ms)");
[12763]379
380 timedEventsQ.push(event);
381 }
[12761]382 }
383
384 // wake SysQ thread
385 system_queue_idle.notify_one(); // NOTE: there is only one thread
386 // (so it doesn't matter whether to call notify_one, or notify_all)
387}
388
389
[12762]390bool SystemQueue::QueueThread::isEmpty()
391{
392 // SYNCHRONIZED
393 scoped_lock lock( queue_mutex );
394
395 return immediateEventsQ.empty() && timedEventsQ.empty() && ! processing_event;
396}
[12761]397
[12762]398
399
[12761]400// FIXME
401void SystemQueue::enterMethod()
402{
403 assert( false );
404}
405void SystemQueue::leaveMethod()
406{
407 assert( false );
408}
409
410
411
[12764]412// XXX code from old system queue
413// void SystemQueue::QueueThread::enter(){
414// queueMutex.lock();
415// }
416//
417// void SystemQueue::QueueThread::leave(){
418// queueMutex.unlock();
419// }
[12761]420
421}} // namespace utility, namespace ariba
Note: See TracBrowser for help on using the repository browser.