source: source/ariba/utility/system/SystemQueue.cpp@ 12770

Last change on this file since 12770 was 12767, checked in by hock@…, 11 years ago

typo

File size: 11.5 KB
RevLine 
[12761]1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
5//
6// Institute of Telematics
7// Universität Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37
38#include "SystemQueue.h"
39#include <ariba/utility/misc/sha1.h>
40#include <stdexcept>
41
[12764]42// TODO Mario:
43// check if there is any debug out left to remove
44
[12761]45namespace ariba {
46namespace utility {
47
[12764]48typedef boost::mutex::scoped_lock scoped_lock;
49
50using boost::posix_time::microsec_clock;
51using boost::posix_time::time_duration;
52using boost::date_time::not_a_date_time;
53using boost::scoped_ptr;
[12761]54
55
56use_logging_cpp(SystemQueue);
57
[12762]58SystemQueue::SystemQueue() :
59 SysQ( new QueueThread() )
[12761]60{
[12762]61 logging_debug("Creating SystemQueue at: " << this);
[12761]62}
63
64SystemQueue::~SystemQueue()
65{
66}
67
68
69SystemQueue& SystemQueue::instance()
70{
71 static SystemQueue _inst;
72 return _inst;
73}
74
75
76void SystemQueue::scheduleEvent( const SystemEvent& event, uint32_t delay )
77{
[12765]78// assert ( SysQ->running ); // should we really enforce this?
79 if ( ! SysQ->running )
80 {
81 logging_debug("/// WARNING: The SystemQueue is NOT RUNNING!");
82 }
[12764]83
[12761]84 // copy
85 SystemEvent ev(event);
86
[12762]87 SysQ->insert(ev, delay);
[12761]88}
89
90// maps to function call internally to the Event-system
91void SystemQueue::scheduleCall( const boost::function0<void>& function, uint32_t delay)
92{
93 // copy function object
94 boost::function0<void>* function_ptr = new boost::function0<void>();
95 (*function_ptr) = function;
96
97 // schedule special call-event
98 scheduleEvent( SystemEvent(&internal_function_caller, SystemEventType::DEFAULT, function_ptr), delay );
99}
100
101
102
103
104void SystemQueue::run()
105{
[12766]106 // TODO should these be assertion or exceptions..? (exceptions => unit test)
[12762]107 assert ( ! SysQ->running );
[12764]108 assert ( ! SysQ->unclean );
[12761]109
[12762]110 SysQ->running = true;
[12761]111
112 // start thread
[12762]113 sysq_thread.reset( new boost::thread(boost::ref(*SysQ)) );
[12761]114}
115
116void SystemQueue::cancel()
117{
[12764]118 // CHECK: this function must not be called from within a SystemQueue-Event
119 if ( sysq_thread && boost::this_thread::get_id() == sysq_thread->get_id() )
120 {
121 logging_warn("SystemQueue::cancel() was called from within a SystemQueue-Event. This is not allowed!");
122 throw std::logic_error("SystemQueue::cancel() was called from within a SystemQueue-Event. This is not allowed!");
123 }
124
125
[12761]126 // signal SysQ to quit (and abort queued events)
[12762]127 SysQ->cancel();
[12761]128
129 // wait till actually completes
130 // (should be fast, but the current event is allowed to finish)
[12763]131 if ( sysq_thread )
132 {
133 logging_debug("/// ... joining SysQ thread");
134 sysq_thread->join();
135 }
[12761]136
137 // delete thread object
138 sysq_thread.reset();
139
[12762]140 assert ( ! SysQ->isRunning() );
141
[12761]142
[12762]143 // clean up and respawn
144 logging_debug("/// respawning SysQ");
145 SysQ.reset( new QueueThread() );
[12761]146}
147
[12765]148
149void SystemQueue::leave()
150{
151 // signal SysQ to quit (and abort queued events)
152 SysQ->cancel();
153}
154
155void SystemQueue::join()
156{
157 if ( sysq_thread )
158 {
159 logging_debug("/// ... joining SysQ thread");
160 sysq_thread->join();
161 }
162}
163
164
165
[12761]166void SystemQueue::dropAll( const SystemEventListener* mlistener)
167{
[12764]168// TODO
[12761]169// directScheduler.dropAll(mlistener);
170// delayScheduler.dropAll(mlistener);
171}
172
173bool SystemQueue::isEmpty()
174{
[12762]175 return SysQ->isEmpty();
[12761]176}
177
178bool SystemQueue::isRunning()
179{
[12762]180 return SysQ->isRunning();
[12761]181}
182
183
[12764]184//********************************************************
[12761]185
[12764]186
187/// constructor
[12761]188SystemQueue::QueueThread::QueueThread() :
[12762]189 processing_event( false ),
[12761]190 running( false ),
[12764]191 aborted( false ),
192 unclean( false )
[12761]193{
194}
195
196SystemQueue::QueueThread::~QueueThread(){
197}
198
199
200void SystemQueue::QueueThread::operator()()
201{
[12762]202 logging_debug( "/// SysQ thread is alive." );
[12761]203
204 assert( running ); // this is set before the thread starts
205
206 // main loop
207 while ( ! aborted )
208 {
209 // run next immediate event (only one)
210 run_immediate_event();
211
212 // maintain timed events (move to immediateEventsQ, when deadline expired)
213 check_timed_queue();
214
215 // wait for next deadline (if no immediate events pending)
216 wait_for_next_deadline();
217 }
218
[12762]219 logging_debug( "/// SysQ thread is quitting." );
[12761]220
[12764]221 unclean = true;
[12761]222 running = false;
223}
224
225
226
227
228
229/// main loop functions
230
231void SystemQueue::QueueThread::run_immediate_event()
232{
233 // get next event and process it
234 if ( ! immediateEventsQ.empty() )
235 {
236 scoped_ptr<SystemEvent> currently_processed_event;
237
[12763]238 /* dequeue event */
[12761]239 // SYNCHRONIZED
240 {
241 scoped_lock lock( queue_mutex );
242
[12762]243 this->processing_event = true;
244
[12763]245 // * dequeue first event *
[12761]246 currently_processed_event.reset( new SystemEvent(immediateEventsQ.front()) ); // copy
247 immediateEventsQ.pop_front();
248 }
249
[12763]250 /* dispatch event */
[12764]251// logging_debug("/// SysQ: dispatching event");
[12761]252
[12763]253 // measure execution time (1/2)
254 ptime start_time = get_clock();
255
256 // * dispatch event *
[12761]257 currently_processed_event->getListener()->handleSystemEvent( *currently_processed_event );
[12763]258
259 // measure execution time (2/2)
260 time_duration execution_time = get_clock() - start_time;
261
262 // DEBUG OUTPUT: warning when execution takes too much time
[12766]263 // [ TODOx how long is "too much time"? ]
[12763]264 if ( execution_time.total_milliseconds() > 50 )
265 {
[12767]266 logging_info("WARNING: Last event took " << execution_time.total_milliseconds() << " ms to complete.");
[12763]267 }
268
[12766]269 /* [ TODOx ]
[12763]270 *
271 * - we could also measure how long an event has been waiting in the queue before it's dispatched
272 * (in order to detect overload)
273 *
274 * - especially for timed events, the displacement could be calculated
275 * (and, e.g., put in relation with the actual intended sleep time)
276 */
[12761]277 }
278
[12762]279 this->processing_event = false;
[12761]280}
281
282void SystemQueue::QueueThread::check_timed_queue()
283{
284 // this whole function is SYNCHRONIZED
285 scoped_lock lock( queue_mutex );
286
[12763]287 ptime now = get_clock();
[12761]288 bool not_expired_events_reached = false;
289
290 // move all expired events into the immediateEventsQ
291 while ( ! timedEventsQ.empty() && ! not_expired_events_reached )
292 {
[12763]293 const SystemEvent& ev = timedEventsQ.top();
[12761]294
[12763]295 time_duration remaining_sleep_time = ev.deadline - now;
[12761]296
297 // BRANCH: deadline reached
298 if ( remaining_sleep_time.is_negative() )
299 {
300 // move to immediateEventsQ
301 immediateEventsQ.push_back(ev);
[12763]302 timedEventsQ.pop();
[12761]303 }
304 // BRANCH: deadline not reached
305 else
306 {
307 // okay, that's all for now.
308 not_expired_events_reached = true;
309 }
310 } // while
311}
312
313void SystemQueue::QueueThread::wait_for_next_deadline()
314{
[12763]315 // SYNCHRONIZED
316 boost::mutex::scoped_lock lock(queue_mutex);
317
[12761]318 if ( immediateEventsQ.empty() )
319 {
320 // don't sleep when the SystemQueue is not already canceled
321 if ( aborted )
322 return;
323
324
325 // BRANCH: no timed events: sleep "forever" (until new events are scheduled)
[12763]326 if ( timedEventsQ.empty() )
[12761]327 {
[12765]328// logging_debug("/// SysQ is going to sleep.");
[12761]329
330 this->system_queue_idle.wait( lock );
331 }
332 // BRANCH: sleep till next timed event
333 else
334 {
[12765]335// logging_debug( "/// SysQ is going to sleep for "
336// << ( timedEventsQ.top().deadline - get_clock() ).total_milliseconds()
337// << "ms. Deadline: "
338// << timedEventsQ.top().deadline
339// << ", Clock: "
340// << get_clock() );
[12761]341
[12763]342 this->system_queue_idle.timed_wait( lock, timedEventsQ.top().deadline );
[12761]343 }
344 }
345}
346
347
348/// uniform clock interface
349ptime SystemQueue::QueueThread::get_clock()
350{
[12764]351 return microsec_clock::universal_time();
[12761]352}
353
354
355
356/// external interface
357
358bool SystemQueue::QueueThread::isRunning()
359{
360 return running;
361}
362
363
364void SystemQueue::QueueThread::cancel()
365{
[12762]366 logging_debug("/// Cancelling system queue... ");
[12761]367
368 // SYNCHRONIZED
369 {
370 scoped_lock lock(queue_mutex);
371 aborted = true;
372 }
373
[12762]374 logging_debug("/// SysQ: " << immediateEventsQ.size() << " immediate event(s) + "
[12761]375 << timedEventsQ.size() << " timed event(s) left.");
376
377 system_queue_idle.notify_all();
378}
379
380
381void SystemQueue::QueueThread::insert( SystemEvent& event, uint32_t delay )
382{
383 event.scheduledTime = get_clock();
384
385 // SYNCHRONIZED
386 {
387 scoped_lock lock( queue_mutex );
388
389 // BRANCH: immediate event
390 if ( delay == 0 )
391 {
392 immediateEventsQ.push_back(event);
393 }
[12763]394 // BRANCH: timed event
395 else
396 {
397 event.deadline = event.scheduledTime + boost::posix_time::milliseconds(delay);
[12764]398 event.delayTime = delay; // ( I think this is no longer needed.. )
[12763]399
[12764]400// // debug output
401// logging_debug("/// inserting timed event, due at: " << event.deadline << " (in " << delay << " ms)");
[12763]402
403 timedEventsQ.push(event);
404 }
[12761]405 }
406
407 // wake SysQ thread
408 system_queue_idle.notify_one(); // NOTE: there is only one thread
409 // (so it doesn't matter whether to call notify_one, or notify_all)
410}
411
412
[12762]413bool SystemQueue::QueueThread::isEmpty()
414{
415 // SYNCHRONIZED
416 scoped_lock lock( queue_mutex );
417
418 return immediateEventsQ.empty() && timedEventsQ.empty() && ! processing_event;
419}
[12761]420
[12762]421
422
[12761]423// FIXME
424void SystemQueue::enterMethod()
425{
426 assert( false );
427}
428void SystemQueue::leaveMethod()
429{
430 assert( false );
431}
432
433
434
[12764]435// XXX code from old system queue
436// void SystemQueue::QueueThread::enter(){
437// queueMutex.lock();
438// }
439//
440// void SystemQueue::QueueThread::leave(){
441// queueMutex.unlock();
442// }
[12761]443
444}} // spovnet, common
Note: See TracBrowser for help on using the repository browser.