An Overlay-based
Virtual Network Substrate
SpoVNet

source: source/ariba/utility/system/SystemQueue.cpp @ 3037

Last change on this file since 3037 was 2850, checked in by Christoph Mayer, 15 years ago

-friend def für arm gefixt

File size: 8.5 KB
Line 
1// [Licence]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
5//
6// Institute of Telematics
7// Universität Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37
38#include "SystemQueue.h"
39
40namespace ariba {
41namespace utility {
42
43use_logging_cpp(SystemQueue);
44
45SystemQueue::SystemQueue()
46#ifndef UNDERLAY_OMNET
47 : delayScheduler( &directScheduler ), systemQueueRunning( false )
48#endif
49{
50}
51
52SystemQueue::~SystemQueue() {
53}
54
55void SystemQueue::scheduleEvent( const SystemEvent& event, uint32_t delay ) {
56#ifndef UNDERLAY_OMNET
57        if( delay == 0 ) directScheduler.insert( event, delay );
58        else delayScheduler.insert ( event, delay );
59#else
60        Enter_Method_Silent();
61        cMessage* msg = new cMessage();
62        msg->setContextPointer( new SystemEvent(event) );
63
64        if( delay == 0 )
65                cSimpleModule::scheduleAt( cSimpleModule::simTime(), msg );
66        else
67                cSimpleModule::scheduleAt( cSimpleModule::simTime()+((double)delay/1000.0), msg );
68#endif
69}
70
#ifdef UNDERLAY_OMNET
/**
 * Receives a message previously scheduled by scheduleEvent(), delivers
 * the attached event to its listener, and frees both the event copy
 * and the message.
 *
 * @param msg message whose context pointer holds a heap-allocated SystemEvent
 */
void SystemQueue::handleMessage(cMessage* msg){
        SystemEvent* const queued =
                static_cast<SystemEvent*>( msg->contextPointer() );

        queued->listener->handleSystemEvent( *queued );

        // both objects were allocated in scheduleEvent()
        delete queued;
        delete msg;
}
#endif
80
81void SystemQueue::run() {
82#ifndef UNDERLAY_OMNET
83        systemQueueRunning = true;
84        directScheduler.run();
85        delayScheduler.run();
86#endif
87}
88
89void SystemQueue::cancel() {
90#ifndef UNDERLAY_OMNET
91        systemQueueRunning = false;
92        directScheduler.cancel();
93        delayScheduler.cancel();
94#endif
95}
96
97bool SystemQueue::isEmpty() {
98#ifndef UNDERLAY_OMNET
99        return directScheduler.isEmpty() || delayScheduler.isEmpty();
100#else
101        return false;
102#endif
103}
104
105bool SystemQueue::isRunning() {
106#ifndef UNDERLAY_OMNET
107        return systemQueueRunning;
108#else
109        return true;
110#endif
111}
112
113//***************************************************************
114#ifndef UNDERLAY_OMNET
115
116SystemQueue::QueueThread::QueueThread(QueueThread* _transferQueue)
117        : running( false ), transferQueue( _transferQueue ) {
118}
119
120SystemQueue::QueueThread::~QueueThread(){
121}
122
123void SystemQueue::QueueThread::run(){
124        running = true;
125        queueThread = new boost::thread(
126                boost::bind(&QueueThread::threadFunc, this) );
127}
128
129void SystemQueue::QueueThread::cancel(){
130
131        // cause the thread to exit
132        running = false;
133        itemsAvailable.notify_all();
134
135        // wait that the thread has exited
136        // and delete it, so that a subsuquent run() can be called
137
138        queueThread->join();
139        delete queueThread;
140        queueThread = NULL;
141
142        while( eventsQueue.size() > 0 ){
143                eventsQueue.erase( eventsQueue.begin() );
144        }
145}
146
147bool SystemQueue::QueueThread::isEmpty(){
148        boost::mutex::scoped_lock lock( queueMutex );
149        return eventsQueue.empty();
150}
151
152void SystemQueue::QueueThread::insert( const SystemEvent& event, uint32_t delay ){
153        {
154                boost::mutex::scoped_lock lock( queueMutex );
155
156                eventsQueue.push_back( event );
157                eventsQueue.back().scheduledTime = boost::posix_time::microsec_clock::local_time();
158                eventsQueue.back().delayTime = delay;
159                eventsQueue.back().remainingDelay = delay;
160
161                onItemInserted( event );
162                itemsAvailable.notify_all();
163        }
164}
165
166void SystemQueue::QueueThread::threadFunc( QueueThread* obj ) {
167
168        boost::mutex::scoped_lock lock( obj->queueMutex );
169
170        while( obj->running ) {
171
172                // wait until an item is in the queue or we are notified
173                // to quit the thread. in case the thread is about to
174                // quit, the queueThreadRunning variable will indicate
175                // this and cause the thread to exit
176
177                while ( obj->eventsQueue.empty() && obj->running ){
178
179                        const boost::system_time duration =
180                                        boost::get_system_time() +
181                                        boost::posix_time::milliseconds(40);
182
183                        obj->itemsAvailable.wait( lock );
184//                      obj->itemsAvailable.timed_wait( lock, duration );
185                }
186
187                // work all the items that are currently in the queue
188
189                while( !obj->eventsQueue.empty() ) {
190
191                        // fetch the first item in the queue and deliver it to the
192                        // queue handler
193                        SystemEvent ev = obj->eventsQueue.front();
194                        obj->eventsQueue.erase( obj->eventsQueue.begin() );
195
196                        {
197                                // unlock the queue to that an event handler
198                                // can insert new items into the queue
199//                              obj->queueMutex.unlock();
200                                obj->onNextQueueItem( ev );
201//                              obj->queueMutex.lock();
202                        }
203
204                } // !obj->eventsQueue.empty() )
205        } // while (obj->running)
206}
207
208//***************************************************************
209
210SystemQueue::QueueThreadDirect::QueueThreadDirect(){
211}
212
213SystemQueue::QueueThreadDirect::~QueueThreadDirect(){
214}
215
216void SystemQueue::QueueThreadDirect::onItemInserted( const SystemEvent& event ){
217        // do nothing here
218}
219
220void SystemQueue::QueueThreadDirect::onNextQueueItem( const SystemEvent& event ){
221        // directly deliver the item to the
222        event.getListener()->handleSystemEvent( event );
223}
224
225//***************************************************************
226
227SystemQueue::QueueThreadDelay::QueueThreadDelay(QueueThread* _transferQueue)
228        : QueueThread( _transferQueue ), isSleeping( false ) {
229
230        assert( _transferQueue != NULL );
231}
232
233SystemQueue::QueueThreadDelay::~QueueThreadDelay(){
234}
235
236void SystemQueue::QueueThreadDelay::onItemInserted( const SystemEvent& event ){
237
238        if( !isSleeping) return;
239
240        // break an existing sleep and
241        // remember the time that was actually slept for
242        // and change it for every event in the queue
243
244        assert( !eventsQueue.empty());
245        sleepCond.notify_all();
246
247        ptime sleepEnd = boost::posix_time::microsec_clock::local_time();
248        boost::posix_time::time_duration duration = sleepEnd - sleepStart;
249        uint32_t sleptTime = duration.total_milliseconds();
250
251        EventQueue::iterator i = eventsQueue.begin();
252        EventQueue::iterator iend = eventsQueue.end();
253
254        for( ; i != iend; i++ ) {
255
256                if( sleptTime >= i->remainingDelay)
257                        i->remainingDelay = 0;
258                else
259                        i->remainingDelay -= sleptTime;
260
261        } // for( ; i != iend; i++ )
262
263        // now we have to reorder the events
264        // in the queue with respect to their remaining delay
265        // the SystemQueue::operator< takes care of the
266        // ordering with respect to the remaining delay
267
268        std::sort( eventsQueue.begin(), eventsQueue.end() );
269
270}
271
272void SystemQueue::QueueThreadDelay::onNextQueueItem( const SystemEvent& event ){
273
274        // sleeps will be cancelled in the
275        // onItemInserted function when a new
276        // event arrives during sleeping
277
278        assert( !isSleeping );
279
280        // the given item is the one with the least
281        // amount of sleep time left. because all
282        // items are reordered in onItemInserted
283
284        if( event.remainingDelay > 0 ) {
285
286                const boost::system_time duration =
287                        boost::get_system_time() +
288                        boost::posix_time::milliseconds(event.remainingDelay);
289
290                {
291                        boost::unique_lock<boost::mutex> lock( sleepMutex );
292
293                        sleepStart = boost::posix_time::microsec_clock::local_time();
294                        isSleeping = true;
295
296                        sleepCond.timed_wait( lock, duration );
297
298                        isSleeping = false;
299                }
300
301        } // if( event.remainingDelay > 0 )
302
303        // if the sleep succeeded and was not
304        // interrupted by a new incoming item
305        // we can now deliver this event
306
307        ptime sleepEnd = boost::posix_time::microsec_clock::local_time();
308        boost::posix_time::time_duration duration = sleepEnd - sleepStart;
309        uint32_t sleptTime = duration.total_milliseconds();
310
311        if (event.remainingDelay <= sleptTime)
312                transferQueue->insert( event, 0 );
313}
314
315#endif // #ifndef UNDERLAY_OMNET
316
317//***************************************************************
318
319}} // namespaces utility, ariba
Note: See TracBrowser for help on using the repository browser.