Changeset 12764 for source/ariba/utility
- Timestamp:
- Mar 14, 2014, 7:58:45 PM (11 years ago)
- Location:
- source/ariba/utility/system
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
source/ariba/utility/system/SystemEvent.h
r12763 r12764 71 71 inline SystemEvent(SystemEventListener* mlistener, SystemEventType mtype = 72 72 SystemEventType::DEFAULT, void* mdata = NULL) : 73 listener(mlistener), type(mtype), data(mdata), scheduledTime( 74 boost::posix_time::not_a_date_time), delayTime(0), 75 remainingDelay(0) 76 73 listener(mlistener), 74 type(mtype), 75 data(mdata), 76 scheduledTime(boost::posix_time::not_a_date_time), 77 deadline(boost::posix_time::not_a_date_time), 78 delayTime(0), 79 remainingDelay(0) 77 80 { 78 81 } … … 81 84 inline SystemEvent(SystemEventListener* mlistener, SystemEventType mtype = 82 85 SystemEventType::DEFAULT, T* mdata = NULL) : 83 listener(mlistener), type(mtype), data((void*) mdata), scheduledTime( 84 boost::posix_time::not_a_date_time), delayTime(0), 85 remainingDelay(0) { 86 listener(mlistener), 87 type(mtype), 88 data((void*) mdata), 89 scheduledTime(boost::posix_time::not_a_date_time), 90 deadline(boost::posix_time::not_a_date_time), 91 delayTime(0), 92 remainingDelay(0) { 86 93 } 87 94 88 // FIXME AKTUELL MARIO: copy deadline (etc?) !!!89 95 inline SystemEvent(const SystemEvent& copy) { 90 96 this->scheduledTime = copy.scheduledTime; 97 this->deadline = copy.deadline; 91 98 this->delayTime = copy.delayTime; 92 99 this->remainingDelay = copy.remainingDelay; … … 98 105 inline void operator=(const SystemEvent& right) { 99 106 this->scheduledTime = right.scheduledTime; 107 this->deadline = right.deadline; 100 108 this->delayTime = right.delayTime; 101 109 this->remainingDelay = right.remainingDelay; -
source/ariba/utility/system/SystemQueue.cpp
r12763 r12764 40 40 #include <stdexcept> 41 41 42 // TODO Mario: 43 // check if there is any debug out left to remove 44 42 45 namespace ariba { 43 46 namespace utility { 44 47 45 46 47 48 49 50 48 typedef boost::mutex::scoped_lock scoped_lock; 49 50 using boost::posix_time::microsec_clock; 51 using boost::posix_time::time_duration; 52 using boost::date_time::not_a_date_time; 53 using boost::scoped_ptr; 51 54 52 55 … … 73 76 void SystemQueue::scheduleEvent( const SystemEvent& event, uint32_t delay ) 74 77 { 78 assert ( SysQ->running ); // should we really enforce this? 79 75 80 // copy 76 81 SystemEvent ev(event); … … 96 101 { 97 102 assert ( ! SysQ->running ); 103 assert ( ! SysQ->unclean ); 98 104 99 105 SysQ->running = true; … … 105 111 void SystemQueue::cancel() 106 112 { 113 // CHECK: this function must not be called from within a SystemQueue-Event 114 if ( sysq_thread && boost::this_thread::get_id() == sysq_thread->get_id() ) 115 { 116 logging_warn("SystemQueue::cancel() was called from within a SystemQueue-Event. This is not allowed!"); 117 throw std::logic_error("SystemQueue::cancel() was called from within a SystemQueue-Event. This is not allowed!"); 118 } 119 120 107 121 // signal SysQ to quit (and abort queued events) 108 122 SysQ->cancel(); … … 129 143 void SystemQueue::dropAll( const SystemEventListener* mlistener) 130 144 { 131 // XXX145 // TODO 132 146 // directScheduler.dropAll(mlistener); 133 147 // delayScheduler.dropAll(mlistener); … … 144 158 } 145 159 146 // XXX 147 // void SystemQueue::enterMethod(){ 148 // // TODO: omnet case and delay scheduler 149 // directScheduler.enter(); 150 // } 151 // 152 // void SystemQueue::leaveMethod(){ 153 // // TODO: omnet case and delay scheduler 154 // directScheduler.leave(); 155 // } 156 157 160 161 //******************************************************** 162 163 164 /// constructor 158 165 SystemQueue::QueueThread::QueueThread() : 159 // now( not_a_date_time ),160 // next_deadline( not_a_date_time ),161 166 processing_event( false ), 162 167 running( false ), 163 aborted( false ) 168 aborted( false ), 169 unclean( false ) 164 170 { 165 171 } … … 190 196 logging_debug( "/// SysQ thread is quitting." ); 191 197 198 unclean = true; 192 199 running = false; 193 200 } … … 219 226 220 227 /* dispatch event */ 221 logging_debug("/// SysQ: dispatching event");228 // logging_debug("/// SysQ: dispatching event"); 222 229 223 230 // measure execution time (1/2) … … 319 326 ptime SystemQueue::QueueThread::get_clock() 320 327 { 321 // return microsec_clock::universal_time(); 322 return microsec_clock::local_time(); 328 return microsec_clock::universal_time(); 323 329 } 324 330 … … 367 373 { 368 374 event.deadline = event.scheduledTime + boost::posix_time::milliseconds(delay); 369 event.delayTime = delay; // XXX I think this is no longer needed..375 event.delayTime = delay; // ( I think this is no longer needed.. ) 370 376 371 // XXX debug 372 logging_debug("/// inserting timed event, due at: " << event.deadline << " (in " << delay << " ms)");377 // // debug output 378 // logging_debug("/// inserting timed event, due at: " << event.deadline << " (in " << delay << " ms)"); 373 379 374 380 timedEventsQ.push(event); 375 376 // TODO push sorted.. (use sorted queue..)377 // timedEventsQ.pu378 /*379 * std::priority_queue380 *381 * but it orders high-to-low (must reverse order..)382 *383 * ... ah cool, da ist direkt eine anleitung fÃŒr reverse order:384 * http://www.cplusplus.com/reference/queue/priority_queue/priority_queue/385 */386 381 } 387 382 } … … 415 410 416 411 417 //*************************************************************** 418 //// XXX old SystemQueue subclasses 419 // (needed as reference during development of the replacement) 420 421 // #ifndef UNDERLAY_OMNET 422 #define NOOLDSYSQ 423 #ifndef NOOLDSYSQ 424 425 void SystemQueue::QueueThread::run(){ 426 running = true; 427 428 queueThread = new boost::thread( 429 boost::bind(&QueueThread::threadFunc, this) ); 430 } 431 432 void SystemQueue::QueueThread::cancel(){ 433 434 logging_debug("cancelling system queue"); 435 436 // cause the thread to exit 437 { 438 // get the lock, when we got the lock the 439 // queue thread must be in itemsAvailable.wait() 440 boost::mutex::scoped_lock lock(queueMutex); 441 442 // set the running indicator and signal to run on 443 // this will run the thread and quit it 444 running = false; 445 itemsAvailable.notify_all(); 446 } 447 448 // wait until the thread has exited 449 logging_debug("joining system queue thread"); 450 queueThread->join(); 451 452 // delete pending events 453 logging_debug("deleting pending system queue events"); 454 while( eventsQueue.size() > 0 ){ 455 eventsQueue.erase( eventsQueue.begin() ); 456 } 457 458 // delete the thread, so that a subsuquent run() can be called 459 delete queueThread; 460 queueThread = NULL; 461 } 462 463 bool SystemQueue::QueueThread::isEmpty(){ 464 boost::mutex::scoped_lock lock( queueMutex ); 465 return eventsQueue.empty(); 466 } 467 468 void SystemQueue::QueueThread::insert( const SystemEvent& event, uint32_t delay ){ 469 470 // if this is called from a module that is currently handling 471 // a thread (called from SystemQueue::onNextQueueItem), the 472 // thread is the same anyway and the mutex will be already 473 // aquired, otherwise we aquire it now 474 475 boost::mutex::scoped_lock lock( queueMutex ); 476 477 if ( delay > 0 ) 478 { 479 logging_debug("SystemQueue(" << this << ") : Schedule event in: " << delay << " ms; Events in queue (before insert): " << eventsQueue.size() ); 480 } 481 482 eventsQueue.push_back( event ); 483 eventsQueue.back().scheduledTime = boost::posix_time::microsec_clock::local_time(); 484 eventsQueue.back().delayTime = delay; 485 eventsQueue.back().remainingDelay = delay; 486 487 if ( delay > 0 ) 488 { 489 logging_debug("SystemQueue(" << this << ") : Events in queue (after insert): " << eventsQueue.size() ); 490 } 491 492 onItemInserted( event ); 493 itemsAvailable.notify_all(); 494 } 495 496 void SystemQueue::QueueThread::dropAll( const SystemEventListener* mlistener) { 497 boost::mutex::scoped_lock lock( queueMutex ); 498 499 bool deleted; 500 do{ 501 deleted = false; 502 EventQueue::iterator i = eventsQueue.begin(); 503 EventQueue::iterator iend = eventsQueue.end(); 504 505 for( ; i != iend; i++){ 506 if((*i).getListener() == mlistener){ 507 eventsQueue.erase(i); 508 deleted = true; 509 break; 510 } 511 } 512 }while(deleted); 513 } 514 515 void SystemQueue::QueueThread::threadFunc( QueueThread* obj ) { 516 517 boost::mutex::scoped_lock lock( obj->queueMutex ); 518 519 while( obj->running ) { 520 521 // wait until an item is in the queue or we are notified 522 // to quit the thread. in case the thread is about to 523 // quit, the queueThreadRunning variable will indicate 524 // this and cause the thread to exit 525 526 while ( obj->running && obj->eventsQueue.empty() ){ 527 528 // const boost::system_time duration = 529 // boost::get_system_time() + 530 // boost::posix_time::milliseconds(100); 531 // obj->itemsAvailable.timed_wait( lock, duration ); 532 533 obj->itemsAvailable.wait( lock ); 534 } 535 536 // 537 // work all the items that are currently in the queue 538 // 539 540 while( obj->running && (!obj->eventsQueue.empty()) ) { 541 542 // fetch the first item in the queue 543 // and deliver it to the queue handler 544 SystemEvent ev = obj->eventsQueue.front(); 545 546 // XXX debugging the delay-scheduler.. 547 if ( ev.delayTime > 0 ) 548 logging_debug("SystemQueue(" << obj << ") : Events in queue (before execution): " << obj->eventsQueue.size()); 549 550 obj->eventsQueue.erase( obj->eventsQueue.begin() ); 551 552 // call the queue and this will 553 // call the actual event handler 554 obj->queueMutex.unlock(); 555 obj->onNextQueueItem( ev ); 556 obj->queueMutex.lock(); 557 558 // XXX debugging the delay-scheduler.. 559 if ( ev.delayTime > 0 ) 560 logging_debug("SystemQueue(" << obj << ") : Remaining events in queue (after execution): " << obj->eventsQueue.size()); 561 562 } // !obj->eventsQueue.empty() ) 563 } // while (obj->running) 564 565 logging_debug("system queue exited"); 566 } 567 568 void SystemQueue::QueueThread::enter(){ 569 queueMutex.lock(); 570 } 571 572 void SystemQueue::QueueThread::leave(){ 573 queueMutex.unlock(); 574 } 575 576 577 //*************************************************************** 578 579 SystemQueue::QueueThreadDirect::QueueThreadDirect(){ 580 } 581 582 SystemQueue::QueueThreadDirect::~QueueThreadDirect(){ 583 } 584 585 void SystemQueue::QueueThreadDirect::onItemInserted( const SystemEvent& event ){ 586 // do nothing here 587 } 588 589 void SystemQueue::QueueThreadDirect::onNextQueueItem( const SystemEvent& event ){ 590 // directly deliver the item to the 591 event.getListener()->handleSystemEvent( event ); 592 } 593 594 //*************************************************************** 595 596 SystemQueue::QueueThreadDelay::QueueThreadDelay(QueueThread* _transferQueue) 597 : QueueThread( _transferQueue ), isSleeping( false ) { 598 599 assert( _transferQueue != NULL ); 600 } 601 602 SystemQueue::QueueThreadDelay::~QueueThreadDelay(){ 603 } 604 605 void SystemQueue::QueueThreadDelay::onItemInserted( const SystemEvent& event ){ 606 607 if( !isSleeping) 608 { 609 logging_warn("SystemQueue(" << this << ") : No, I'm not asleep!! New item inserted."); 610 return; // TODO Mario: shouldn't we sort anyway..? 611 } 612 613 // break an existing sleep and 614 // remember the time that was actually slept for 615 // and change it for every event in the queue 616 617 assert( !eventsQueue.empty()); 618 sleepCond.notify_all(); 619 620 ptime sleepEnd = boost::posix_time::microsec_clock::local_time(); 621 boost::posix_time::time_duration duration = sleepEnd - sleepStart; 622 uint32_t sleptTime = duration.total_milliseconds(); 623 624 EventQueue::iterator i = eventsQueue.begin(); 625 EventQueue::iterator iend = eventsQueue.end(); 626 627 logging_debug("SystemQueue(" << this << ") : Adjusting remaining delays:"); 628 629 // TODO Mario: What about the just inserted event..? 630 for( ; i != iend; i++ ) { 631 632 if( sleptTime >= i->remainingDelay) 633 i->remainingDelay = 0; 634 else 635 { 636 i->remainingDelay -= sleptTime; 637 638 // XXX Mario: Testcode, just to find a bug... 639 boost::posix_time::time_duration time_passed = sleepEnd - i->getScheduledTime(); 640 logging_debug("SystemQueue(" << this << ") : Total: " << 641 i->delayTime << ", remainingDelay: " << i->remainingDelay << 642 ", time already passed: " << time_passed.total_milliseconds() ); 643 } 644 645 } // for( ; i != iend; i++ ) 646 647 // now we have to reorder the events 648 // in the queue with respect to their remaining delay 649 // the SystemQueue::operator< takes care of the 650 // ordering with respect to the remaining delay 651 652 std::sort( eventsQueue.begin(), eventsQueue.end() ); 653 654 } 655 656 void SystemQueue::QueueThreadDelay::onNextQueueItem( const SystemEvent& event ){ 657 658 // sleeps will be cancelled in the 659 // onItemInserted function when a new 660 // event arrives during sleeping 661 662 assert( !isSleeping ); 663 664 // the given item is the one with the least 665 // amount of sleep time left. because all 666 // items are reordered in onItemInserted 667 668 if( event.remainingDelay > 0 ) { 669 670 const boost::system_time duration = 671 boost::get_system_time() + 672 boost::posix_time::milliseconds(event.remainingDelay); 673 674 logging_debug("SystemQueue(" << this << ") : Sleeping for: " << event.remainingDelay << " ms"); 675 676 { 677 boost::unique_lock<boost::mutex> lock( sleepMutex ); 678 679 sleepStart = boost::posix_time::microsec_clock::local_time(); 680 isSleeping = true; 681 682 sleepCond.timed_wait( lock, duration ); 683 684 isSleeping = false; 685 } 686 687 } // if( event.remainingDelay > 0 ) 688 689 // if the sleep succeeded and was not 690 // interrupted by a new incoming item 691 // we can now deliver this event 692 693 ptime sleepEnd = boost::posix_time::microsec_clock::local_time(); 694 boost::posix_time::time_duration duration = sleepEnd - sleepStart; 695 uint32_t sleptTime = duration.total_milliseconds(); 696 697 logging_debug("SystemQueue(" << this << ") : Slept for: " << sleptTime << " ms; until: " << sleepEnd); 698 699 // TODO MARIO: find the bug that loses events... 700 if (event.remainingDelay <= sleptTime) 701 { 702 logging_debug("SystemQueue(" << this << ") : Transferring scheduled event into the direct queue. Scheduled time: " << event.getScheduledTime() ); 703 transferQueue->insert( event, 0 ); 704 } 705 else 706 { 707 logging_warn("SystemQueue(" << this << ") : Scheduled event lost!! :-( (Sleep should have been " << event.remainingDelay - sleptTime << " ms longer..)"); 708 logging_debug("SystemQueue(" << this << ") : Total delay: " << event.delayTime << "; remaining delay: " << event.remainingDelay); 709 710 // throw std::logic_error("Scheduled event lost!! :-("); 711 } 712 } 713 714 #endif // #ifndef UNDERLAY_OMNET 715 716 //*************************************************************** 412 // XXX code from old system queue 413 // void SystemQueue::QueueThread::enter(){ 414 // queueMutex.lock(); 415 // } 416 // 417 // void SystemQueue::QueueThread::leave(){ 418 // queueMutex.unlock(); 419 // } 717 420 718 421 }} // spovnet, common -
source/ariba/utility/system/SystemQueue.h
r12763 r12764 174 174 typedef list<SystemEvent> EventQueue; 175 175 typedef std::priority_queue<SystemEvent, 176 std::vector<SystemEvent>, 176 std::vector<SystemEvent>, // [ TODO is vector the best underlay? ] 177 177 std::greater<SystemEvent> > PriorityEventQueue; 178 // typedef std::priority_queue<SystemEvent> PriorityEventQueue;179 // TODO is vector the best underlay?180 181 178 182 179 //******************************************************** … … 224 221 volatile bool running; 225 222 volatile bool aborted; 223 volatile bool unclean; 226 224 }; // class QueueThread 227 225
Note:
See TracChangeset
for help on using the changeset viewer.