1 | // [Licence] |
---|
2 | // The Ariba-Underlay Copyright |
---|
3 | // |
---|
4 | // Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH) |
---|
5 | // |
---|
6 | // Institute of Telematics |
---|
7 | // Universität Karlsruhe (TH) |
---|
8 | // Zirkel 2, 76128 Karlsruhe |
---|
9 | // Germany |
---|
10 | // |
---|
11 | // Redistribution and use in source and binary forms, with or without |
---|
12 | // modification, are permitted provided that the following conditions are |
---|
13 | // met: |
---|
14 | // |
---|
15 | // 1. Redistributions of source code must retain the above copyright |
---|
16 | // notice, this list of conditions and the following disclaimer. |
---|
17 | // 2. Redistributions in binary form must reproduce the above copyright |
---|
18 | // notice, this list of conditions and the following disclaimer in the |
---|
19 | // documentation and/or other materials provided with the distribution. |
---|
20 | // |
---|
21 | // THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND |
---|
22 | // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
---|
23 | // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
---|
24 | // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR |
---|
25 | // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, |
---|
26 | // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
---|
27 | // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR |
---|
28 | // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF |
---|
29 | // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING |
---|
30 | // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
---|
31 | // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
---|
32 | // |
---|
33 | // The views and conclusions contained in the software and documentation |
---|
34 | // are those of the authors and should not be interpreted as representing |
---|
35 | // official policies, either expressed or implied, of the Institute of |
---|
36 | // Telematics. |
---|
37 | |
---|
38 | #include "SystemQueue.h" |
---|
39 | |
---|
40 | namespace ariba { |
---|
41 | namespace utility { |
---|
42 | |
---|
43 | use_logging_cpp(SystemQueue); |
---|
44 | |
---|
45 | SystemQueue::SystemQueue() |
---|
46 | #ifndef UNDERLAY_OMNET |
---|
47 | : delayScheduler( &directScheduler ), systemQueueRunning( false ) |
---|
48 | #endif |
---|
49 | { |
---|
50 | } |
---|
51 | |
---|
52 | SystemQueue::~SystemQueue() { |
---|
53 | } |
---|
54 | |
---|
55 | void SystemQueue::scheduleEvent( const SystemEvent& event, uint32_t delay ) { |
---|
56 | #ifndef UNDERLAY_OMNET |
---|
57 | if( delay == 0 ) directScheduler.insert( event, delay ); |
---|
58 | else delayScheduler.insert ( event, delay ); |
---|
59 | #else |
---|
60 | Enter_Method_Silent(); |
---|
61 | cMessage* msg = new cMessage(); |
---|
62 | msg->setContextPointer( new SystemEvent(event) ); |
---|
63 | |
---|
64 | if( delay == 0 ) |
---|
65 | cSimpleModule::scheduleAt( cSimpleModule::simTime(), msg ); |
---|
66 | else |
---|
67 | cSimpleModule::scheduleAt( cSimpleModule::simTime()+((double)delay/1000.0), msg ); |
---|
68 | #endif |
---|
69 | } |
---|
70 | |
---|
#ifdef UNDERLAY_OMNET
// OMNeT++ message handler.
// Reads the SystemEvent stored as the message's context pointer, hands it to
// the event's listener and then deletes both the event and the message.
void SystemQueue::handleMessage(cMessage* msg){
    SystemEvent* scheduled = (SystemEvent*)msg->contextPointer();

    scheduled->listener->handleSystemEvent( *scheduled );

    delete scheduled;
    delete msg;
}
#endif
---|
80 | |
---|
81 | void SystemQueue::run() { |
---|
82 | #ifndef UNDERLAY_OMNET |
---|
83 | systemQueueRunning = true; |
---|
84 | directScheduler.run(); |
---|
85 | delayScheduler.run(); |
---|
86 | #endif |
---|
87 | } |
---|
88 | |
---|
89 | void SystemQueue::cancel() { |
---|
90 | #ifndef UNDERLAY_OMNET |
---|
91 | systemQueueRunning = false; |
---|
92 | directScheduler.cancel(); |
---|
93 | delayScheduler.cancel(); |
---|
94 | #endif |
---|
95 | } |
---|
96 | |
---|
97 | bool SystemQueue::isEmpty() { |
---|
98 | #ifndef UNDERLAY_OMNET |
---|
99 | return directScheduler.isEmpty() || delayScheduler.isEmpty(); |
---|
100 | #else |
---|
101 | return false; |
---|
102 | #endif |
---|
103 | } |
---|
104 | |
---|
105 | bool SystemQueue::isRunning() { |
---|
106 | #ifndef UNDERLAY_OMNET |
---|
107 | return systemQueueRunning; |
---|
108 | #else |
---|
109 | return true; |
---|
110 | #endif |
---|
111 | } |
---|
112 | |
---|
113 | //*************************************************************** |
---|
114 | #ifndef UNDERLAY_OMNET |
---|
115 | |
---|
116 | SystemQueue::QueueThread::QueueThread(QueueThread* _transferQueue) |
---|
117 | : running( false ), transferQueue( _transferQueue ) { |
---|
118 | } |
---|
119 | |
---|
120 | SystemQueue::QueueThread::~QueueThread(){ |
---|
121 | } |
---|
122 | |
---|
123 | void SystemQueue::QueueThread::run(){ |
---|
124 | running = true; |
---|
125 | queueThread = new boost::thread( |
---|
126 | boost::bind(&QueueThread::threadFunc, this) ); |
---|
127 | } |
---|
128 | |
---|
129 | void SystemQueue::QueueThread::cancel(){ |
---|
130 | |
---|
131 | // cause the thread to exit |
---|
132 | running = false; |
---|
133 | itemsAvailable.notify_all(); |
---|
134 | |
---|
135 | // wait that the thread has exited |
---|
136 | // and delete it, so that a subsuquent run() can be called |
---|
137 | |
---|
138 | queueThread->join(); |
---|
139 | delete queueThread; |
---|
140 | queueThread = NULL; |
---|
141 | |
---|
142 | while( eventsQueue.size() > 0 ){ |
---|
143 | eventsQueue.erase( eventsQueue.begin() ); |
---|
144 | } |
---|
145 | } |
---|
146 | |
---|
147 | bool SystemQueue::QueueThread::isEmpty(){ |
---|
148 | boost::mutex::scoped_lock lock( queueMutex ); |
---|
149 | return eventsQueue.empty(); |
---|
150 | } |
---|
151 | |
---|
152 | void SystemQueue::QueueThread::insert( const SystemEvent& event, uint32_t delay ){ |
---|
153 | { |
---|
154 | boost::mutex::scoped_lock lock( queueMutex ); |
---|
155 | |
---|
156 | eventsQueue.push_back( event ); |
---|
157 | eventsQueue.back().scheduledTime = boost::posix_time::microsec_clock::local_time(); |
---|
158 | eventsQueue.back().delayTime = delay; |
---|
159 | eventsQueue.back().remainingDelay = delay; |
---|
160 | |
---|
161 | onItemInserted( event ); |
---|
162 | itemsAvailable.notify_all(); |
---|
163 | } |
---|
164 | } |
---|
165 | |
---|
166 | void SystemQueue::QueueThread::threadFunc( QueueThread* obj ) { |
---|
167 | |
---|
168 | boost::mutex::scoped_lock lock( obj->queueMutex ); |
---|
169 | |
---|
170 | while( obj->running ) { |
---|
171 | |
---|
172 | // wait until an item is in the queue or we are notified |
---|
173 | // to quit the thread. in case the thread is about to |
---|
174 | // quit, the queueThreadRunning variable will indicate |
---|
175 | // this and cause the thread to exit |
---|
176 | |
---|
177 | while ( obj->eventsQueue.empty() && obj->running ){ |
---|
178 | |
---|
179 | const boost::system_time duration = |
---|
180 | boost::get_system_time() + |
---|
181 | boost::posix_time::milliseconds(40); |
---|
182 | |
---|
183 | obj->itemsAvailable.wait( lock ); |
---|
184 | // obj->itemsAvailable.timed_wait( lock, duration ); |
---|
185 | } |
---|
186 | |
---|
187 | // work all the items that are currently in the queue |
---|
188 | |
---|
189 | while( !obj->eventsQueue.empty() ) { |
---|
190 | |
---|
191 | // fetch the first item in the queue and deliver it to the |
---|
192 | // queue handler |
---|
193 | SystemEvent ev = obj->eventsQueue.front(); |
---|
194 | obj->eventsQueue.erase( obj->eventsQueue.begin() ); |
---|
195 | |
---|
196 | { |
---|
197 | // unlock the queue to that an event handler |
---|
198 | // can insert new items into the queue |
---|
199 | // obj->queueMutex.unlock(); |
---|
200 | obj->onNextQueueItem( ev ); |
---|
201 | // obj->queueMutex.lock(); |
---|
202 | } |
---|
203 | |
---|
204 | } // !obj->eventsQueue.empty() ) |
---|
205 | } // while (obj->running) |
---|
206 | } |
---|
207 | |
---|
208 | //*************************************************************** |
---|
209 | |
---|
210 | SystemQueue::QueueThreadDirect::QueueThreadDirect(){ |
---|
211 | } |
---|
212 | |
---|
213 | SystemQueue::QueueThreadDirect::~QueueThreadDirect(){ |
---|
214 | } |
---|
215 | |
---|
216 | void SystemQueue::QueueThreadDirect::onItemInserted( const SystemEvent& event ){ |
---|
217 | // do nothing here |
---|
218 | } |
---|
219 | |
---|
220 | void SystemQueue::QueueThreadDirect::onNextQueueItem( const SystemEvent& event ){ |
---|
221 | // directly deliver the item to the |
---|
222 | event.getListener()->handleSystemEvent( event ); |
---|
223 | } |
---|
224 | |
---|
225 | //*************************************************************** |
---|
226 | |
---|
227 | SystemQueue::QueueThreadDelay::QueueThreadDelay(QueueThread* _transferQueue) |
---|
228 | : QueueThread( _transferQueue ), isSleeping( false ) { |
---|
229 | |
---|
230 | assert( _transferQueue != NULL ); |
---|
231 | } |
---|
232 | |
---|
233 | SystemQueue::QueueThreadDelay::~QueueThreadDelay(){ |
---|
234 | } |
---|
235 | |
---|
236 | void SystemQueue::QueueThreadDelay::onItemInserted( const SystemEvent& event ){ |
---|
237 | |
---|
238 | if( !isSleeping) return; |
---|
239 | |
---|
240 | // break an existing sleep and |
---|
241 | // remember the time that was actually slept for |
---|
242 | // and change it for every event in the queue |
---|
243 | |
---|
244 | assert( !eventsQueue.empty()); |
---|
245 | sleepCond.notify_all(); |
---|
246 | |
---|
247 | ptime sleepEnd = boost::posix_time::microsec_clock::local_time(); |
---|
248 | boost::posix_time::time_duration duration = sleepEnd - sleepStart; |
---|
249 | uint32_t sleptTime = duration.total_milliseconds(); |
---|
250 | |
---|
251 | EventQueue::iterator i = eventsQueue.begin(); |
---|
252 | EventQueue::iterator iend = eventsQueue.end(); |
---|
253 | |
---|
254 | for( ; i != iend; i++ ) { |
---|
255 | |
---|
256 | if( sleptTime >= i->remainingDelay) |
---|
257 | i->remainingDelay = 0; |
---|
258 | else |
---|
259 | i->remainingDelay -= sleptTime; |
---|
260 | |
---|
261 | } // for( ; i != iend; i++ ) |
---|
262 | |
---|
263 | // now we have to reorder the events |
---|
264 | // in the queue with respect to their remaining delay |
---|
265 | // the SystemQueue::operator< takes care of the |
---|
266 | // ordering with respect to the remaining delay |
---|
267 | |
---|
268 | std::sort( eventsQueue.begin(), eventsQueue.end() ); |
---|
269 | |
---|
270 | } |
---|
271 | |
---|
272 | void SystemQueue::QueueThreadDelay::onNextQueueItem( const SystemEvent& event ){ |
---|
273 | |
---|
274 | // sleeps will be cancelled in the |
---|
275 | // onItemInserted function when a new |
---|
276 | // event arrives during sleeping |
---|
277 | |
---|
278 | assert( !isSleeping ); |
---|
279 | |
---|
280 | // the given item is the one with the least |
---|
281 | // amount of sleep time left. because all |
---|
282 | // items are reordered in onItemInserted |
---|
283 | |
---|
284 | if( event.remainingDelay > 0 ) { |
---|
285 | |
---|
286 | const boost::system_time duration = |
---|
287 | boost::get_system_time() + |
---|
288 | boost::posix_time::milliseconds(event.remainingDelay); |
---|
289 | |
---|
290 | { |
---|
291 | boost::unique_lock<boost::mutex> lock( sleepMutex ); |
---|
292 | |
---|
293 | sleepStart = boost::posix_time::microsec_clock::local_time(); |
---|
294 | isSleeping = true; |
---|
295 | |
---|
296 | sleepCond.timed_wait( lock, duration ); |
---|
297 | |
---|
298 | isSleeping = false; |
---|
299 | } |
---|
300 | |
---|
301 | } // if( event.remainingDelay > 0 ) |
---|
302 | |
---|
303 | // if the sleep succeeded and was not |
---|
304 | // interrupted by a new incoming item |
---|
305 | // we can now deliver this event |
---|
306 | |
---|
307 | ptime sleepEnd = boost::posix_time::microsec_clock::local_time(); |
---|
308 | boost::posix_time::time_duration duration = sleepEnd - sleepStart; |
---|
309 | uint32_t sleptTime = duration.total_milliseconds(); |
---|
310 | |
---|
311 | if (event.remainingDelay <= sleptTime) |
---|
312 | transferQueue->insert( event, 0 ); |
---|
313 | } |
---|
314 | |
---|
315 | #endif // #ifndef UNDERLAY_OMNET |
---|
316 | |
---|
317 | //*************************************************************** |
---|
318 | |
---|
319 | }} // namespace ariba, utility |
---|