An Overlay-based
Virtual Network Substrate
SpoVNet

source: source/ariba/utility/system/SystemQueue.cpp @ 8606

Last change on this file since 8606 was 7533, checked in by Christoph Mayer, 14 years ago

-fix für löschen von timer objekten welche noch aktive messages in der queue haben

File size: 10.1 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37
38#include "SystemQueue.h"
39
40namespace ariba {
41namespace utility {
42
43use_logging_cpp(SystemQueue);
44
45SystemQueue::SystemQueue()
46#ifndef UNDERLAY_OMNET
47 : delayScheduler( &directScheduler ), systemQueueRunning( false )
48#endif
49{
50}
51
52SystemQueue::~SystemQueue() {
53}
54
55void SystemQueue::scheduleEvent( const SystemEvent& event, uint32_t delay ) {
56#ifndef UNDERLAY_OMNET
57        if( delay == 0 ) directScheduler.insert( event, delay );
58        else delayScheduler.insert ( event, delay );
59#else
60        Enter_Method_Silent();
61        cMessage* msg = new cMessage();
62        msg->setContextPointer( new SystemEvent(event) );
63
64        if( delay == 0 )
65                cSimpleModule::scheduleAt( cSimpleModule::simTime(), msg );
66        else
67                cSimpleModule::scheduleAt( cSimpleModule::simTime()+((double)delay/1000.0), msg );
68#endif
69}
70
71#ifdef UNDERLAY_OMNET
72void SystemQueue::handleMessage(cMessage* msg){
73        SystemEvent* event = (SystemEvent*)msg->contextPointer();
74
75        event->listener->handleSystemEvent( *event );
76
77        delete event; delete msg;
78}
79#endif
80
81void SystemQueue::run() {
82#ifndef UNDERLAY_OMNET
83        systemQueueRunning = true;
84        directScheduler.run();
85        delayScheduler.run();
86#endif
87}
88
89void SystemQueue::cancel() {
90#ifndef UNDERLAY_OMNET
91        systemQueueRunning = false;
92        directScheduler.cancel();
93        delayScheduler.cancel();
94#endif
95}
96
97void SystemQueue::dropAll( const SystemEventListener* mlistener){
98#ifndef UNDERLAY_OMNET
99        directScheduler.dropAll(mlistener);
100        delayScheduler.dropAll(mlistener);
101#endif
102}
103
104bool SystemQueue::isEmpty() {
105#ifndef UNDERLAY_OMNET
106        return directScheduler.isEmpty() || delayScheduler.isEmpty();
107#else
108        return false;
109#endif
110}
111
112bool SystemQueue::isRunning() {
113#ifndef UNDERLAY_OMNET
114        return systemQueueRunning;
115#else
116        return true;
117#endif
118}
119
120void SystemQueue::enterMethod(){
121        // TODO: omnet case and delay scheduler
122        directScheduler.enter();
123}
124
125void SystemQueue::leaveMethod(){
126        // TODO: omnet case and delay scheduler
127        directScheduler.leave();
128}
129
130//***************************************************************
131#ifndef UNDERLAY_OMNET
132
133SystemQueue::QueueThread::QueueThread(QueueThread* _transferQueue)
134        : transferQueue( _transferQueue ), running( false ) {
135}
136
137SystemQueue::QueueThread::~QueueThread(){
138}
139
140void SystemQueue::QueueThread::run(){
141        running = true;
142
143        queueThread = new boost::thread(
144                boost::bind(&QueueThread::threadFunc, this) );
145}
146
147void SystemQueue::QueueThread::cancel(){
148
149        logging_debug("cancelling system queue");
150
151        // cause the thread to exit
152        {
153                // get the lock, when we got the lock the
154                // queue thread must be in itemsAvailable.wait()
155                boost::mutex::scoped_lock lock(queueMutex);
156
157                // set the running indicator and signal to run on
158                // this will run the thread and quit it
159                running = false;
160                itemsAvailable.notify_all();
161        }
162
163        // wait until the thread has exited
164        logging_debug("joining system queue thread");
165        queueThread->join();
166
167        // delete pending events
168        logging_debug("deleting pending system queue events");
169        while( eventsQueue.size() > 0 ){
170                eventsQueue.erase( eventsQueue.begin() );
171        }
172
173        // delete the thread, so that a subsuquent run() can be called
174        delete queueThread;
175        queueThread = NULL;
176}
177
178bool SystemQueue::QueueThread::isEmpty(){
179        boost::mutex::scoped_lock lock( queueMutex );
180        return eventsQueue.empty();
181}
182
183void SystemQueue::QueueThread::insert( const SystemEvent& event, uint32_t delay ){
184
185        // if this is called from a module that is currently handling
186        // a thread (called from SystemQueue::onNextQueueItem), the
187        // thread is the same anyway and the mutex will be already
188        // aquired, otherwise we aquire it now
189
190        boost::mutex::scoped_lock lock( queueMutex );
191
192        eventsQueue.push_back( event );
193        eventsQueue.back().scheduledTime = boost::posix_time::microsec_clock::local_time();
194        eventsQueue.back().delayTime = delay;
195        eventsQueue.back().remainingDelay = delay;
196
197        onItemInserted( event );
198        itemsAvailable.notify_all();
199}
200
201void SystemQueue::QueueThread::dropAll( const SystemEventListener* mlistener) {
202        boost::mutex::scoped_lock lock( queueMutex );
203
204        bool deleted;
205        do{
206                deleted = false;
207                EventQueue::iterator i = eventsQueue.begin();
208                EventQueue::iterator iend = eventsQueue.end();
209
210                for( ; i != iend; i++){
211                        if((*i).getListener() == mlistener){
212                                eventsQueue.erase(i);
213                                deleted = true;
214                                break;
215                        }
216                }
217        }while(deleted);
218}
219
220void SystemQueue::QueueThread::threadFunc( QueueThread* obj ) {
221
222        boost::mutex::scoped_lock lock( obj->queueMutex );
223
224        while( obj->running ) {
225
226                // wait until an item is in the queue or we are notified
227                // to quit the thread. in case the thread is about to
228                // quit, the queueThreadRunning variable will indicate
229                // this and cause the thread to exit
230
231                while ( obj->running && obj->eventsQueue.empty() ){
232
233//                      const boost::system_time duration =
234//                                      boost::get_system_time() +
235//                                      boost::posix_time::milliseconds(100);
236//                      obj->itemsAvailable.timed_wait( lock, duration );
237
238                        obj->itemsAvailable.wait( lock );
239                }
240
241                //
242                // work all the items that are currently in the queue
243                //
244
245                while( obj->running && (!obj->eventsQueue.empty()) ) {
246
247                        // fetch the first item in the queue
248                        // and deliver it to the queue handler
249                        SystemEvent ev = obj->eventsQueue.front();
250                        obj->eventsQueue.erase( obj->eventsQueue.begin() );
251
252                        // call the queue and this will
253                        // call the actual event handler
254                        obj->queueMutex.unlock();
255                        obj->onNextQueueItem( ev );
256                        obj->queueMutex.lock();
257
258                } // !obj->eventsQueue.empty() )
259        } // while (obj->running)
260
261        logging_debug("system queue exited");
262}
263
264void SystemQueue::QueueThread::enter(){
265        queueMutex.lock();
266}
267
268void SystemQueue::QueueThread::leave(){
269        queueMutex.unlock();
270}
271
272
273//***************************************************************
274
275SystemQueue::QueueThreadDirect::QueueThreadDirect(){
276}
277
278SystemQueue::QueueThreadDirect::~QueueThreadDirect(){
279}
280
281void SystemQueue::QueueThreadDirect::onItemInserted( const SystemEvent& event ){
282        // do nothing here
283}
284
285void SystemQueue::QueueThreadDirect::onNextQueueItem( const SystemEvent& event ){
286        // directly deliver the item to the
287        event.getListener()->handleSystemEvent( event );
288}
289
290//***************************************************************
291
292SystemQueue::QueueThreadDelay::QueueThreadDelay(QueueThread* _transferQueue)
293        : QueueThread( _transferQueue ), isSleeping( false ) {
294
295        assert( _transferQueue != NULL );
296}
297
298SystemQueue::QueueThreadDelay::~QueueThreadDelay(){
299}
300
301void SystemQueue::QueueThreadDelay::onItemInserted( const SystemEvent& event ){
302
303        if( !isSleeping) return;
304
305        // break an existing sleep and
306        // remember the time that was actually slept for
307        // and change it for every event in the queue
308
309        assert( !eventsQueue.empty());
310        sleepCond.notify_all();
311
312        ptime sleepEnd = boost::posix_time::microsec_clock::local_time();
313        boost::posix_time::time_duration duration = sleepEnd - sleepStart;
314        uint32_t sleptTime = duration.total_milliseconds();
315
316        EventQueue::iterator i = eventsQueue.begin();
317        EventQueue::iterator iend = eventsQueue.end();
318
319        for( ; i != iend; i++ ) {
320
321                if( sleptTime >= i->remainingDelay)
322                        i->remainingDelay = 0;
323                else
324                        i->remainingDelay -= sleptTime;
325
326        } // for( ; i != iend; i++ )
327
328        // now we have to reorder the events
329        // in the queue with respect to their remaining delay
330        // the SystemQueue::operator< takes care of the
331        // ordering with respect to the remaining delay
332
333        std::sort( eventsQueue.begin(), eventsQueue.end() );
334
335}
336
337void SystemQueue::QueueThreadDelay::onNextQueueItem( const SystemEvent& event ){
338
339        // sleeps will be cancelled in the
340        // onItemInserted function when a new
341        // event arrives during sleeping
342
343        assert( !isSleeping );
344
345        // the given item is the one with the least
346        // amount of sleep time left. because all
347        // items are reordered in onItemInserted
348
349        if( event.remainingDelay > 0 ) {
350
351                const boost::system_time duration =
352                        boost::get_system_time() +
353                        boost::posix_time::milliseconds(event.remainingDelay);
354
355                {
356                        boost::unique_lock<boost::mutex> lock( sleepMutex );
357
358                        sleepStart = boost::posix_time::microsec_clock::local_time();
359                        isSleeping = true;
360
361                        sleepCond.timed_wait( lock, duration );
362
363                        isSleeping = false;
364                }
365
366        } // if( event.remainingDelay > 0 )
367
368        // if the sleep succeeded and was not
369        // interrupted by a new incoming item
370        // we can now deliver this event
371
372        ptime sleepEnd = boost::posix_time::microsec_clock::local_time();
373        boost::posix_time::time_duration duration = sleepEnd - sleepStart;
374        uint32_t sleptTime = duration.total_milliseconds();
375
376        if (event.remainingDelay <= sleptTime)
377                transferQueue->insert( event, 0 );
378}
379
380#endif // #ifndef UNDERLAY_OMNET
381
382//***************************************************************
383
384}} // spovnet, common
Note: See TracBrowser for help on using the repository browser.