source: source/ariba/utility/system/SystemQueue.cpp@ 12745

Last change on this file since 12745 was 12745, checked in by hock@…, 10 years ago

Debug output for faulty SystemQueue

File size: 12.9 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
5//
6// Institute of Telematics
7// Universität Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37
38#include "SystemQueue.h"
39#include <stdexcept>
40
41namespace ariba {
42namespace utility {
43
44use_logging_cpp(SystemQueue);
45
46// /// Singleton
47// SystemQueue::_inst(NULL);
48
49
50SystemQueue::SystemQueue()
51#ifndef UNDERLAY_OMNET
52 : delayScheduler( &directScheduler ),
53 systemQueueRunning( false )
54#endif
55{
56 logging_info("Creating SystemQueue at: " << this);
57}
58
59SystemQueue::~SystemQueue()
60{
61}
62
63
64SystemQueue& SystemQueue::instance()
65{
66 static SystemQueue _inst;
67 return _inst;
68
69// /// create singleton instance
70// if ( _inst == NULL )
71// {
72// _inst = new SystemQueue();
73//
74// logging_info("Creating SystemQueue at: " << _inst);
75// }
76//
77// return *_inst;
78}
79
80
81void SystemQueue::scheduleEvent( const SystemEvent& event, uint32_t delay ) {
82#ifndef UNDERLAY_OMNET
83 if( delay == 0 ) directScheduler.insert( event, delay );
84 else delayScheduler.insert ( event, delay );
85#else
86 Enter_Method_Silent();
87 cMessage* msg = new cMessage();
88 msg->setContextPointer( new SystemEvent(event) );
89
90 if( delay == 0 )
91 cSimpleModule::scheduleAt( cSimpleModule::simTime(), msg );
92 else
93 cSimpleModule::scheduleAt( cSimpleModule::simTime()+((double)delay/1000.0), msg );
94#endif
95}
96
97// maps to function call internally to the Event-system
98void SystemQueue::scheduleCall( const boost::function0<void>& function, uint32_t delay)
99{
100 // copy function object
101 boost::function0<void>* function_ptr = new boost::function0<void>();
102 (*function_ptr) = function;
103
104 // schedule special call-event
105 scheduleEvent( SystemEvent(&internal_function_caller, SystemEventType::DEFAULT, function_ptr), delay );
106
107}
108
109#ifdef UNDERLAY_OMNET
110void SystemQueue::handleMessage(cMessage* msg){
111 SystemEvent* event = (SystemEvent*)msg->contextPointer();
112
113 event->listener->handleSystemEvent( *event );
114
115 delete event; delete msg;
116}
117#endif
118
119void SystemQueue::run() {
120 systemQueueRunning = true;
121 directScheduler.run();
122 delayScheduler.run();
123}
124
125void SystemQueue::cancel() {
126#ifndef UNDERLAY_OMNET
127 systemQueueRunning = false;
128 directScheduler.cancel();
129 delayScheduler.cancel();
130#endif
131}
132
133void SystemQueue::dropAll( const SystemEventListener* mlistener){
134#ifndef UNDERLAY_OMNET
135 directScheduler.dropAll(mlistener);
136 delayScheduler.dropAll(mlistener);
137#endif
138}
139
140bool SystemQueue::isEmpty() {
141#ifndef UNDERLAY_OMNET
142 return directScheduler.isEmpty() || delayScheduler.isEmpty();
143#else
144 return false;
145#endif
146}
147
148bool SystemQueue::isRunning() {
149#ifndef UNDERLAY_OMNET
150 return systemQueueRunning;
151#else
152 return true;
153#endif
154}
155
156void SystemQueue::enterMethod(){
157 // TODO: omnet case and delay scheduler
158 directScheduler.enter();
159}
160
161void SystemQueue::leaveMethod(){
162 // TODO: omnet case and delay scheduler
163 directScheduler.leave();
164}
165
166//***************************************************************
167#ifndef UNDERLAY_OMNET
168
169SystemQueue::QueueThread::QueueThread(QueueThread* _transferQueue)
170 : transferQueue( _transferQueue ), running( false ) {
171}
172
173SystemQueue::QueueThread::~QueueThread(){
174}
175
176void SystemQueue::QueueThread::run(){
177 running = true;
178
179 queueThread = new boost::thread(
180 boost::bind(&QueueThread::threadFunc, this) );
181}
182
183void SystemQueue::QueueThread::cancel(){
184
185 logging_debug("cancelling system queue");
186
187 // cause the thread to exit
188 {
189 // get the lock, when we got the lock the
190 // queue thread must be in itemsAvailable.wait()
191 boost::mutex::scoped_lock lock(queueMutex);
192
193 // set the running indicator and signal to run on
194 // this will run the thread and quit it
195 running = false;
196 itemsAvailable.notify_all();
197 }
198
199 // wait until the thread has exited
200 logging_debug("joining system queue thread");
201 queueThread->join();
202
203 // delete pending events
204 logging_debug("deleting pending system queue events");
205 while( eventsQueue.size() > 0 ){
206 eventsQueue.erase( eventsQueue.begin() );
207 }
208
209 // delete the thread, so that a subsuquent run() can be called
210 delete queueThread;
211 queueThread = NULL;
212}
213
214bool SystemQueue::QueueThread::isEmpty(){
215 boost::mutex::scoped_lock lock( queueMutex );
216 return eventsQueue.empty();
217}
218
219void SystemQueue::QueueThread::insert( const SystemEvent& event, uint32_t delay ){
220
221 // if this is called from a module that is currently handling
222 // a thread (called from SystemQueue::onNextQueueItem), the
223 // thread is the same anyway and the mutex will be already
224 // aquired, otherwise we aquire it now
225
226 boost::mutex::scoped_lock lock( queueMutex );
227
228 if ( delay > 0 )
229 {
230 logging_debug("SystemQueue(" << this << ") : Schedule event in: " << delay << " ms; Events in queue (before insert): " << eventsQueue.size() );
231 }
232
233 eventsQueue.push_back( event );
234 eventsQueue.back().scheduledTime = boost::posix_time::microsec_clock::local_time();
235 eventsQueue.back().delayTime = delay;
236 eventsQueue.back().remainingDelay = delay;
237
238 if ( delay > 0 )
239 {
240 logging_debug("SystemQueue(" << this << ") : Events in queue (after insert): " << eventsQueue.size() );
241 }
242
243 onItemInserted( event );
244 itemsAvailable.notify_all();
245}
246
247void SystemQueue::QueueThread::dropAll( const SystemEventListener* mlistener) {
248 boost::mutex::scoped_lock lock( queueMutex );
249
250 bool deleted;
251 do{
252 deleted = false;
253 EventQueue::iterator i = eventsQueue.begin();
254 EventQueue::iterator iend = eventsQueue.end();
255
256 for( ; i != iend; i++){
257 if((*i).getListener() == mlistener){
258 eventsQueue.erase(i);
259 deleted = true;
260 break;
261 }
262 }
263 }while(deleted);
264}
265
266void SystemQueue::QueueThread::threadFunc( QueueThread* obj ) {
267
268 boost::mutex::scoped_lock lock( obj->queueMutex );
269
270 while( obj->running ) {
271
272 // wait until an item is in the queue or we are notified
273 // to quit the thread. in case the thread is about to
274 // quit, the queueThreadRunning variable will indicate
275 // this and cause the thread to exit
276
277 while ( obj->running && obj->eventsQueue.empty() ){
278
279// const boost::system_time duration =
280// boost::get_system_time() +
281// boost::posix_time::milliseconds(100);
282// obj->itemsAvailable.timed_wait( lock, duration );
283
284 obj->itemsAvailable.wait( lock );
285 }
286
287 //
288 // work all the items that are currently in the queue
289 //
290
291 while( obj->running && (!obj->eventsQueue.empty()) ) {
292
293 // fetch the first item in the queue
294 // and deliver it to the queue handler
295 SystemEvent ev = obj->eventsQueue.front();
296
297 // XXX debugging the delay-scheduler..
298 if ( ev.delayTime > 0 )
299 logging_debug("SystemQueue(" << obj << ") : Events in queue (before execution): " << obj->eventsQueue.size());
300
301 obj->eventsQueue.erase( obj->eventsQueue.begin() );
302
303 // call the queue and this will
304 // call the actual event handler
305 obj->queueMutex.unlock();
306 obj->onNextQueueItem( ev );
307 obj->queueMutex.lock();
308
309 // XXX debugging the delay-scheduler..
310 if ( ev.delayTime > 0 )
311 logging_debug("SystemQueue(" << obj << ") : Remaining events in queue (after execution): " << obj->eventsQueue.size());
312
313 } // !obj->eventsQueue.empty() )
314 } // while (obj->running)
315
316 logging_debug("system queue exited");
317}
318
319void SystemQueue::QueueThread::enter(){
320 queueMutex.lock();
321}
322
323void SystemQueue::QueueThread::leave(){
324 queueMutex.unlock();
325}
326
327
328//***************************************************************
329
330SystemQueue::QueueThreadDirect::QueueThreadDirect(){
331}
332
333SystemQueue::QueueThreadDirect::~QueueThreadDirect(){
334}
335
336void SystemQueue::QueueThreadDirect::onItemInserted( const SystemEvent& event ){
337 // do nothing here
338}
339
340void SystemQueue::QueueThreadDirect::onNextQueueItem( const SystemEvent& event ){
341 // directly deliver the item to the
342 event.getListener()->handleSystemEvent( event );
343}
344
345//***************************************************************
346
347SystemQueue::QueueThreadDelay::QueueThreadDelay(QueueThread* _transferQueue)
348 : QueueThread( _transferQueue ), isSleeping( false ) {
349
350 assert( _transferQueue != NULL );
351}
352
353SystemQueue::QueueThreadDelay::~QueueThreadDelay(){
354}
355
356void SystemQueue::QueueThreadDelay::onItemInserted( const SystemEvent& event ){
357
358 if( !isSleeping)
359 {
360 logging_warn("SystemQueue(" << this << ") : No, I'm not asleep!! New item inserted.");
361 return; // TODO Mario: shouldn't we sort anyway..?
362 }
363
364 // break an existing sleep and
365 // remember the time that was actually slept for
366 // and change it for every event in the queue
367
368 assert( !eventsQueue.empty());
369 sleepCond.notify_all();
370
371 ptime sleepEnd = boost::posix_time::microsec_clock::local_time();
372 boost::posix_time::time_duration duration = sleepEnd - sleepStart;
373 uint32_t sleptTime = duration.total_milliseconds();
374
375 EventQueue::iterator i = eventsQueue.begin();
376 EventQueue::iterator iend = eventsQueue.end();
377
378 logging_debug("SystemQueue(" << this << ") : Adjusting remaining delays:");
379
380 // TODO Mario: What about the just inserted event..?
381 for( ; i != iend; i++ ) {
382
383 if( sleptTime >= i->remainingDelay)
384 i->remainingDelay = 0;
385 else
386 {
387 i->remainingDelay -= sleptTime;
388
389 // XXX Mario: Testcode, just to find a bug...
390 boost::posix_time::time_duration time_passed = sleepEnd - i->getScheduledTime();
391 logging_debug("SystemQueue(" << this << ") : Total: " <<
392 i->delayTime << ", remainingDelay: " << i->remainingDelay <<
393 ", time already passed: " << time_passed.total_milliseconds() );
394 }
395
396 } // for( ; i != iend; i++ )
397
398 // now we have to reorder the events
399 // in the queue with respect to their remaining delay
400 // the SystemQueue::operator< takes care of the
401 // ordering with respect to the remaining delay
402
403 std::sort( eventsQueue.begin(), eventsQueue.end() );
404
405}
406
407void SystemQueue::QueueThreadDelay::onNextQueueItem( const SystemEvent& event ){
408
409 // sleeps will be cancelled in the
410 // onItemInserted function when a new
411 // event arrives during sleeping
412
413 assert( !isSleeping );
414
415 // the given item is the one with the least
416 // amount of sleep time left. because all
417 // items are reordered in onItemInserted
418
419 if( event.remainingDelay > 0 ) {
420
421 const boost::system_time duration =
422 boost::get_system_time() +
423 boost::posix_time::milliseconds(event.remainingDelay);
424
425 logging_debug("SystemQueue(" << this << ") : Sleeping for: " << event.remainingDelay << " ms");
426
427 {
428 boost::unique_lock<boost::mutex> lock( sleepMutex );
429
430 sleepStart = boost::posix_time::microsec_clock::local_time();
431 isSleeping = true;
432
433 sleepCond.timed_wait( lock, duration );
434
435 isSleeping = false;
436 }
437
438 } // if( event.remainingDelay > 0 )
439
440 // if the sleep succeeded and was not
441 // interrupted by a new incoming item
442 // we can now deliver this event
443
444 ptime sleepEnd = boost::posix_time::microsec_clock::local_time();
445 boost::posix_time::time_duration duration = sleepEnd - sleepStart;
446 uint32_t sleptTime = duration.total_milliseconds();
447
448 logging_debug("SystemQueue(" << this << ") : Slept for: " << sleptTime << " ms; until: " << sleepEnd);
449
450 // TODO MARIO: find the bug that loses events...
451 if (event.remainingDelay <= sleptTime)
452 {
453 logging_debug("SystemQueue(" << this << ") : Transferring scheduled event into the direct queue. Scheduled time: " << event.getScheduledTime() );
454 transferQueue->insert( event, 0 );
455 }
456 else
457 {
458 logging_warn("SystemQueue(" << this << ") : Scheduled event lost!! :-( (Sleep should have been " << event.remainingDelay - sleptTime << " ms longer..)");
459 logging_debug("SystemQueue(" << this << ") : Total delay: " << event.delayTime << "; remaining delay: " << event.remainingDelay);
460
461// throw std::logic_error("Scheduled event lost!! :-(");
462 }
463}
464
465#endif // #ifndef UNDERLAY_OMNET
466
467//***************************************************************
468
469}} // namespace ariba, utility
Note: See TracBrowser for help on using the repository browser.