Changeset 12762


Ignore:
Timestamp:
Mar 12, 2014, 6:58:54 PM (11 years ago)
Author:
hock@…
Message:

New System Queue

Threading test (aka "isEmpty() test") is now passed, too.

Files:
3 edited

Legend:

Unmodified
Added
Removed
  • source/ariba/utility/system/SystemQueue.cpp

    r12761 r12762  
    4343namespace utility {
    4444   
     45    typedef boost::mutex::scoped_lock scoped_lock;
    4546   
    4647    using boost::posix_time::microsec_clock;
    4748    using boost::posix_time::time_duration;
    48 //     using boost::mutex::scoped_lock;
    49     typedef boost::mutex::scoped_lock scoped_lock;
    5049    using boost::date_time::not_a_date_time;
    5150    using boost::scoped_ptr;
     
    5453use_logging_cpp(SystemQueue);
    5554
    56 SystemQueue::SystemQueue()
    57 {
    58         logging_info("Creating SystemQueue at: " << this);
     55SystemQueue::SystemQueue() :
     56    SysQ( new QueueThread() )
     57{
     58        logging_debug("Creating SystemQueue at: " << this);
    5959}
    6060
     
    7676    SystemEvent ev(event);
    7777   
    78     SysQ.insert(ev, delay);
     78    SysQ->insert(ev, delay);
    7979}
    8080
     
    9595void SystemQueue::run()
    9696{
    97     assert ( ! SysQ.running );
    98    
    99     SysQ.running = true;
     97    assert ( ! SysQ->running );
     98   
     99    SysQ->running = true;
    100100   
    101101    // start thread
    102     sysq_thread.reset( new boost::thread(boost::ref(SysQ)) );
     102    sysq_thread.reset( new boost::thread(boost::ref(*SysQ)) );
    103103}
    104104
     
    106106{
    107107    // signal SysQ to quit (and abort queued events)
    108     SysQ.cancel();
     108    SysQ->cancel();
    109109   
    110110    // wait till actually completes
    111111    //   (should be fast, but the current event is allowed to finish)
    112     logging_info("/// joining system queue thread");
     112    logging_debug("/// ... joining SysQ thread");
    113113    sysq_thread->join();
    114114   
     
    116116    sysq_thread.reset();
    117117   
    118     assert ( ! SysQ.isRunning() );
    119    
    120    
    121     // TODO FIXME gtest reuses the singleton... :-/
    122     //   maybe delete the SysQ object here..? (scoped_ptr...)
    123     SysQ.aborted = false;  // XXX hotfix..
     118    assert ( ! SysQ->isRunning() );
     119
     120   
     121    // clean up and respawn
     122    logging_debug("/// respawning SysQ");
     123    SysQ.reset( new QueueThread() );
    124124}
    125125
     
    133133bool SystemQueue::isEmpty()
    134134{
    135 //  XXX
    136 //      return directScheduler.isEmpty() || delayScheduler.isEmpty();
    137 
    138     return true;
     135    return SysQ->isEmpty();
    139136}
    140137
    141138bool SystemQueue::isRunning()
    142139{
    143     return SysQ.isRunning();
     140    return SysQ->isRunning();
    144141}
    145142
     
    159156    now( not_a_date_time ),
    160157    next_deadline( not_a_date_time ),
     158    processing_event( false ),
    161159    running( false ),
    162160    aborted( false )
     
    170168void SystemQueue::QueueThread::operator()()
    171169{
    172     // XXX debug
    173     logging_info( "/// SysQ thread is alive." );
     170    logging_debug( "/// SysQ thread is alive." );
    174171   
    175172    assert( running );  // this is set before the thread starts
     
    188185    }
    189186   
    190     // XXX debug
    191     logging_info( "/// SysQ thread is quitting." );
     187    logging_debug( "/// SysQ thread is quitting." );
    192188   
    193189    running = false;
     
    213209        {
    214210            scoped_lock lock( queue_mutex );
     211           
     212            this->processing_event = true;
    215213           
    216214            // dequeue first event
     
    219217        }
    220218       
    221         // XXX debug
    222         logging_info("/// SysQ: dispatching event");
     219        logging_debug("/// SysQ: dispatching event");
    223220       
    224221        currently_processed_event->getListener()->handleSystemEvent( *currently_processed_event );
     
    228225    now = get_clock();   // NOTE: this is reused in check_timed_queue();
    229226    time_duration execution_time = now - start_time;
     227   
     228    this->processing_event = false;
    230229   
    231230    // DEBUG OUTPUT: warning when execution takes too much time
     
    233232    if ( execution_time.total_milliseconds() > 50 )
    234233    {
    235         logging_debug("WARNING: Last event took " << execution_time.total_milliseconds() << " to complete.");
     234        logging_info("WARNING: Last event took " << execution_time.total_milliseconds() << " to complete.");
    236235    }
    237236}
     
    290289        if ( this->next_deadline.is_not_a_date_time() )
    291290        {
    292             // XXX debug output
    293             logging_info("/// SysQ is going to sleep (forever..).");
     291            logging_debug("/// SysQ is going to sleep.");
    294292           
    295293            this->system_queue_idle.wait( lock );
     
    298296        else
    299297        {
    300             // XXX debug output
    301             time_duration sleep_time = next_deadline - get_clock();
    302             logging_info("/// SysQ is going to sleep for " << sleep_time.total_milliseconds() << "ms.");
     298            logging_debug("/// SysQ is going to sleep for "
     299                        << ( next_deadline - get_clock() ).total_milliseconds()
     300                        << "ms.");
    303301
    304302            this->system_queue_idle.timed_wait( lock, next_deadline );
     
    326324void SystemQueue::QueueThread::cancel()
    327325{
    328     logging_debug("/// cancelling system queue");
     326    logging_debug("/// Cancelling system queue... ");
    329327
    330328    // SYNCHRONIZED
     
    334332    }
    335333   
    336     // XXX debug output
    337     logging_info("/// SysQ: " << immediateEventsQ.size() << " immediate event(s) + "
     334    logging_debug("/// SysQ: " << immediateEventsQ.size() << " immediate event(s) + "
    338335                              << timedEventsQ.size() << " timed event(s) left.");
    339336   
     
    360357    system_queue_idle.notify_one();  // NOTE: there is only one thread
    361358                                     // (so it doesn't matter whether to call notify_one, or notify_all)
     359}
     360
     361
     362bool SystemQueue::QueueThread::isEmpty()
     363{
     364    // SYNCHRONIZED
     365    scoped_lock lock( queue_mutex );
     366   
     367    return immediateEventsQ.empty() && timedEventsQ.empty() && ! processing_event;
    362368}
    363369
  • source/ariba/utility/system/SystemQueue.h

    r12761 r12762  
    206206        EventQueue immediateEventsQ;
    207207        EventQueue timedEventsQ;
    208 //      boost::mutex queueMutex;
    209        
    210 //         SystemEvent& currently_processed_event;
    211208       
    212209        ptime now;
     
    215212        boost::condition_variable system_queue_idle;
    216213        boost::mutex queue_mutex;
    217        
    218 //              boost::condition_variable itemsAvailable;  DEPRECATED
     214
     215        bool processing_event;
     216       
    219217                volatile bool running;
    220218        volatile bool aborted;
     
    244242    /// member variables of class SystemQueue
    245243private:
    246     QueueThread SysQ;
     244    boost::scoped_ptr<QueueThread> SysQ;
    247245    boost::scoped_ptr<boost::thread> sysq_thread;
    248246   
  • tests/SystemQueue-tests.cc

    r12761 r12762  
    201201 *    instead a wait_until_empty function.
    202202 */
    203 TEST_F(SystemQueueTest, DISABLED_Threading)
     203TEST_F(SystemQueueTest, Threading)
    204204{
    205205    SystemQueue& sysq = SystemQueue::instance();
Note: See TracChangeset for help on using the changeset viewer.