source: source/ariba/utility/system/SystemQueue.cpp@ 12524

Last change on this file since 12524 was 12524, checked in by hock@…, 11 years ago

moved SystemQueue::instance() into cpp:
fixes some weird issues in some corner cases.
(e.g. strange compiler settings in application which links to libariba..)

File size: 10.6 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
5//
6// Institute of Telematics
7// Universität Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37
38#include "SystemQueue.h"
39
40namespace ariba {
41namespace utility {
42
43use_logging_cpp(SystemQueue);
44
45SystemQueue::SystemQueue()
46#ifndef UNDERLAY_OMNET
47 : delayScheduler( &directScheduler ), systemQueueRunning( false )
48#endif
49{
50}
51
52SystemQueue::~SystemQueue() {
53}
54
55
56SystemQueue& SystemQueue::instance()
57{
58 static SystemQueue _inst;
59// std::cout << "SystemQueue::instance: " << &_inst << std::endl;
60 return _inst;
61}
62
63
64void SystemQueue::scheduleEvent( const SystemEvent& event, uint32_t delay ) {
65#ifndef UNDERLAY_OMNET
66 if( delay == 0 ) directScheduler.insert( event, delay );
67 else delayScheduler.insert ( event, delay );
68#else
69 Enter_Method_Silent();
70 cMessage* msg = new cMessage();
71 msg->setContextPointer( new SystemEvent(event) );
72
73 if( delay == 0 )
74 cSimpleModule::scheduleAt( cSimpleModule::simTime(), msg );
75 else
76 cSimpleModule::scheduleAt( cSimpleModule::simTime()+((double)delay/1000.0), msg );
77#endif
78}
79
80// maps to function call internally to the Event-system
81void SystemQueue::scheduleCall( const boost::function0<void>& function, uint32_t delay)
82{
83 // copy function object
84 boost::function0<void>* function_ptr = new boost::function0<void>();
85 (*function_ptr) = function;
86
87 // schedule special call-event
88 scheduleEvent( SystemEvent(&internal_function_caller, SystemEventType::DEFAULT, function_ptr), delay );
89
90}
91
#ifdef UNDERLAY_OMNET
// Simulator callback for a message created by scheduleEvent(): delivers the
// attached event to its listener, then frees both the event and the message.
void SystemQueue::handleMessage(cMessage* msg){
	// the context pointer holds the SystemEvent copy attached on scheduling
	SystemEvent* pending = (SystemEvent*)msg->contextPointer();

	pending->listener->handleSystemEvent( *pending );

	delete pending;
	delete msg;
}
#endif
101
102void SystemQueue::run() {
103 systemQueueRunning = true;
104 directScheduler.run();
105 delayScheduler.run();
106}
107
108void SystemQueue::cancel() {
109#ifndef UNDERLAY_OMNET
110 systemQueueRunning = false;
111 directScheduler.cancel();
112 delayScheduler.cancel();
113#endif
114}
115
116void SystemQueue::dropAll( const SystemEventListener* mlistener){
117#ifndef UNDERLAY_OMNET
118 directScheduler.dropAll(mlistener);
119 delayScheduler.dropAll(mlistener);
120#endif
121}
122
123bool SystemQueue::isEmpty() {
124#ifndef UNDERLAY_OMNET
125 return directScheduler.isEmpty() || delayScheduler.isEmpty();
126#else
127 return false;
128#endif
129}
130
131bool SystemQueue::isRunning() {
132#ifndef UNDERLAY_OMNET
133 return systemQueueRunning;
134#else
135 return true;
136#endif
137}
138
139void SystemQueue::enterMethod(){
140 // TODO: omnet case and delay scheduler
141 directScheduler.enter();
142}
143
144void SystemQueue::leaveMethod(){
145 // TODO: omnet case and delay scheduler
146 directScheduler.leave();
147}
148
149//***************************************************************
150#ifndef UNDERLAY_OMNET
151
152SystemQueue::QueueThread::QueueThread(QueueThread* _transferQueue)
153 : transferQueue( _transferQueue ), running( false ) {
154}
155
156SystemQueue::QueueThread::~QueueThread(){
157}
158
159void SystemQueue::QueueThread::run(){
160 running = true;
161
162 queueThread = new boost::thread(
163 boost::bind(&QueueThread::threadFunc, this) );
164}
165
166void SystemQueue::QueueThread::cancel(){
167
168 logging_debug("cancelling system queue");
169
170 // cause the thread to exit
171 {
172 // get the lock, when we got the lock the
173 // queue thread must be in itemsAvailable.wait()
174 boost::mutex::scoped_lock lock(queueMutex);
175
176 // set the running indicator and signal to run on
177 // this will run the thread and quit it
178 running = false;
179 itemsAvailable.notify_all();
180 }
181
182 // wait until the thread has exited
183 logging_debug("joining system queue thread");
184 queueThread->join();
185
186 // delete pending events
187 logging_debug("deleting pending system queue events");
188 while( eventsQueue.size() > 0 ){
189 eventsQueue.erase( eventsQueue.begin() );
190 }
191
192 // delete the thread, so that a subsuquent run() can be called
193 delete queueThread;
194 queueThread = NULL;
195}
196
197bool SystemQueue::QueueThread::isEmpty(){
198 boost::mutex::scoped_lock lock( queueMutex );
199 return eventsQueue.empty();
200}
201
202void SystemQueue::QueueThread::insert( const SystemEvent& event, uint32_t delay ){
203
204 // if this is called from a module that is currently handling
205 // a thread (called from SystemQueue::onNextQueueItem), the
206 // thread is the same anyway and the mutex will be already
207 // aquired, otherwise we aquire it now
208
209 boost::mutex::scoped_lock lock( queueMutex );
210
211 eventsQueue.push_back( event );
212 eventsQueue.back().scheduledTime = boost::posix_time::microsec_clock::local_time();
213 eventsQueue.back().delayTime = delay;
214 eventsQueue.back().remainingDelay = delay;
215
216 onItemInserted( event );
217 itemsAvailable.notify_all();
218}
219
220void SystemQueue::QueueThread::dropAll( const SystemEventListener* mlistener) {
221 boost::mutex::scoped_lock lock( queueMutex );
222
223 bool deleted;
224 do{
225 deleted = false;
226 EventQueue::iterator i = eventsQueue.begin();
227 EventQueue::iterator iend = eventsQueue.end();
228
229 for( ; i != iend; i++){
230 if((*i).getListener() == mlistener){
231 eventsQueue.erase(i);
232 deleted = true;
233 break;
234 }
235 }
236 }while(deleted);
237}
238
239void SystemQueue::QueueThread::threadFunc( QueueThread* obj ) {
240
241 boost::mutex::scoped_lock lock( obj->queueMutex );
242
243 while( obj->running ) {
244
245 // wait until an item is in the queue or we are notified
246 // to quit the thread. in case the thread is about to
247 // quit, the queueThreadRunning variable will indicate
248 // this and cause the thread to exit
249
250 while ( obj->running && obj->eventsQueue.empty() ){
251
252// const boost::system_time duration =
253// boost::get_system_time() +
254// boost::posix_time::milliseconds(100);
255// obj->itemsAvailable.timed_wait( lock, duration );
256
257 obj->itemsAvailable.wait( lock );
258 }
259
260 //
261 // work all the items that are currently in the queue
262 //
263
264 while( obj->running && (!obj->eventsQueue.empty()) ) {
265
266 // fetch the first item in the queue
267 // and deliver it to the queue handler
268 SystemEvent ev = obj->eventsQueue.front();
269 obj->eventsQueue.erase( obj->eventsQueue.begin() );
270
271 // call the queue and this will
272 // call the actual event handler
273 obj->queueMutex.unlock();
274 obj->onNextQueueItem( ev );
275 obj->queueMutex.lock();
276
277 } // !obj->eventsQueue.empty() )
278 } // while (obj->running)
279
280 logging_debug("system queue exited");
281}
282
283void SystemQueue::QueueThread::enter(){
284 queueMutex.lock();
285}
286
287void SystemQueue::QueueThread::leave(){
288 queueMutex.unlock();
289}
290
291
292//***************************************************************
293
294SystemQueue::QueueThreadDirect::QueueThreadDirect(){
295}
296
297SystemQueue::QueueThreadDirect::~QueueThreadDirect(){
298}
299
300void SystemQueue::QueueThreadDirect::onItemInserted( const SystemEvent& event ){
301 // do nothing here
302}
303
304void SystemQueue::QueueThreadDirect::onNextQueueItem( const SystemEvent& event ){
305 // directly deliver the item to the
306 event.getListener()->handleSystemEvent( event );
307}
308
309//***************************************************************
310
311SystemQueue::QueueThreadDelay::QueueThreadDelay(QueueThread* _transferQueue)
312 : QueueThread( _transferQueue ), isSleeping( false ) {
313
314 assert( _transferQueue != NULL );
315}
316
317SystemQueue::QueueThreadDelay::~QueueThreadDelay(){
318}
319
320void SystemQueue::QueueThreadDelay::onItemInserted( const SystemEvent& event ){
321
322 if( !isSleeping) return;
323
324 // break an existing sleep and
325 // remember the time that was actually slept for
326 // and change it for every event in the queue
327
328 assert( !eventsQueue.empty());
329 sleepCond.notify_all();
330
331 ptime sleepEnd = boost::posix_time::microsec_clock::local_time();
332 boost::posix_time::time_duration duration = sleepEnd - sleepStart;
333 uint32_t sleptTime = duration.total_milliseconds();
334
335 EventQueue::iterator i = eventsQueue.begin();
336 EventQueue::iterator iend = eventsQueue.end();
337
338 for( ; i != iend; i++ ) {
339
340 if( sleptTime >= i->remainingDelay)
341 i->remainingDelay = 0;
342 else
343 i->remainingDelay -= sleptTime;
344
345 } // for( ; i != iend; i++ )
346
347 // now we have to reorder the events
348 // in the queue with respect to their remaining delay
349 // the SystemQueue::operator< takes care of the
350 // ordering with respect to the remaining delay
351
352 std::sort( eventsQueue.begin(), eventsQueue.end() );
353
354}
355
356void SystemQueue::QueueThreadDelay::onNextQueueItem( const SystemEvent& event ){
357
358 // sleeps will be cancelled in the
359 // onItemInserted function when a new
360 // event arrives during sleeping
361
362 assert( !isSleeping );
363
364 // the given item is the one with the least
365 // amount of sleep time left. because all
366 // items are reordered in onItemInserted
367
368 if( event.remainingDelay > 0 ) {
369
370 const boost::system_time duration =
371 boost::get_system_time() +
372 boost::posix_time::milliseconds(event.remainingDelay);
373
374 {
375 boost::unique_lock<boost::mutex> lock( sleepMutex );
376
377 sleepStart = boost::posix_time::microsec_clock::local_time();
378 isSleeping = true;
379
380 sleepCond.timed_wait( lock, duration );
381
382 isSleeping = false;
383 }
384
385 } // if( event.remainingDelay > 0 )
386
387 // if the sleep succeeded and was not
388 // interrupted by a new incoming item
389 // we can now deliver this event
390
391 ptime sleepEnd = boost::posix_time::microsec_clock::local_time();
392 boost::posix_time::time_duration duration = sleepEnd - sleepStart;
393 uint32_t sleptTime = duration.total_milliseconds();
394
395 if (event.remainingDelay <= sleptTime)
396 transferQueue->insert( event, 0 );
397}
398
399#endif // #ifndef UNDERLAY_OMNET
400
401//***************************************************************
402
403}} // spovnet, common
Note: See TracBrowser for help on using the repository browser.