1 | // [License] |
---|
2 | // The Ariba-Underlay Copyright |
---|
3 | // |
---|
4 | // Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH) |
---|
5 | // |
---|
6 | // Institute of Telematics |
---|
7 | // Universität Karlsruhe (TH) |
---|
8 | // Zirkel 2, 76128 Karlsruhe |
---|
9 | // Germany |
---|
10 | // |
---|
11 | // Redistribution and use in source and binary forms, with or without |
---|
12 | // modification, are permitted provided that the following conditions are |
---|
13 | // met: |
---|
14 | // |
---|
15 | // 1. Redistributions of source code must retain the above copyright |
---|
16 | // notice, this list of conditions and the following disclaimer. |
---|
17 | // 2. Redistributions in binary form must reproduce the above copyright |
---|
18 | // notice, this list of conditions and the following disclaimer in the |
---|
19 | // documentation and/or other materials provided with the distribution. |
---|
20 | // |
---|
21 | // THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND |
---|
22 | // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
---|
23 | // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
---|
24 | // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR |
---|
25 | // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, |
---|
26 | // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
---|
27 | // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR |
---|
28 | // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF |
---|
29 | // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING |
---|
30 | // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
---|
31 | // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
---|
32 | // |
---|
33 | // The views and conclusions contained in the software and documentation |
---|
34 | // are those of the authors and should not be interpreted as representing |
---|
35 | // official policies, either expressed or implied, of the Institute of |
---|
36 | // Telematics. |
---|
37 | |
---|
38 | #include "SystemQueue.h" |
---|
39 | |
---|
40 | namespace ariba { |
---|
41 | namespace utility { |
---|
42 | |
---|
43 | use_logging_cpp(SystemQueue); |
---|
44 | |
---|
45 | SystemQueue::SystemQueue() |
---|
46 | #ifndef UNDERLAY_OMNET |
---|
47 | : delayScheduler( &directScheduler ), systemQueueRunning( false ) |
---|
48 | #endif |
---|
49 | { |
---|
50 | } |
---|
51 | |
---|
52 | SystemQueue::~SystemQueue() { |
---|
53 | } |
---|
54 | |
---|
55 | void SystemQueue::scheduleEvent( const SystemEvent& event, uint32_t delay ) { |
---|
56 | #ifndef UNDERLAY_OMNET |
---|
57 | if( delay == 0 ) directScheduler.insert( event, delay ); |
---|
58 | else delayScheduler.insert ( event, delay ); |
---|
59 | #else |
---|
60 | Enter_Method_Silent(); |
---|
61 | cMessage* msg = new cMessage(); |
---|
62 | msg->setContextPointer( new SystemEvent(event) ); |
---|
63 | |
---|
64 | if( delay == 0 ) |
---|
65 | cSimpleModule::scheduleAt( cSimpleModule::simTime(), msg ); |
---|
66 | else |
---|
67 | cSimpleModule::scheduleAt( cSimpleModule::simTime()+((double)delay/1000.0), msg ); |
---|
68 | #endif |
---|
69 | } |
---|
70 | |
---|
#ifdef UNDERLAY_OMNET
/**
 * Simulator callback for a message created by scheduleEvent().
 *
 * Recovers the SystemEvent stored as the message's context pointer, passes
 * it to the event's listener and then frees both the event and the message.
 *
 * @param msg the message carrying the event as context pointer
 */
void SystemQueue::handleMessage(cMessage* msg)
{
    SystemEvent* scheduled = static_cast<SystemEvent*>( msg->contextPointer() );

    scheduled->listener->handleSystemEvent( *scheduled );

    delete scheduled;
    delete msg;
}
#endif
---|
80 | |
---|
81 | void SystemQueue::run() { |
---|
82 | #ifndef UNDERLAY_OMNET |
---|
83 | systemQueueRunning = true; |
---|
84 | directScheduler.run(); |
---|
85 | delayScheduler.run(); |
---|
86 | #endif |
---|
87 | } |
---|
88 | |
---|
89 | void SystemQueue::cancel() { |
---|
90 | #ifndef UNDERLAY_OMNET |
---|
91 | systemQueueRunning = false; |
---|
92 | directScheduler.cancel(); |
---|
93 | delayScheduler.cancel(); |
---|
94 | #endif |
---|
95 | } |
---|
96 | |
---|
97 | void SystemQueue::dropAll( const SystemEventListener* mlistener){ |
---|
98 | #ifndef UNDERLAY_OMNET |
---|
99 | directScheduler.dropAll(mlistener); |
---|
100 | delayScheduler.dropAll(mlistener); |
---|
101 | #endif |
---|
102 | } |
---|
103 | |
---|
104 | bool SystemQueue::isEmpty() { |
---|
105 | #ifndef UNDERLAY_OMNET |
---|
106 | return directScheduler.isEmpty() || delayScheduler.isEmpty(); |
---|
107 | #else |
---|
108 | return false; |
---|
109 | #endif |
---|
110 | } |
---|
111 | |
---|
112 | bool SystemQueue::isRunning() { |
---|
113 | #ifndef UNDERLAY_OMNET |
---|
114 | return systemQueueRunning; |
---|
115 | #else |
---|
116 | return true; |
---|
117 | #endif |
---|
118 | } |
---|
119 | |
---|
120 | void SystemQueue::enterMethod(){ |
---|
121 | // TODO: omnet case and delay scheduler |
---|
122 | directScheduler.enter(); |
---|
123 | } |
---|
124 | |
---|
125 | void SystemQueue::leaveMethod(){ |
---|
126 | // TODO: omnet case and delay scheduler |
---|
127 | directScheduler.leave(); |
---|
128 | } |
---|
129 | |
---|
130 | //*************************************************************** |
---|
131 | #ifndef UNDERLAY_OMNET |
---|
132 | |
---|
133 | SystemQueue::QueueThread::QueueThread(QueueThread* _transferQueue) |
---|
134 | : transferQueue( _transferQueue ), running( false ) { |
---|
135 | } |
---|
136 | |
---|
137 | SystemQueue::QueueThread::~QueueThread(){ |
---|
138 | } |
---|
139 | |
---|
140 | void SystemQueue::QueueThread::run(){ |
---|
141 | running = true; |
---|
142 | |
---|
143 | queueThread = new boost::thread( |
---|
144 | boost::bind(&QueueThread::threadFunc, this) ); |
---|
145 | } |
---|
146 | |
---|
147 | void SystemQueue::QueueThread::cancel(){ |
---|
148 | |
---|
149 | logging_debug("cancelling system queue"); |
---|
150 | |
---|
151 | // cause the thread to exit |
---|
152 | { |
---|
153 | // get the lock, when we got the lock the |
---|
154 | // queue thread must be in itemsAvailable.wait() |
---|
155 | boost::mutex::scoped_lock lock(queueMutex); |
---|
156 | |
---|
157 | // set the running indicator and signal to run on |
---|
158 | // this will run the thread and quit it |
---|
159 | running = false; |
---|
160 | itemsAvailable.notify_all(); |
---|
161 | } |
---|
162 | |
---|
163 | // wait until the thread has exited |
---|
164 | logging_debug("joining system queue thread"); |
---|
165 | queueThread->join(); |
---|
166 | |
---|
167 | // delete pending events |
---|
168 | logging_debug("deleting pending system queue events"); |
---|
169 | while( eventsQueue.size() > 0 ){ |
---|
170 | eventsQueue.erase( eventsQueue.begin() ); |
---|
171 | } |
---|
172 | |
---|
173 | // delete the thread, so that a subsuquent run() can be called |
---|
174 | delete queueThread; |
---|
175 | queueThread = NULL; |
---|
176 | } |
---|
177 | |
---|
178 | bool SystemQueue::QueueThread::isEmpty(){ |
---|
179 | boost::mutex::scoped_lock lock( queueMutex ); |
---|
180 | return eventsQueue.empty(); |
---|
181 | } |
---|
182 | |
---|
183 | void SystemQueue::QueueThread::insert( const SystemEvent& event, uint32_t delay ){ |
---|
184 | |
---|
185 | // if this is called from a module that is currently handling |
---|
186 | // a thread (called from SystemQueue::onNextQueueItem), the |
---|
187 | // thread is the same anyway and the mutex will be already |
---|
188 | // aquired, otherwise we aquire it now |
---|
189 | |
---|
190 | boost::mutex::scoped_lock lock( queueMutex ); |
---|
191 | |
---|
192 | eventsQueue.push_back( event ); |
---|
193 | eventsQueue.back().scheduledTime = boost::posix_time::microsec_clock::local_time(); |
---|
194 | eventsQueue.back().delayTime = delay; |
---|
195 | eventsQueue.back().remainingDelay = delay; |
---|
196 | |
---|
197 | onItemInserted( event ); |
---|
198 | itemsAvailable.notify_all(); |
---|
199 | } |
---|
200 | |
---|
201 | void SystemQueue::QueueThread::dropAll( const SystemEventListener* mlistener) { |
---|
202 | boost::mutex::scoped_lock lock( queueMutex ); |
---|
203 | |
---|
204 | bool deleted; |
---|
205 | do{ |
---|
206 | deleted = false; |
---|
207 | EventQueue::iterator i = eventsQueue.begin(); |
---|
208 | EventQueue::iterator iend = eventsQueue.end(); |
---|
209 | |
---|
210 | for( ; i != iend; i++){ |
---|
211 | if((*i).getListener() == mlistener){ |
---|
212 | eventsQueue.erase(i); |
---|
213 | deleted = true; |
---|
214 | break; |
---|
215 | } |
---|
216 | } |
---|
217 | }while(deleted); |
---|
218 | } |
---|
219 | |
---|
220 | void SystemQueue::QueueThread::threadFunc( QueueThread* obj ) { |
---|
221 | |
---|
222 | boost::mutex::scoped_lock lock( obj->queueMutex ); |
---|
223 | |
---|
224 | while( obj->running ) { |
---|
225 | |
---|
226 | // wait until an item is in the queue or we are notified |
---|
227 | // to quit the thread. in case the thread is about to |
---|
228 | // quit, the queueThreadRunning variable will indicate |
---|
229 | // this and cause the thread to exit |
---|
230 | |
---|
231 | while ( obj->running && obj->eventsQueue.empty() ){ |
---|
232 | |
---|
233 | // const boost::system_time duration = |
---|
234 | // boost::get_system_time() + |
---|
235 | // boost::posix_time::milliseconds(100); |
---|
236 | // obj->itemsAvailable.timed_wait( lock, duration ); |
---|
237 | |
---|
238 | obj->itemsAvailable.wait( lock ); |
---|
239 | } |
---|
240 | |
---|
241 | // |
---|
242 | // work all the items that are currently in the queue |
---|
243 | // |
---|
244 | |
---|
245 | while( obj->running && (!obj->eventsQueue.empty()) ) { |
---|
246 | |
---|
247 | // fetch the first item in the queue |
---|
248 | // and deliver it to the queue handler |
---|
249 | SystemEvent ev = obj->eventsQueue.front(); |
---|
250 | obj->eventsQueue.erase( obj->eventsQueue.begin() ); |
---|
251 | |
---|
252 | // call the queue and this will |
---|
253 | // call the actual event handler |
---|
254 | obj->queueMutex.unlock(); |
---|
255 | obj->onNextQueueItem( ev ); |
---|
256 | obj->queueMutex.lock(); |
---|
257 | |
---|
258 | } // !obj->eventsQueue.empty() ) |
---|
259 | } // while (obj->running) |
---|
260 | |
---|
261 | logging_debug("system queue exited"); |
---|
262 | } |
---|
263 | |
---|
264 | void SystemQueue::QueueThread::enter(){ |
---|
265 | queueMutex.lock(); |
---|
266 | } |
---|
267 | |
---|
268 | void SystemQueue::QueueThread::leave(){ |
---|
269 | queueMutex.unlock(); |
---|
270 | } |
---|
271 | |
---|
272 | |
---|
273 | //*************************************************************** |
---|
274 | |
---|
275 | SystemQueue::QueueThreadDirect::QueueThreadDirect(){ |
---|
276 | } |
---|
277 | |
---|
278 | SystemQueue::QueueThreadDirect::~QueueThreadDirect(){ |
---|
279 | } |
---|
280 | |
---|
281 | void SystemQueue::QueueThreadDirect::onItemInserted( const SystemEvent& event ){ |
---|
282 | // do nothing here |
---|
283 | } |
---|
284 | |
---|
285 | void SystemQueue::QueueThreadDirect::onNextQueueItem( const SystemEvent& event ){ |
---|
286 | // directly deliver the item to the |
---|
287 | event.getListener()->handleSystemEvent( event ); |
---|
288 | } |
---|
289 | |
---|
290 | //*************************************************************** |
---|
291 | |
---|
292 | SystemQueue::QueueThreadDelay::QueueThreadDelay(QueueThread* _transferQueue) |
---|
293 | : QueueThread( _transferQueue ), isSleeping( false ) { |
---|
294 | |
---|
295 | assert( _transferQueue != NULL ); |
---|
296 | } |
---|
297 | |
---|
298 | SystemQueue::QueueThreadDelay::~QueueThreadDelay(){ |
---|
299 | } |
---|
300 | |
---|
301 | void SystemQueue::QueueThreadDelay::onItemInserted( const SystemEvent& event ){ |
---|
302 | |
---|
303 | if( !isSleeping) return; |
---|
304 | |
---|
305 | // break an existing sleep and |
---|
306 | // remember the time that was actually slept for |
---|
307 | // and change it for every event in the queue |
---|
308 | |
---|
309 | assert( !eventsQueue.empty()); |
---|
310 | sleepCond.notify_all(); |
---|
311 | |
---|
312 | ptime sleepEnd = boost::posix_time::microsec_clock::local_time(); |
---|
313 | boost::posix_time::time_duration duration = sleepEnd - sleepStart; |
---|
314 | uint32_t sleptTime = duration.total_milliseconds(); |
---|
315 | |
---|
316 | EventQueue::iterator i = eventsQueue.begin(); |
---|
317 | EventQueue::iterator iend = eventsQueue.end(); |
---|
318 | |
---|
319 | for( ; i != iend; i++ ) { |
---|
320 | |
---|
321 | if( sleptTime >= i->remainingDelay) |
---|
322 | i->remainingDelay = 0; |
---|
323 | else |
---|
324 | i->remainingDelay -= sleptTime; |
---|
325 | |
---|
326 | } // for( ; i != iend; i++ ) |
---|
327 | |
---|
328 | // now we have to reorder the events |
---|
329 | // in the queue with respect to their remaining delay |
---|
330 | // the SystemQueue::operator< takes care of the |
---|
331 | // ordering with respect to the remaining delay |
---|
332 | |
---|
333 | std::sort( eventsQueue.begin(), eventsQueue.end() ); |
---|
334 | |
---|
335 | } |
---|
336 | |
---|
337 | void SystemQueue::QueueThreadDelay::onNextQueueItem( const SystemEvent& event ){ |
---|
338 | |
---|
339 | // sleeps will be cancelled in the |
---|
340 | // onItemInserted function when a new |
---|
341 | // event arrives during sleeping |
---|
342 | |
---|
343 | assert( !isSleeping ); |
---|
344 | |
---|
345 | // the given item is the one with the least |
---|
346 | // amount of sleep time left. because all |
---|
347 | // items are reordered in onItemInserted |
---|
348 | |
---|
349 | if( event.remainingDelay > 0 ) { |
---|
350 | |
---|
351 | const boost::system_time duration = |
---|
352 | boost::get_system_time() + |
---|
353 | boost::posix_time::milliseconds(event.remainingDelay); |
---|
354 | |
---|
355 | { |
---|
356 | boost::unique_lock<boost::mutex> lock( sleepMutex ); |
---|
357 | |
---|
358 | sleepStart = boost::posix_time::microsec_clock::local_time(); |
---|
359 | isSleeping = true; |
---|
360 | |
---|
361 | sleepCond.timed_wait( lock, duration ); |
---|
362 | |
---|
363 | isSleeping = false; |
---|
364 | } |
---|
365 | |
---|
366 | } // if( event.remainingDelay > 0 ) |
---|
367 | |
---|
368 | // if the sleep succeeded and was not |
---|
369 | // interrupted by a new incoming item |
---|
370 | // we can now deliver this event |
---|
371 | |
---|
372 | ptime sleepEnd = boost::posix_time::microsec_clock::local_time(); |
---|
373 | boost::posix_time::time_duration duration = sleepEnd - sleepStart; |
---|
374 | uint32_t sleptTime = duration.total_milliseconds(); |
---|
375 | |
---|
376 | if (event.remainingDelay <= sleptTime) |
---|
377 | transferQueue->insert( event, 0 ); |
---|
378 | } |
---|
379 | |
---|
380 | #endif // #ifndef UNDERLAY_OMNET |
---|
381 | |
---|
382 | //*************************************************************** |
---|
383 | |
---|
384 | }} // namespace utility, namespace ariba |
---|