source: source/ariba/utility/system/SystemQueue.cpp@ 4705

Last change on this file since 4705 was 4704, checked in by Christoph Mayer, 15 years ago

locking für enter method neu

File size: 9.1 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
5//
6// Institute of Telematics
7// Universität Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37
38#include "SystemQueue.h"
39
40namespace ariba {
41namespace utility {
42
43use_logging_cpp(SystemQueue);
44
45SystemQueue::SystemQueue()
46#ifndef UNDERLAY_OMNET
47 : delayScheduler( &directScheduler ), systemQueueRunning( false )
48#endif
49{
50}
51
52SystemQueue::~SystemQueue() {
53}
54
55void SystemQueue::scheduleEvent( const SystemEvent& event, uint32_t delay ) {
56#ifndef UNDERLAY_OMNET
57 if( delay == 0 ) directScheduler.insert( event, delay );
58 else delayScheduler.insert ( event, delay );
59#else
60 Enter_Method_Silent();
61 cMessage* msg = new cMessage();
62 msg->setContextPointer( new SystemEvent(event) );
63
64 if( delay == 0 )
65 cSimpleModule::scheduleAt( cSimpleModule::simTime(), msg );
66 else
67 cSimpleModule::scheduleAt( cSimpleModule::simTime()+((double)delay/1000.0), msg );
68#endif
69}
70
71#ifdef UNDERLAY_OMNET
72void SystemQueue::handleMessage(cMessage* msg){
73 SystemEvent* event = (SystemEvent*)msg->contextPointer();
74
75 event->listener->handleSystemEvent( *event );
76
77 delete event; delete msg;
78}
79#endif
80
81void SystemQueue::run() {
82#ifndef UNDERLAY_OMNET
83 systemQueueRunning = true;
84 directScheduler.run();
85 delayScheduler.run();
86#endif
87}
88
89void SystemQueue::cancel() {
90#ifndef UNDERLAY_OMNET
91 systemQueueRunning = false;
92 directScheduler.cancel();
93 delayScheduler.cancel();
94#endif
95}
96
97bool SystemQueue::isEmpty() {
98#ifndef UNDERLAY_OMNET
99 return directScheduler.isEmpty() || delayScheduler.isEmpty();
100#else
101 return false;
102#endif
103}
104
105bool SystemQueue::isRunning() {
106#ifndef UNDERLAY_OMNET
107 return systemQueueRunning;
108#else
109 return true;
110#endif
111}
112
113void SystemQueue::enterMethod(){
114 // TODO: omnet case and delay scheduler
115 directScheduler.enter();
116}
117
118void SystemQueue::leaveMethod(){
119 // TODO: omnet case and delay scheduler
120 directScheduler.leave();
121}
122
123//***************************************************************
124#ifndef UNDERLAY_OMNET
125
126SystemQueue::QueueThread::QueueThread(QueueThread* _transferQueue)
127 : running( false ), transferQueue( _transferQueue ) {
128}
129
130SystemQueue::QueueThread::~QueueThread(){
131}
132
133void SystemQueue::QueueThread::run(){
134 running = true;
135 queueThread = new boost::thread(
136 boost::bind(&QueueThread::threadFunc, this) );
137}
138
139void SystemQueue::QueueThread::cancel(){
140
141 // cause the thread to exit
142 {
143 boost::mutex::scoped_lock lock(queueMutex);
144 running = false;
145
146 while( eventsQueue.size() > 0 ){
147 eventsQueue.erase( eventsQueue.begin() );
148 }
149
150 itemsAvailable.notify_all();
151 }
152
153 // wait that the thread has exited
154 // and delete it, so that a subsuquent run() can be called
155
156 queueThread->join();
157 delete queueThread;
158 queueThread = NULL;
159}
160
161bool SystemQueue::QueueThread::isEmpty(){
162 boost::mutex::scoped_lock lock( queueMutex );
163 return eventsQueue.empty();
164}
165
166void SystemQueue::QueueThread::insert( const SystemEvent& event, uint32_t delay ){
167
168 // if this is called from a module that is currently handling
169 // a thread (called from SystemQueue::onNextQueueItem), the
170 // thread is the same anyway and the mutex will be already
171 // aquired, otherwise we aquire it now
172
173 boost::mutex::scoped_lock lock( queueMutex );
174
175 eventsQueue.push_back( event );
176 eventsQueue.back().scheduledTime = boost::posix_time::microsec_clock::local_time();
177 eventsQueue.back().delayTime = delay;
178 eventsQueue.back().remainingDelay = delay;
179
180 onItemInserted( event );
181 itemsAvailable.notify_all();
182}
183
184void SystemQueue::QueueThread::threadFunc( QueueThread* obj ) {
185
186 boost::mutex::scoped_lock lock( obj->queueMutex );
187
188 while( obj->running ) {
189
190 // wait until an item is in the queue or we are notified
191 // to quit the thread. in case the thread is about to
192 // quit, the queueThreadRunning variable will indicate
193 // this and cause the thread to exit
194
195 while ( obj->eventsQueue.empty() && obj->running ){
196
197// const boost::system_time duration =
198// boost::get_system_time() +
199// boost::posix_time::milliseconds(100);
200
201 obj->itemsAvailable.wait( lock );
202// obj->itemsAvailable.timed_wait( lock, duration );
203 }
204
205 //
206 // work all the items that are currently in the queue
207 //
208
209 while( !obj->eventsQueue.empty() && obj->running ) {
210
211 // fetch the first item in the queue
212 // and deliver it to the queue handler
213 SystemEvent ev = obj->eventsQueue.front();
214 obj->eventsQueue.erase( obj->eventsQueue.begin() );
215
216 // call the queue and this will
217 // call the actual event handler
218 obj->onNextQueueItem( ev );
219
220 } // !obj->eventsQueue.empty() )
221 } // while (obj->running)
222
223 logging_debug("system queue exited");
224}
225
226void SystemQueue::QueueThread::enter(){
227 queueMutex.lock();
228}
229
230void SystemQueue::QueueThread::leave(){
231 queueMutex.unlock();
232}
233
234
235//***************************************************************
236
237SystemQueue::QueueThreadDirect::QueueThreadDirect(){
238}
239
240SystemQueue::QueueThreadDirect::~QueueThreadDirect(){
241}
242
243void SystemQueue::QueueThreadDirect::onItemInserted( const SystemEvent& event ){
244 // do nothing here
245}
246
247void SystemQueue::QueueThreadDirect::onNextQueueItem( const SystemEvent& event ){
248 // directly deliver the item to the
249 event.getListener()->handleSystemEvent( event );
250}
251
252//***************************************************************
253
254SystemQueue::QueueThreadDelay::QueueThreadDelay(QueueThread* _transferQueue)
255 : QueueThread( _transferQueue ), isSleeping( false ) {
256
257 assert( _transferQueue != NULL );
258}
259
260SystemQueue::QueueThreadDelay::~QueueThreadDelay(){
261}
262
263void SystemQueue::QueueThreadDelay::onItemInserted( const SystemEvent& event ){
264
265 if( !isSleeping) return;
266
267 // break an existing sleep and
268 // remember the time that was actually slept for
269 // and change it for every event in the queue
270
271 assert( !eventsQueue.empty());
272 sleepCond.notify_all();
273
274 ptime sleepEnd = boost::posix_time::microsec_clock::local_time();
275 boost::posix_time::time_duration duration = sleepEnd - sleepStart;
276 uint32_t sleptTime = duration.total_milliseconds();
277
278 EventQueue::iterator i = eventsQueue.begin();
279 EventQueue::iterator iend = eventsQueue.end();
280
281 for( ; i != iend; i++ ) {
282
283 if( sleptTime >= i->remainingDelay)
284 i->remainingDelay = 0;
285 else
286 i->remainingDelay -= sleptTime;
287
288 } // for( ; i != iend; i++ )
289
290 // now we have to reorder the events
291 // in the queue with respect to their remaining delay
292 // the SystemQueue::operator< takes care of the
293 // ordering with respect to the remaining delay
294
295 std::sort( eventsQueue.begin(), eventsQueue.end() );
296
297}
298
299void SystemQueue::QueueThreadDelay::onNextQueueItem( const SystemEvent& event ){
300
301 // sleeps will be cancelled in the
302 // onItemInserted function when a new
303 // event arrives during sleeping
304
305 assert( !isSleeping );
306
307 // the given item is the one with the least
308 // amount of sleep time left. because all
309 // items are reordered in onItemInserted
310
311 if( event.remainingDelay > 0 ) {
312
313 const boost::system_time duration =
314 boost::get_system_time() +
315 boost::posix_time::milliseconds(event.remainingDelay);
316
317 {
318 boost::unique_lock<boost::mutex> lock( sleepMutex );
319
320 sleepStart = boost::posix_time::microsec_clock::local_time();
321 isSleeping = true;
322
323 sleepCond.timed_wait( lock, duration );
324
325 isSleeping = false;
326 }
327
328 } // if( event.remainingDelay > 0 )
329
330 // if the sleep succeeded and was not
331 // interrupted by a new incoming item
332 // we can now deliver this event
333
334 ptime sleepEnd = boost::posix_time::microsec_clock::local_time();
335 boost::posix_time::time_duration duration = sleepEnd - sleepStart;
336 uint32_t sleptTime = duration.total_milliseconds();
337
338 if (event.remainingDelay <= sleptTime)
339 transferQueue->insert( event, 0 );
340}
341
342#endif // #ifndef UNDERLAY_OMNET
343
344//***************************************************************
345
346}} // spovnet, common
Note: See TracBrowser for help on using the repository browser.