1 | // [License] |
---|
2 | // The Ariba-Underlay Copyright |
---|
3 | // |
---|
4 | // Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH) |
---|
5 | // |
---|
6 | // Institute of Telematics |
---|
7 | // UniversitÀt Karlsruhe (TH) |
---|
8 | // Zirkel 2, 76128 Karlsruhe |
---|
9 | // Germany |
---|
10 | // |
---|
11 | // Redistribution and use in source and binary forms, with or without |
---|
12 | // modification, are permitted provided that the following conditions are |
---|
13 | // met: |
---|
14 | // |
---|
15 | // 1. Redistributions of source code must retain the above copyright |
---|
16 | // notice, this list of conditions and the following disclaimer. |
---|
17 | // 2. Redistributions in binary form must reproduce the above copyright |
---|
18 | // notice, this list of conditions and the following disclaimer in the |
---|
19 | // documentation and/or other materials provided with the distribution. |
---|
20 | // |
---|
21 | // THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND |
---|
22 | // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
---|
23 | // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
---|
24 | // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR |
---|
25 | // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, |
---|
26 | // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
---|
27 | // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR |
---|
28 | // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF |
---|
29 | // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING |
---|
30 | // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
---|
31 | // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
---|
32 | // |
---|
33 | // The views and conclusions contained in the software and documentation |
---|
34 | // are those of the authors and should not be interpreted as representing |
---|
35 | // official policies, either expressed or implied, of the Institute of |
---|
36 | // Telematics. |
---|
37 | |
---|
38 | #include "SystemQueue.h" |
---|
39 | |
---|
40 | namespace ariba { |
---|
41 | namespace utility { |
---|
42 | |
---|
43 | use_logging_cpp(SystemQueue); |
---|
44 | |
---|
45 | SystemQueue::SystemQueue() |
---|
46 | #ifndef UNDERLAY_OMNET |
---|
47 | : delayScheduler( &directScheduler ), systemQueueRunning( false ) |
---|
48 | #endif |
---|
49 | { |
---|
50 | } |
---|
51 | |
---|
52 | SystemQueue::~SystemQueue() { |
---|
53 | } |
---|
54 | |
---|
55 | void SystemQueue::scheduleEvent( const SystemEvent& event, uint32_t delay ) { |
---|
56 | #ifndef UNDERLAY_OMNET |
---|
57 | if( delay == 0 ) directScheduler.insert( event, delay ); |
---|
58 | else delayScheduler.insert ( event, delay ); |
---|
59 | #else |
---|
60 | Enter_Method_Silent(); |
---|
61 | cMessage* msg = new cMessage(); |
---|
62 | msg->setContextPointer( new SystemEvent(event) ); |
---|
63 | |
---|
64 | if( delay == 0 ) |
---|
65 | cSimpleModule::scheduleAt( cSimpleModule::simTime(), msg ); |
---|
66 | else |
---|
67 | cSimpleModule::scheduleAt( cSimpleModule::simTime()+((double)delay/1000.0), msg ); |
---|
68 | #endif |
---|
69 | } |
---|
70 | |
---|
#ifdef UNDERLAY_OMNET
// OMNeT message handler: takes the SystemEvent stored as the message's
// context pointer, passes it to the event's listener and afterwards frees
// both the event and the message.
void SystemQueue::handleMessage(cMessage* msg){
    SystemEvent* queued = static_cast<SystemEvent*>( msg->contextPointer() );

    queued->listener->handleSystemEvent( *queued );

    delete queued;
    delete msg;
}
#endif
---|
80 | |
---|
81 | void SystemQueue::run() { |
---|
82 | #ifndef UNDERLAY_OMNET |
---|
83 | systemQueueRunning = true; |
---|
84 | directScheduler.run(); |
---|
85 | delayScheduler.run(); |
---|
86 | #endif |
---|
87 | } |
---|
88 | |
---|
89 | void SystemQueue::cancel() { |
---|
90 | #ifndef UNDERLAY_OMNET |
---|
91 | systemQueueRunning = false; |
---|
92 | directScheduler.cancel(); |
---|
93 | delayScheduler.cancel(); |
---|
94 | #endif |
---|
95 | } |
---|
96 | |
---|
97 | bool SystemQueue::isEmpty() { |
---|
98 | #ifndef UNDERLAY_OMNET |
---|
99 | return directScheduler.isEmpty() || delayScheduler.isEmpty(); |
---|
100 | #else |
---|
101 | return false; |
---|
102 | #endif |
---|
103 | } |
---|
104 | |
---|
105 | bool SystemQueue::isRunning() { |
---|
106 | #ifndef UNDERLAY_OMNET |
---|
107 | return systemQueueRunning; |
---|
108 | #else |
---|
109 | return true; |
---|
110 | #endif |
---|
111 | } |
---|
112 | |
---|
113 | void SystemQueue::enterMethod(){ |
---|
114 | // TODO: omnet case and delay scheduler |
---|
115 | directScheduler.enter(); |
---|
116 | } |
---|
117 | |
---|
118 | void SystemQueue::leaveMethod(){ |
---|
119 | // TODO: omnet case and delay scheduler |
---|
120 | directScheduler.leave(); |
---|
121 | } |
---|
122 | |
---|
123 | //*************************************************************** |
---|
124 | #ifndef UNDERLAY_OMNET |
---|
125 | |
---|
126 | SystemQueue::QueueThread::QueueThread(QueueThread* _transferQueue) |
---|
127 | : running( false ), transferQueue( _transferQueue ) { |
---|
128 | } |
---|
129 | |
---|
130 | SystemQueue::QueueThread::~QueueThread(){ |
---|
131 | } |
---|
132 | |
---|
133 | void SystemQueue::QueueThread::run(){ |
---|
134 | running = true; |
---|
135 | queueThread = new boost::thread( |
---|
136 | boost::bind(&QueueThread::threadFunc, this) ); |
---|
137 | } |
---|
138 | |
---|
139 | void SystemQueue::QueueThread::cancel(){ |
---|
140 | |
---|
141 | // cause the thread to exit |
---|
142 | { |
---|
143 | boost::mutex::scoped_lock lock(queueMutex); |
---|
144 | running = false; |
---|
145 | |
---|
146 | while( eventsQueue.size() > 0 ){ |
---|
147 | eventsQueue.erase( eventsQueue.begin() ); |
---|
148 | } |
---|
149 | |
---|
150 | itemsAvailable.notify_all(); |
---|
151 | } |
---|
152 | |
---|
153 | // wait that the thread has exited |
---|
154 | // and delete it, so that a subsuquent run() can be called |
---|
155 | |
---|
156 | queueThread->join(); |
---|
157 | delete queueThread; |
---|
158 | queueThread = NULL; |
---|
159 | } |
---|
160 | |
---|
161 | bool SystemQueue::QueueThread::isEmpty(){ |
---|
162 | boost::mutex::scoped_lock lock( queueMutex ); |
---|
163 | return eventsQueue.empty(); |
---|
164 | } |
---|
165 | |
---|
166 | void SystemQueue::QueueThread::insert( const SystemEvent& event, uint32_t delay ){ |
---|
167 | |
---|
168 | // if this is called from a module that is currently handling |
---|
169 | // a thread (called from SystemQueue::onNextQueueItem), the |
---|
170 | // thread is the same anyway and the mutex will be already |
---|
171 | // aquired, otherwise we aquire it now |
---|
172 | |
---|
173 | boost::mutex::scoped_lock lock( queueMutex ); |
---|
174 | |
---|
175 | eventsQueue.push_back( event ); |
---|
176 | eventsQueue.back().scheduledTime = boost::posix_time::microsec_clock::local_time(); |
---|
177 | eventsQueue.back().delayTime = delay; |
---|
178 | eventsQueue.back().remainingDelay = delay; |
---|
179 | |
---|
180 | onItemInserted( event ); |
---|
181 | itemsAvailable.notify_all(); |
---|
182 | } |
---|
183 | |
---|
184 | void SystemQueue::QueueThread::threadFunc( QueueThread* obj ) { |
---|
185 | |
---|
186 | boost::mutex::scoped_lock lock( obj->queueMutex ); |
---|
187 | |
---|
188 | while( obj->running ) { |
---|
189 | |
---|
190 | // wait until an item is in the queue or we are notified |
---|
191 | // to quit the thread. in case the thread is about to |
---|
192 | // quit, the queueThreadRunning variable will indicate |
---|
193 | // this and cause the thread to exit |
---|
194 | |
---|
195 | while ( obj->eventsQueue.empty() && obj->running ){ |
---|
196 | |
---|
197 | // const boost::system_time duration = |
---|
198 | // boost::get_system_time() + |
---|
199 | // boost::posix_time::milliseconds(100); |
---|
200 | |
---|
201 | obj->itemsAvailable.wait( lock ); |
---|
202 | // obj->itemsAvailable.timed_wait( lock, duration ); |
---|
203 | } |
---|
204 | |
---|
205 | // |
---|
206 | // work all the items that are currently in the queue |
---|
207 | // |
---|
208 | |
---|
209 | while( !obj->eventsQueue.empty() && obj->running ) { |
---|
210 | |
---|
211 | // fetch the first item in the queue |
---|
212 | // and deliver it to the queue handler |
---|
213 | SystemEvent ev = obj->eventsQueue.front(); |
---|
214 | obj->eventsQueue.erase( obj->eventsQueue.begin() ); |
---|
215 | |
---|
216 | // call the queue and this will |
---|
217 | // call the actual event handler |
---|
218 | obj->onNextQueueItem( ev ); |
---|
219 | |
---|
220 | } // !obj->eventsQueue.empty() ) |
---|
221 | } // while (obj->running) |
---|
222 | |
---|
223 | logging_debug("system queue exited"); |
---|
224 | } |
---|
225 | |
---|
226 | void SystemQueue::QueueThread::enter(){ |
---|
227 | queueMutex.lock(); |
---|
228 | } |
---|
229 | |
---|
230 | void SystemQueue::QueueThread::leave(){ |
---|
231 | queueMutex.leave(); |
---|
232 | } |
---|
233 | |
---|
234 | |
---|
235 | //*************************************************************** |
---|
236 | |
---|
237 | SystemQueue::QueueThreadDirect::QueueThreadDirect(){ |
---|
238 | } |
---|
239 | |
---|
240 | SystemQueue::QueueThreadDirect::~QueueThreadDirect(){ |
---|
241 | } |
---|
242 | |
---|
243 | void SystemQueue::QueueThreadDirect::onItemInserted( const SystemEvent& event ){ |
---|
244 | // do nothing here |
---|
245 | } |
---|
246 | |
---|
247 | void SystemQueue::QueueThreadDirect::onNextQueueItem( const SystemEvent& event ){ |
---|
248 | // directly deliver the item to the |
---|
249 | event.getListener()->handleSystemEvent( event ); |
---|
250 | } |
---|
251 | |
---|
252 | //*************************************************************** |
---|
253 | |
---|
254 | SystemQueue::QueueThreadDelay::QueueThreadDelay(QueueThread* _transferQueue) |
---|
255 | : QueueThread( _transferQueue ), isSleeping( false ) { |
---|
256 | |
---|
257 | assert( _transferQueue != NULL ); |
---|
258 | } |
---|
259 | |
---|
260 | SystemQueue::QueueThreadDelay::~QueueThreadDelay(){ |
---|
261 | } |
---|
262 | |
---|
263 | void SystemQueue::QueueThreadDelay::onItemInserted( const SystemEvent& event ){ |
---|
264 | |
---|
265 | if( !isSleeping) return; |
---|
266 | |
---|
267 | // break an existing sleep and |
---|
268 | // remember the time that was actually slept for |
---|
269 | // and change it for every event in the queue |
---|
270 | |
---|
271 | assert( !eventsQueue.empty()); |
---|
272 | sleepCond.notify_all(); |
---|
273 | |
---|
274 | ptime sleepEnd = boost::posix_time::microsec_clock::local_time(); |
---|
275 | boost::posix_time::time_duration duration = sleepEnd - sleepStart; |
---|
276 | uint32_t sleptTime = duration.total_milliseconds(); |
---|
277 | |
---|
278 | EventQueue::iterator i = eventsQueue.begin(); |
---|
279 | EventQueue::iterator iend = eventsQueue.end(); |
---|
280 | |
---|
281 | for( ; i != iend; i++ ) { |
---|
282 | |
---|
283 | if( sleptTime >= i->remainingDelay) |
---|
284 | i->remainingDelay = 0; |
---|
285 | else |
---|
286 | i->remainingDelay -= sleptTime; |
---|
287 | |
---|
288 | } // for( ; i != iend; i++ ) |
---|
289 | |
---|
290 | // now we have to reorder the events |
---|
291 | // in the queue with respect to their remaining delay |
---|
292 | // the SystemQueue::operator< takes care of the |
---|
293 | // ordering with respect to the remaining delay |
---|
294 | |
---|
295 | std::sort( eventsQueue.begin(), eventsQueue.end() ); |
---|
296 | |
---|
297 | } |
---|
298 | |
---|
299 | void SystemQueue::QueueThreadDelay::onNextQueueItem( const SystemEvent& event ){ |
---|
300 | |
---|
301 | // sleeps will be cancelled in the |
---|
302 | // onItemInserted function when a new |
---|
303 | // event arrives during sleeping |
---|
304 | |
---|
305 | assert( !isSleeping ); |
---|
306 | |
---|
307 | // the given item is the one with the least |
---|
308 | // amount of sleep time left. because all |
---|
309 | // items are reordered in onItemInserted |
---|
310 | |
---|
311 | if( event.remainingDelay > 0 ) { |
---|
312 | |
---|
313 | const boost::system_time duration = |
---|
314 | boost::get_system_time() + |
---|
315 | boost::posix_time::milliseconds(event.remainingDelay); |
---|
316 | |
---|
317 | { |
---|
318 | boost::unique_lock<boost::mutex> lock( sleepMutex ); |
---|
319 | |
---|
320 | sleepStart = boost::posix_time::microsec_clock::local_time(); |
---|
321 | isSleeping = true; |
---|
322 | |
---|
323 | sleepCond.timed_wait( lock, duration ); |
---|
324 | |
---|
325 | isSleeping = false; |
---|
326 | } |
---|
327 | |
---|
328 | } // if( event.remainingDelay > 0 ) |
---|
329 | |
---|
330 | // if the sleep succeeded and was not |
---|
331 | // interrupted by a new incoming item |
---|
332 | // we can now deliver this event |
---|
333 | |
---|
334 | ptime sleepEnd = boost::posix_time::microsec_clock::local_time(); |
---|
335 | boost::posix_time::time_duration duration = sleepEnd - sleepStart; |
---|
336 | uint32_t sleptTime = duration.total_milliseconds(); |
---|
337 | |
---|
338 | if (event.remainingDelay <= sleptTime) |
---|
339 | transferQueue->insert( event, 0 ); |
---|
340 | } |
---|
341 | |
---|
342 | #endif // #ifndef UNDERLAY_OMNET |
---|
343 | |
---|
344 | //*************************************************************** |
---|
345 | |
---|
}} // namespace utility, namespace ariba
---|