Ignore:
Timestamp:
Mar 13, 2014, 7:30:17 PM (11 years ago)
Author:
hock@…
Message:

priority queue (but not tested)

--> FIXME in SystemEvent.h (has to be fixed first!)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • source/ariba/utility/system/SystemQueue.cpp

    r12762 r12763  
    110110    // wait till actually completes
    111111    //   (should be fast, but the current event is allowed to finish)
    112     logging_debug("/// ... joining SysQ thread");
    113     sysq_thread->join();
     112    if ( sysq_thread )
     113    {
     114        logging_debug("/// ... joining SysQ thread");
     115        sysq_thread->join();
     116    }
    114117   
    115118    // delete thread object
     
    154157
    155158SystemQueue::QueueThread::QueueThread() :
    156     now( not_a_date_time ),
    157     next_deadline( not_a_date_time ),
     159//     now( not_a_date_time ),
     160//     next_deadline( not_a_date_time ),
    158161    processing_event( false ),
    159162    running( false ),
     
    198201void SystemQueue::QueueThread::run_immediate_event()
    199202{
    200     // measure execution time
    201     ptime start_time = get_clock();
    202    
    203203    // get next event and process it
    204204    if ( ! immediateEventsQ.empty() )
     
    206206        scoped_ptr<SystemEvent> currently_processed_event;
    207207       
     208        /* dequeue event */
    208209        // SYNCHRONIZED
    209210        {
     
    212213            this->processing_event = true;
    213214           
    214             // dequeue first event
     215            // * dequeue first event *
    215216            currently_processed_event.reset( new SystemEvent(immediateEventsQ.front()) );   // copy
    216217            immediateEventsQ.pop_front();
    217218        }
    218219       
     220        /* dispatch event */
    219221        logging_debug("/// SysQ: dispatching event");
    220222       
     223        // measure execution time (1/2)
     224        ptime start_time = get_clock();
     225
     226        // * dispatch event *
    221227        currently_processed_event->getListener()->handleSystemEvent( *currently_processed_event );
     228       
     229        // measure execution time (2/2)
     230        time_duration execution_time = get_clock() - start_time;
     231       
     232        // DEBUG OUTPUT: warning when execution takes too much time
     233        //   [ TODO how long is "too much time"? ]
     234        if ( execution_time.total_milliseconds() > 50 )
     235        {
     236            logging_info("WARNING: Last event took " << execution_time.total_milliseconds() << " to complete.");
     237        }
     238
     239        /*  [ TODO ]
     240        *
     241        *  - we could also measure how long an event has been waiting in the queue before it's dispatched
     242        *  (in order to detect overload)
     243        *
     244        *  - especially for timed events, the displacement could be calculated
     245        *  (and, e.g., put in relation with the actual intended sleep time)
     246        */
    222247    }
    223248   
    224     // measure execution time
    225     now = get_clock();   // NOTE: this is reused in check_timed_queue();
    226     time_duration execution_time = now - start_time;
    227    
    228249    this->processing_event = false;
    229    
    230     // DEBUG OUTPUT: warning when execution takes too much time
    231     //   [ TODO how long is "too much time"? ]
    232     if ( execution_time.total_milliseconds() > 50 )
    233     {
    234         logging_info("WARNING: Last event took " << execution_time.total_milliseconds() << " to complete.");
    235     }
    236250}
    237251
     
    241255    scoped_lock lock( queue_mutex );
    242256
    243    
    244     // NOTE: this->now was just set by this->run_immediate_event()
    245 
    246     // reset next deadline
    247     this->next_deadline = boost::date_time::not_a_date_time;
    248 
     257    ptime now = get_clock();
    249258    bool not_expired_events_reached = false;
    250259   
     
    252261    while ( ! timedEventsQ.empty() && ! not_expired_events_reached )
    253262    {
    254         SystemEvent& ev = timedEventsQ.front();
    255        
    256         time_duration remaining_sleep_time = ev.deadline - this->now;
     263        const SystemEvent& ev = timedEventsQ.top();
     264       
     265        time_duration remaining_sleep_time = ev.deadline - now;
    257266       
    258267        // BRANCH: deadline reached
     
    261270            // move to immediateEventsQ
    262271            immediateEventsQ.push_back(ev);
    263             timedEventsQ.pop_front();
     272            timedEventsQ.pop();
    264273        }
    265274        // BRANCH: deadline not reached
    266275        else
    267276        {
    268             // store time for next sleep
    269             this->next_deadline = ev.deadline;
    270            
    271277            // okay, that's all for now.
    272278            not_expired_events_reached = true;
     
    277283void SystemQueue::QueueThread::wait_for_next_deadline()
    278284{
     285    // SYNCHRONIZED
     286    boost::mutex::scoped_lock lock(queue_mutex);
     287
    279288    if ( immediateEventsQ.empty() )
    280289    {
    281         boost::mutex::scoped_lock lock(queue_mutex);
    282        
    283290         // don't sleep when the SystemQueue is not already canceled
    284291        if ( aborted )
     
    287294       
    288295        // BRANCH: no timed events: sleep "forever" (until new events are scheduled)
    289         if ( this->next_deadline.is_not_a_date_time() )
     296        if ( timedEventsQ.empty() )
    290297        {
    291298            logging_debug("/// SysQ is going to sleep.");
     
    296303        else
    297304        {
    298             logging_debug("/// SysQ is going to sleep for "
    299                         << ( next_deadline - get_clock() ).total_milliseconds()
    300                         << "ms.");
    301 
    302             this->system_queue_idle.timed_wait( lock, next_deadline );
     305            logging_debug( "/// SysQ is going to sleep for "
     306                        << ( timedEventsQ.top().deadline - get_clock() ).total_milliseconds()
     307                        << "ms. Deadline: "
     308                        << timedEventsQ.top().deadline
     309                        << ", Clock: "
     310                        << get_clock() );
     311
     312            this->system_queue_idle.timed_wait( lock, timedEventsQ.top().deadline );
    303313        }
    304314    }
     
    309319ptime SystemQueue::QueueThread::get_clock()
    310320{
    311     return microsec_clock::universal_time();
     321//     return microsec_clock::universal_time();
     322    return microsec_clock::local_time();
    312323}
    313324
     
    352363            immediateEventsQ.push_back(event);
    353364        }
     365        // BRANCH: timed event
     366        else
     367        {
     368            event.deadline = event.scheduledTime + boost::posix_time::milliseconds(delay);
     369            event.delayTime = delay;  // XXX I think this is no longer needed..
     370           
     371            // XXX debug
     372            logging_debug("/// inserting timed event, due at: " << event.deadline << " (in " << delay << " ms)");
     373           
     374            timedEventsQ.push(event);
     375           
     376            // TODO push sorted.. (use sorted queue..)
     377//             timedEventsQ.pu
     378            /*
     379             * std::priority_queue
     380             *
     381             * but it orders high-to-low (must reverse order..)
     382             *
     383             * ... ah cool, da ist direkt eine anleitung fÃŒr reverse order:
     384             *   http://www.cplusplus.com/reference/queue/priority_queue/priority_queue/
     385             */
     386        }
    354387    }
    355388   
     
    414447
    415448        // wait until the thread has exited
    416         logging_debug("joining system queue thread");
    417         queueThread->join();
     449    logging_debug("joining system queue thread");
     450    queueThread->join();
    418451
    419452        // delete pending events
Note: See TracChangeset for help on using the changeset viewer.