Changeset 12763


Ignore:
Timestamp:
Mar 13, 2014, 7:30:17 PM (11 years ago)
Author:
hock@…
Message:

priority queue (but not tested)

--> FIXME in SystemEvent.h (has to be fixed first!)

Files:
4 edited

Legend:

Unmodified
Added
Removed
  • source/ariba/utility/system/SystemEvent.h

    r12761 r12763  
    8686        }
    8787
     88        // FIXME AKTUELL MARIO: copy deadline (etc?) !!!
    8889        inline SystemEvent(const SystemEvent& copy) {
    8990                this->scheduledTime = copy.scheduledTime;
     
    133134        }
    134135
    135         inline bool operator<(const SystemEvent& right) const {
    136                 return remainingDelay < right.remainingDelay;
     136        bool operator<(const SystemEvent& right) const
     137        {
     138                return this->deadline < right.deadline;
    137139        }
     140
     141    bool operator>(const SystemEvent& right) const
     142    {
     143        return this->deadline > right.deadline;
     144    }
    138145
    139146};
  • source/ariba/utility/system/SystemQueue.cpp

    r12762 r12763  
    110110    // wait till actually completes
    111111    //   (should be fast, but the current event is allowed to finish)
    112     logging_debug("/// ... joining SysQ thread");
    113     sysq_thread->join();
     112    if ( sysq_thread )
     113    {
     114        logging_debug("/// ... joining SysQ thread");
     115        sysq_thread->join();
     116    }
    114117   
    115118    // delete thread object
     
    154157
    155158SystemQueue::QueueThread::QueueThread() :
    156     now( not_a_date_time ),
    157     next_deadline( not_a_date_time ),
     159//     now( not_a_date_time ),
     160//     next_deadline( not_a_date_time ),
    158161    processing_event( false ),
    159162    running( false ),
     
    198201void SystemQueue::QueueThread::run_immediate_event()
    199202{
    200     // measure execution time
    201     ptime start_time = get_clock();
    202    
    203203    // get next event and process it
    204204    if ( ! immediateEventsQ.empty() )
     
    206206        scoped_ptr<SystemEvent> currently_processed_event;
    207207       
     208        /* dequeue event */
    208209        // SYNCHRONIZED
    209210        {
     
    212213            this->processing_event = true;
    213214           
    214             // dequeue first event
     215            // * dequeue first event *
    215216            currently_processed_event.reset( new SystemEvent(immediateEventsQ.front()) );   // copy
    216217            immediateEventsQ.pop_front();
    217218        }
    218219       
     220        /* dispatch event */
    219221        logging_debug("/// SysQ: dispatching event");
    220222       
     223        // measure execution time (1/2)
     224        ptime start_time = get_clock();
     225
     226        // * dispatch event *
    221227        currently_processed_event->getListener()->handleSystemEvent( *currently_processed_event );
     228       
     229        // measure execution time (2/2)
     230        time_duration execution_time = get_clock() - start_time;
     231       
     232        // DEBUG OUTPUT: warning when execution takes too much time
     233        //   [ TODO how long is "too much time"? ]
     234        if ( execution_time.total_milliseconds() > 50 )
     235        {
     236            logging_info("WARNING: Last event took " << execution_time.total_milliseconds() << " to complete.");
     237        }
     238
     239        /*  [ TODO ]
     240        *
     241        *  - we could also measure how long an event has been waiting in the queue before it's dispatched
     242        *  (in order to detect overload)
     243        *
     244        *  - especially for timed events, the displacement could be calculated
     245        *  (and, e.g., put in relation with the actual intended sleep time)
     246        */
    222247    }
    223248   
    224     // measure execution time
    225     now = get_clock();   // NOTE: this is reused in check_timed_queue();
    226     time_duration execution_time = now - start_time;
    227    
    228249    this->processing_event = false;
    229    
    230     // DEBUG OUTPUT: warning when execution takes too much time
    231     //   [ TODO how long is "too much time"? ]
    232     if ( execution_time.total_milliseconds() > 50 )
    233     {
    234         logging_info("WARNING: Last event took " << execution_time.total_milliseconds() << " to complete.");
    235     }
    236250}
    237251
     
    241255    scoped_lock lock( queue_mutex );
    242256
    243    
    244     // NOTE: this->now was just set by this->run_immediate_event()
    245 
    246     // reset next deadline
    247     this->next_deadline = boost::date_time::not_a_date_time;
    248 
     257    ptime now = get_clock();
    249258    bool not_expired_events_reached = false;
    250259   
     
    252261    while ( ! timedEventsQ.empty() && ! not_expired_events_reached )
    253262    {
    254         SystemEvent& ev = timedEventsQ.front();
    255        
    256         time_duration remaining_sleep_time = ev.deadline - this->now;
     263        const SystemEvent& ev = timedEventsQ.top();
     264       
     265        time_duration remaining_sleep_time = ev.deadline - now;
    257266       
    258267        // BRANCH: deadline reached
     
    261270            // move to immediateEventsQ
    262271            immediateEventsQ.push_back(ev);
    263             timedEventsQ.pop_front();
     272            timedEventsQ.pop();
    264273        }
    265274        // BRANCH: deadline not reached
    266275        else
    267276        {
    268             // store time for next sleep
    269             this->next_deadline = ev.deadline;
    270            
    271277            // okay, that's all for now.
    272278            not_expired_events_reached = true;
     
    277283void SystemQueue::QueueThread::wait_for_next_deadline()
    278284{
     285    // SYNCHRONIZED
     286    boost::mutex::scoped_lock lock(queue_mutex);
     287
    279288    if ( immediateEventsQ.empty() )
    280289    {
    281         boost::mutex::scoped_lock lock(queue_mutex);
    282        
    283290         // don't sleep when the SystemQueue is not already canceled
    284291        if ( aborted )
     
    287294       
    288295        // BRANCH: no timed events: sleep "forever" (until new events are scheduled)
    289         if ( this->next_deadline.is_not_a_date_time() )
     296        if ( timedEventsQ.empty() )
    290297        {
    291298            logging_debug("/// SysQ is going to sleep.");
     
    296303        else
    297304        {
    298             logging_debug("/// SysQ is going to sleep for "
    299                         << ( next_deadline - get_clock() ).total_milliseconds()
    300                         << "ms.");
    301 
    302             this->system_queue_idle.timed_wait( lock, next_deadline );
     305            logging_debug( "/// SysQ is going to sleep for "
     306                        << ( timedEventsQ.top().deadline - get_clock() ).total_milliseconds()
     307                        << "ms. Deadline: "
     308                        << timedEventsQ.top().deadline
     309                        << ", Clock: "
     310                        << get_clock() );
     311
     312            this->system_queue_idle.timed_wait( lock, timedEventsQ.top().deadline );
    303313        }
    304314    }
     
    309319ptime SystemQueue::QueueThread::get_clock()
    310320{
    311     return microsec_clock::universal_time();
     321//     return microsec_clock::universal_time();
     322    return microsec_clock::local_time();
    312323}
    313324
     
    352363            immediateEventsQ.push_back(event);
    353364        }
     365        // BRANCH: timed event
     366        else
     367        {
     368            event.deadline = event.scheduledTime + boost::posix_time::milliseconds(delay);
     369            event.delayTime = delay;  // XXX I think this is no longer needed..
     370           
     371            // XXX debug
     372            logging_debug("/// inserting timed event, due at: " << event.deadline << " (in " << delay << " ms)");
     373           
     374            timedEventsQ.push(event);
     375           
     376            // TODO push sorted.. (use sorted queue..)
     377//             timedEventsQ.pu
     378            /*
     379             * std::priority_queue
     380             *
     381             * but it orders high-to-low (must reverse order..)
     382             *
     383             * ... ah cool, da ist direkt eine anleitung fÃŒr reverse order:
     384             *   http://www.cplusplus.com/reference/queue/priority_queue/priority_queue/
     385             */
     386        }
    354387    }
    355388   
     
    414447
    415448        // wait until the thread has exited
    416         logging_debug("joining system queue thread");
    417         queueThread->join();
     449    logging_debug("joining system queue thread");
     450    queueThread->join();
    418451
    419452        // delete pending events
  • source/ariba/utility/system/SystemQueue.h

    r12762 r12763  
    4040#define SYSTEMQUEUE_H_
    4141
    42 // #include <vector>
    43 #include <list>
    44 #include <cassert>
    4542#include "SystemEvent.h"
    4643#include "SystemEventListener.h"
    4744#include "ariba/utility/logging/Logging.h"
     45
     46#include <cassert>
     47#include <list>
     48#include <vector>
     49#include <queue>          // std::priority_queue
     50#include <functional>     // std::greater
     51
    4852#include <boost/date_time.hpp>
    4953#include <boost/cstdint.hpp>
     
    169173
    170174typedef list<SystemEvent> EventQueue;
     175typedef std::priority_queue<SystemEvent,
     176                            std::vector<SystemEvent>,
     177                            std::greater<SystemEvent> > PriorityEventQueue;
     178// typedef std::priority_queue<SystemEvent> PriorityEventQueue;
     179// TODO is vector the best underlay?
     180                           
    171181
    172182        //********************************************************
     
    205215        private:
    206216        EventQueue immediateEventsQ;
    207         EventQueue timedEventsQ;
    208        
    209         ptime now;
    210         ptime next_deadline;
    211 
     217        PriorityEventQueue timedEventsQ;
     218       
    212219        boost::condition_variable system_queue_idle;
    213220        boost::mutex queue_mutex;
  • tests/SystemQueue-tests.cc

    r12762 r12763  
    9292    void Check()
    9393    {
    94         // XXX
    9594        cout << "### Check ### "<< endl;
    9695
    9796        checkmark = true;
    9897    }
    99    
     98
     99    void Cancel()
     100    {
     101        cout << "### Cancel ### "<< endl;
     102
     103        SystemQueue::instance().cancel();
     104        checkmark = true;
     105    }
     106
    100107    void LongRunner()
    101108    {
     
    163170
    164171/**
     172 *  Enqueues an event but then cancels the SystemQueue without running
     173 */
     174TEST_F(SystemQueueTest, EmptyAfterCancel)
     175{
     176    SystemQueue& sysq = SystemQueue::instance();
     177
     178    EXPECT_TRUE( sysq.isEmpty() );
     179   
     180    // enqueue event
     181    sysq.scheduleCall(
     182        boost::bind(&SystemQueueTest::Check, this)
     183    );
     184   
     185    EXPECT_FALSE( sysq.isEmpty() );
     186   
     187    // cancel
     188    sysq.cancel();
     189   
     190    EXPECT_TRUE( sysq.isEmpty() );
     191}
     192
     193
     194/**
     195 *  cancels the SystemQueue from inside a scheduled event
     196 */
     197TEST_F(SystemQueueTest, CancelFromInsideEvent)
     198{
     199    SystemQueue& sysq = SystemQueue::instance();
     200    checkmark = false;  // just to be sure..
     201   
     202    // start
     203    sysq.run();
     204   
     205    // scheduleCall
     206    sysq.scheduleCall(
     207        boost::bind(&SystemQueueTest::Cancel, this)
     208    );
     209
     210    // wait for the event to happen
     211    wait_for_checkmark(MAX_WAIT);
     212
     213    EXPECT_FALSE( sysq.isRunning() ) << "SystemQueue has not stopped properly.";
     214    EXPECT_TRUE( sysq.isEmpty() ) << "SystemQueue has not stopped properly.";
     215}
     216
     217
     218/**
    165219 *  schedule a call and test whether it is actually performed by the SystemQueue
    166220 */
Note: See TracChangeset for help on using the changeset viewer.