Changeset 12763
- Timestamp:
- Mar 13, 2014, 7:30:17 PM (11 years ago)
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
source/ariba/utility/system/SystemEvent.h
r12761 r12763 86 86 } 87 87 88 // FIXME AKTUELL MARIO: copy deadline (etc?) !!! 88 89 inline SystemEvent(const SystemEvent& copy) { 89 90 this->scheduledTime = copy.scheduledTime; … … 133 134 } 134 135 135 inline bool operator<(const SystemEvent& right) const { 136 return remainingDelay < right.remainingDelay; 136 bool operator<(const SystemEvent& right) const 137 { 138 return this->deadline < right.deadline; 137 139 } 140 141 bool operator>(const SystemEvent& right) const 142 { 143 return this->deadline > right.deadline; 144 } 138 145 139 146 }; -
source/ariba/utility/system/SystemQueue.cpp
r12762 r12763 110 110 // wait till actually completes 111 111 // (should be fast, but the current event is allowed to finish) 112 logging_debug("/// ... joining SysQ thread"); 113 sysq_thread->join(); 112 if ( sysq_thread ) 113 { 114 logging_debug("/// ... joining SysQ thread"); 115 sysq_thread->join(); 116 } 114 117 115 118 // delete thread object … … 154 157 155 158 SystemQueue::QueueThread::QueueThread() : 156 now( not_a_date_time ),157 next_deadline( not_a_date_time ),159 // now( not_a_date_time ), 160 // next_deadline( not_a_date_time ), 158 161 processing_event( false ), 159 162 running( false ), … … 198 201 void SystemQueue::QueueThread::run_immediate_event() 199 202 { 200 // measure execution time201 ptime start_time = get_clock();202 203 203 // get next event and process it 204 204 if ( ! immediateEventsQ.empty() ) … … 206 206 scoped_ptr<SystemEvent> currently_processed_event; 207 207 208 /* dequeue event */ 208 209 // SYNCHRONIZED 209 210 { … … 212 213 this->processing_event = true; 213 214 214 // dequeue first event215 // * dequeue first event * 215 216 currently_processed_event.reset( new SystemEvent(immediateEventsQ.front()) ); // copy 216 217 immediateEventsQ.pop_front(); 217 218 } 218 219 220 /* dispatch event */ 219 221 logging_debug("/// SysQ: dispatching event"); 220 222 223 // measure execution time (1/2) 224 ptime start_time = get_clock(); 225 226 // * dispatch event * 221 227 currently_processed_event->getListener()->handleSystemEvent( *currently_processed_event ); 228 229 // measure execution time (2/2) 230 time_duration execution_time = get_clock() - start_time; 231 232 // DEBUG OUTPUT: warning when execution takes too much time 233 // [ TODO how long is "too much time"? ] 234 if ( execution_time.total_milliseconds() > 50 ) 235 { 236 logging_info("WARNING: Last event took " << execution_time.total_milliseconds() << " to complete."); 237 } 238 239 /* [ TODO ] 240 * 241 * - we could also measure how long an event has been waiting in the queue before it's dispatched 242 * (in order to detect overload) 243 * 244 * - especially for timed events, the displacement could be calculated 245 * (and, e.g., put in relation with the actual intended sleep time) 246 */ 222 247 } 223 248 224 // measure execution time225 now = get_clock(); // NOTE: this is reused in check_timed_queue();226 time_duration execution_time = now - start_time;227 228 249 this->processing_event = false; 229 230 // DEBUG OUTPUT: warning when execution takes too much time231 // [ TODO how long is "too much time"? ]232 if ( execution_time.total_milliseconds() > 50 )233 {234 logging_info("WARNING: Last event took " << execution_time.total_milliseconds() << " to complete.");235 }236 250 } 237 251 … … 241 255 scoped_lock lock( queue_mutex ); 242 256 243 244 // NOTE: this->now was just set by this->run_immediate_event() 245 246 // reset next deadline 247 this->next_deadline = boost::date_time::not_a_date_time; 248 257 ptime now = get_clock(); 249 258 bool not_expired_events_reached = false; 250 259 … … 252 261 while ( ! timedEventsQ.empty() && ! not_expired_events_reached ) 253 262 { 254 SystemEvent& ev = timedEventsQ.front();255 256 time_duration remaining_sleep_time = ev.deadline - this->now;263 const SystemEvent& ev = timedEventsQ.top(); 264 265 time_duration remaining_sleep_time = ev.deadline - now; 257 266 258 267 // BRANCH: deadline reached … … 261 270 // move to immediateEventsQ 262 271 immediateEventsQ.push_back(ev); 263 timedEventsQ.pop _front();272 timedEventsQ.pop(); 264 273 } 265 274 // BRANCH: deadline not reached 266 275 else 267 276 { 268 // store time for next sleep269 this->next_deadline = ev.deadline;270 271 277 // okay, that's all for now. 272 278 not_expired_events_reached = true; … … 277 283 void SystemQueue::QueueThread::wait_for_next_deadline() 278 284 { 285 // SYNCHRONIZED 286 boost::mutex::scoped_lock lock(queue_mutex); 287 279 288 if ( immediateEventsQ.empty() ) 280 289 { 281 boost::mutex::scoped_lock lock(queue_mutex);282 283 290 // don't sleep when the SystemQueue is not already canceled 284 291 if ( aborted ) … … 287 294 288 295 // BRANCH: no timed events: sleep "forever" (until new events are scheduled) 289 if ( t his->next_deadline.is_not_a_date_time() )296 if ( timedEventsQ.empty() ) 290 297 { 291 298 logging_debug("/// SysQ is going to sleep."); … … 296 303 else 297 304 { 298 logging_debug("/// SysQ is going to sleep for " 299 << ( next_deadline - get_clock() ).total_milliseconds() 300 << "ms."); 301 302 this->system_queue_idle.timed_wait( lock, next_deadline ); 305 logging_debug( "/// SysQ is going to sleep for " 306 << ( timedEventsQ.top().deadline - get_clock() ).total_milliseconds() 307 << "ms. Deadline: " 308 << timedEventsQ.top().deadline 309 << ", Clock: " 310 << get_clock() ); 311 312 this->system_queue_idle.timed_wait( lock, timedEventsQ.top().deadline ); 303 313 } 304 314 } … … 309 319 ptime SystemQueue::QueueThread::get_clock() 310 320 { 311 return microsec_clock::universal_time(); 321 // return microsec_clock::universal_time(); 322 return microsec_clock::local_time(); 312 323 } 313 324 … … 352 363 immediateEventsQ.push_back(event); 353 364 } 365 // BRANCH: timed event 366 else 367 { 368 event.deadline = event.scheduledTime + boost::posix_time::milliseconds(delay); 369 event.delayTime = delay; // XXX I think this is no longer needed.. 370 371 // XXX debug 372 logging_debug("/// inserting timed event, due at: " << event.deadline << " (in " << delay << " ms)"); 373 374 timedEventsQ.push(event); 375 376 // TODO push sorted.. (use sorted queue..) 377 // timedEventsQ.pu 378 /* 379 * std::priority_queue 380 * 381 * but it orders high-to-low (must reverse order..) 382 * 383 * ... ah cool, da ist direkt eine anleitung fÃŒr reverse order: 384 * http://www.cplusplus.com/reference/queue/priority_queue/priority_queue/ 385 */ 386 } 354 387 } 355 388 … … 414 447 415 448 // wait until the thread has exited 416 417 449 logging_debug("joining system queue thread"); 450 queueThread->join(); 418 451 419 452 // delete pending events -
source/ariba/utility/system/SystemQueue.h
r12762 r12763 40 40 #define SYSTEMQUEUE_H_ 41 41 42 // #include <vector>43 #include <list>44 #include <cassert>45 42 #include "SystemEvent.h" 46 43 #include "SystemEventListener.h" 47 44 #include "ariba/utility/logging/Logging.h" 45 46 #include <cassert> 47 #include <list> 48 #include <vector> 49 #include <queue> // std::priority_queue 50 #include <functional> // std::greater 51 48 52 #include <boost/date_time.hpp> 49 53 #include <boost/cstdint.hpp> … … 169 173 170 174 typedef list<SystemEvent> EventQueue; 175 typedef std::priority_queue<SystemEvent, 176 std::vector<SystemEvent>, 177 std::greater<SystemEvent> > PriorityEventQueue; 178 // typedef std::priority_queue<SystemEvent> PriorityEventQueue; 179 // TODO is vector the best underlay? 180 171 181 172 182 //******************************************************** … … 205 215 private: 206 216 EventQueue immediateEventsQ; 207 EventQueue timedEventsQ; 208 209 ptime now; 210 ptime next_deadline; 211 217 PriorityEventQueue timedEventsQ; 218 212 219 boost::condition_variable system_queue_idle; 213 220 boost::mutex queue_mutex; -
tests/SystemQueue-tests.cc
r12762 r12763 92 92 void Check() 93 93 { 94 // XXX95 94 cout << "### Check ### "<< endl; 96 95 97 96 checkmark = true; 98 97 } 99 98 99 void Cancel() 100 { 101 cout << "### Cancel ### "<< endl; 102 103 SystemQueue::instance().cancel(); 104 checkmark = true; 105 } 106 100 107 void LongRunner() 101 108 { … … 163 170 164 171 /** 172 * Enqueues an event but then cancels the SystemQueue without running 173 */ 174 TEST_F(SystemQueueTest, EmptyAfterCancel) 175 { 176 SystemQueue& sysq = SystemQueue::instance(); 177 178 EXPECT_TRUE( sysq.isEmpty() ); 179 180 // enqueue event 181 sysq.scheduleCall( 182 boost::bind(&SystemQueueTest::Check, this) 183 ); 184 185 EXPECT_FALSE( sysq.isEmpty() ); 186 187 // cancel 188 sysq.cancel(); 189 190 EXPECT_TRUE( sysq.isEmpty() ); 191 } 192 193 194 /** 195 * cancels the SystemQueue from inside a scheduled event 196 */ 197 TEST_F(SystemQueueTest, CancelFromInsideEvent) 198 { 199 SystemQueue& sysq = SystemQueue::instance(); 200 checkmark = false; // just to be sure.. 201 202 // start 203 sysq.run(); 204 205 // scheduleCall 206 sysq.scheduleCall( 207 boost::bind(&SystemQueueTest::Cancel, this) 208 ); 209 210 // wait for the event to happen 211 wait_for_checkmark(MAX_WAIT); 212 213 EXPECT_FALSE( sysq.isRunning() ) << "SystemQueue has not stopped properly."; 214 EXPECT_TRUE( sysq.isEmpty() ) << "SystemQueue has not stopped properly."; 215 } 216 217 218 /** 165 219 * schedule a call and test whether it is actually performed by the SystemQueue 166 220 */
Note:
See TracChangeset
for help on using the changeset viewer.