An Overlay-based
Virtual Network Substrate
SpoVNet

Changeset 12764 for source


Ignore:
Timestamp:
Mar 14, 2014, 7:58:45 PM (5 years ago)
Author:
hock@…
Message:

New SystemQueue?...

... is passing all unit tests, now. :-)

Location:
source/ariba/utility/system
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • source/ariba/utility/system/SystemEvent.h

    r12763 r12764  
    7171        inline SystemEvent(SystemEventListener* mlistener, SystemEventType mtype =
    7272                        SystemEventType::DEFAULT, void* mdata = NULL) :
    73                 listener(mlistener), type(mtype), data(mdata), scheduledTime(
    74                                 boost::posix_time::not_a_date_time), delayTime(0),
    75                                 remainingDelay(0)
    76 
     73                listener(mlistener),
     74                type(mtype),
     75                data(mdata),
     76                scheduledTime(boost::posix_time::not_a_date_time),
     77                deadline(boost::posix_time::not_a_date_time),
     78                delayTime(0),
     79                remainingDelay(0)
    7780        {
    7881        }
     
    8184        inline SystemEvent(SystemEventListener* mlistener, SystemEventType mtype =
    8285                        SystemEventType::DEFAULT, T* mdata = NULL) :
    83                 listener(mlistener), type(mtype), data((void*) mdata), scheduledTime(
    84                                 boost::posix_time::not_a_date_time), delayTime(0),
    85                                 remainingDelay(0) {
     86                listener(mlistener),
     87                type(mtype),
     88                data((void*) mdata),
     89                scheduledTime(boost::posix_time::not_a_date_time),
     90                deadline(boost::posix_time::not_a_date_time),
     91                delayTime(0),
     92                remainingDelay(0) {
    8693        }
    8794
    88         // FIXME AKTUELL MARIO: copy deadline (etc?) !!!
    8995        inline SystemEvent(const SystemEvent& copy) {
    9096                this->scheduledTime = copy.scheduledTime;
     97        this->deadline = copy.deadline;
    9198                this->delayTime = copy.delayTime;
    9299                this->remainingDelay = copy.remainingDelay;
     
    98105        inline void operator=(const SystemEvent& right) {
    99106                this->scheduledTime = right.scheduledTime;
     107        this->deadline = right.deadline;
    100108                this->delayTime = right.delayTime;
    101109                this->remainingDelay = right.remainingDelay;
  • source/ariba/utility/system/SystemQueue.cpp

    r12763 r12764  
    4040#include <stdexcept>
    4141
     42// TODO Mario:
     43// check if there is any debug out left to remove
     44
    4245namespace ariba {
    4346namespace utility {
    4447   
    45     typedef boost::mutex::scoped_lock scoped_lock;
    46    
    47     using boost::posix_time::microsec_clock;
    48     using boost::posix_time::time_duration;
    49     using boost::date_time::not_a_date_time;
    50     using boost::scoped_ptr;
     48typedef boost::mutex::scoped_lock scoped_lock;
     49
     50using boost::posix_time::microsec_clock;
     51using boost::posix_time::time_duration;
     52using boost::date_time::not_a_date_time;
     53using boost::scoped_ptr;
    5154   
    5255
     
    7376void SystemQueue::scheduleEvent( const SystemEvent& event, uint32_t delay )
    7477{
     78    assert ( SysQ->running );  // should we really enforce this?
     79   
    7580    // copy
    7681    SystemEvent ev(event);
     
    96101{
    97102    assert ( ! SysQ->running );
     103    assert ( ! SysQ->unclean );
    98104   
    99105    SysQ->running = true;
     
    105111void SystemQueue::cancel()
    106112{
     113    // CHECK: this function must not be called from within a SystemQueue-Event
     114    if ( sysq_thread && boost::this_thread::get_id() == sysq_thread->get_id() )
     115    {
     116        logging_warn("SystemQueue::cancel() was called from within a SystemQueue-Event. This is not allowed!");
     117        throw std::logic_error("SystemQueue::cancel() was called from within a SystemQueue-Event. This is not allowed!");
     118    }
     119
     120   
    107121    // signal SysQ to quit (and abort queued events)
    108122    SysQ->cancel();
     
    129143void SystemQueue::dropAll( const SystemEventListener* mlistener)
    130144{
    131 //  XXX
     145//  TODO
    132146//      directScheduler.dropAll(mlistener);
    133147//      delayScheduler.dropAll(mlistener);
     
    144158}
    145159
    146 // XXX
    147 // void SystemQueue::enterMethod(){
    148 //      // TODO: omnet case and delay scheduler
    149 //      directScheduler.enter();
    150 // }
    151 //
    152 // void SystemQueue::leaveMethod(){
    153 //      // TODO: omnet case and delay scheduler
    154 //      directScheduler.leave();
    155 // }
    156 
    157 
     160
     161//********************************************************
     162
     163
     164/// constructor
    158165SystemQueue::QueueThread::QueueThread() :
    159 //     now( not_a_date_time ),
    160 //     next_deadline( not_a_date_time ),
    161166    processing_event( false ),
    162167    running( false ),
    163     aborted( false )
     168    aborted( false ),
     169    unclean( false )
    164170{
    165171}
     
    190196    logging_debug( "/// SysQ thread is quitting." );
    191197   
     198    unclean = true;
    192199    running = false;
    193200}
     
    219226       
    220227        /* dispatch event */
    221         logging_debug("/// SysQ: dispatching event");
     228//         logging_debug("/// SysQ: dispatching event");
    222229       
    223230        // measure execution time (1/2)
     
    319326ptime SystemQueue::QueueThread::get_clock()
    320327{
    321 //     return microsec_clock::universal_time();
    322     return microsec_clock::local_time();
     328    return microsec_clock::universal_time();
    323329}
    324330
     
    367373        {
    368374            event.deadline = event.scheduledTime + boost::posix_time::milliseconds(delay);
    369             event.delayTime = delay;  // XXX I think this is no longer needed..
     375            event.delayTime = delay;  // ( I think this is no longer needed.. )
    370376           
    371             // XXX debug
    372             logging_debug("/// inserting timed event, due at: " << event.deadline << " (in " << delay << " ms)");
     377//             // debug output
     378//             logging_debug("/// inserting timed event, due at: " << event.deadline << " (in " << delay << " ms)");
    373379           
    374380            timedEventsQ.push(event);
    375            
    376             // TODO push sorted.. (use sorted queue..)
    377 //             timedEventsQ.pu
    378             /*
    379              * std::priority_queue
    380              *
    381              * but it orders high-to-low (must reverse order..)
    382              *
    383              * ... ah cool, da ist direkt eine anleitung fÃŒr reverse order:
    384              *   http://www.cplusplus.com/reference/queue/priority_queue/priority_queue/
    385              */
    386381        }
    387382    }
     
    415410
    416411
    417 //***************************************************************
    418 //// XXX  old SystemQueue subclasses
    419 //        (needed as reference during development of the replacement)
    420 
    421 // #ifndef UNDERLAY_OMNET
    422 #define NOOLDSYSQ
    423 #ifndef NOOLDSYSQ
    424 
    425 void SystemQueue::QueueThread::run(){
    426         running = true;
    427 
    428         queueThread = new boost::thread(
    429                 boost::bind(&QueueThread::threadFunc, this) );
    430 }
    431 
    432 void SystemQueue::QueueThread::cancel(){
    433 
    434         logging_debug("cancelling system queue");
    435 
    436         // cause the thread to exit
    437         {
    438                 // get the lock, when we got the lock the
    439                 // queue thread must be in itemsAvailable.wait()
    440                 boost::mutex::scoped_lock lock(queueMutex);
    441 
    442                 // set the running indicator and signal to run on
    443                 // this will run the thread and quit it
    444                 running = false;
    445                 itemsAvailable.notify_all();
    446         }
    447 
    448         // wait until the thread has exited
    449     logging_debug("joining system queue thread");
    450     queueThread->join();
    451 
    452         // delete pending events
    453         logging_debug("deleting pending system queue events");
    454         while( eventsQueue.size() > 0 ){
    455                 eventsQueue.erase( eventsQueue.begin() );
    456         }
    457 
    458         // delete the thread, so that a subsuquent run() can be called
    459         delete queueThread;
    460         queueThread = NULL;
    461 }
    462 
    463 bool SystemQueue::QueueThread::isEmpty(){
    464         boost::mutex::scoped_lock lock( queueMutex );
    465         return eventsQueue.empty();
    466 }
    467 
    468 void SystemQueue::QueueThread::insert( const SystemEvent& event, uint32_t delay ){
    469 
    470         // if this is called from a module that is currently handling
    471         // a thread (called from SystemQueue::onNextQueueItem), the
    472         // thread is the same anyway and the mutex will be already
    473         // aquired, otherwise we aquire it now
    474 
    475         boost::mutex::scoped_lock lock( queueMutex );
    476 
    477         if ( delay > 0 )
    478         {
    479                 logging_debug("SystemQueue(" << this << ") : Schedule event in: " << delay << " ms; Events in queue (before insert): " << eventsQueue.size() );
    480         }
    481 
    482         eventsQueue.push_back( event );
    483         eventsQueue.back().scheduledTime = boost::posix_time::microsec_clock::local_time();
    484         eventsQueue.back().delayTime = delay;
    485         eventsQueue.back().remainingDelay = delay;
    486        
    487         if ( delay > 0 )
    488         {
    489                 logging_debug("SystemQueue(" << this << ") : Events in queue (after insert): " << eventsQueue.size() );
    490         }
    491 
    492         onItemInserted( event );
    493         itemsAvailable.notify_all();
    494 }
    495 
    496 void SystemQueue::QueueThread::dropAll( const SystemEventListener* mlistener) {
    497         boost::mutex::scoped_lock lock( queueMutex );
    498 
    499         bool deleted;
    500         do{
    501                 deleted = false;
    502                 EventQueue::iterator i = eventsQueue.begin();
    503                 EventQueue::iterator iend = eventsQueue.end();
    504 
    505                 for( ; i != iend; i++){
    506                         if((*i).getListener() == mlistener){
    507                                 eventsQueue.erase(i);
    508                                 deleted = true;
    509                                 break;
    510                         }
    511                 }
    512         }while(deleted);
    513 }
    514 
    515 void SystemQueue::QueueThread::threadFunc( QueueThread* obj ) {
    516 
    517         boost::mutex::scoped_lock lock( obj->queueMutex );
    518 
    519         while( obj->running ) {
    520 
    521                 // wait until an item is in the queue or we are notified
    522                 // to quit the thread. in case the thread is about to
    523                 // quit, the queueThreadRunning variable will indicate
    524                 // this and cause the thread to exit
    525 
    526                 while ( obj->running && obj->eventsQueue.empty() ){
    527 
    528 //                      const boost::system_time duration =
    529 //                                      boost::get_system_time() +
    530 //                                      boost::posix_time::milliseconds(100);
    531 //                      obj->itemsAvailable.timed_wait( lock, duration );
    532 
    533                         obj->itemsAvailable.wait( lock );
    534                 }
    535 
    536                 //
    537                 // work all the items that are currently in the queue
    538                 //
    539 
    540                 while( obj->running && (!obj->eventsQueue.empty()) ) {
    541 
    542                         // fetch the first item in the queue
    543                         // and deliver it to the queue handler
    544                         SystemEvent ev = obj->eventsQueue.front();
    545 
    546                         // XXX debugging the delay-scheduler..
    547                         if ( ev.delayTime > 0 )
    548                                 logging_debug("SystemQueue(" << obj << ") : Events in queue (before execution): " << obj->eventsQueue.size());
    549 
    550                         obj->eventsQueue.erase( obj->eventsQueue.begin() );
    551 
    552                         // call the queue and this will
    553                         // call the actual event handler
    554                         obj->queueMutex.unlock();
    555                         obj->onNextQueueItem( ev );
    556                         obj->queueMutex.lock();
    557 
    558                         // XXX debugging the delay-scheduler..
    559                         if ( ev.delayTime > 0 )
    560                                 logging_debug("SystemQueue(" << obj << ") : Remaining events in queue (after execution): " << obj->eventsQueue.size());
    561 
    562                 } // !obj->eventsQueue.empty() )
    563         } // while (obj->running)
    564 
    565         logging_debug("system queue exited");
    566 }
    567 
    568 void SystemQueue::QueueThread::enter(){
    569         queueMutex.lock();
    570 }
    571 
    572 void SystemQueue::QueueThread::leave(){
    573         queueMutex.unlock();
    574 }
    575 
    576 
    577 //***************************************************************
    578 
    579 SystemQueue::QueueThreadDirect::QueueThreadDirect(){
    580 }
    581 
    582 SystemQueue::QueueThreadDirect::~QueueThreadDirect(){
    583 }
    584 
    585 void SystemQueue::QueueThreadDirect::onItemInserted( const SystemEvent& event ){
    586         // do nothing here
    587 }
    588 
    589 void SystemQueue::QueueThreadDirect::onNextQueueItem( const SystemEvent& event ){
    590         // directly deliver the item to the
    591         event.getListener()->handleSystemEvent( event );
    592 }
    593 
    594 //***************************************************************
    595 
    596 SystemQueue::QueueThreadDelay::QueueThreadDelay(QueueThread* _transferQueue)
    597         : QueueThread( _transferQueue ), isSleeping( false ) {
    598 
    599         assert( _transferQueue != NULL );
    600 }
    601 
    602 SystemQueue::QueueThreadDelay::~QueueThreadDelay(){
    603 }
    604 
    605 void SystemQueue::QueueThreadDelay::onItemInserted( const SystemEvent& event ){
    606 
    607         if( !isSleeping)
    608         {
    609                 logging_warn("SystemQueue(" << this << ") : No, I'm not asleep!! New item inserted.");
    610                 return;  // TODO Mario: shouldn't we sort anyway..?
    611         }
    612 
    613         // break an existing sleep and
    614         // remember the time that was actually slept for
    615         // and change it for every event in the queue
    616 
    617         assert( !eventsQueue.empty());
    618         sleepCond.notify_all();
    619 
    620         ptime sleepEnd = boost::posix_time::microsec_clock::local_time();
    621         boost::posix_time::time_duration duration = sleepEnd - sleepStart;
    622         uint32_t sleptTime = duration.total_milliseconds();
    623 
    624         EventQueue::iterator i = eventsQueue.begin();
    625         EventQueue::iterator iend = eventsQueue.end();
    626 
    627         logging_debug("SystemQueue(" << this << ") : Adjusting remaining delays:");
    628        
    629         // TODO Mario: What about the just inserted event..?
    630         for( ; i != iend; i++ ) {
    631 
    632                 if( sleptTime >= i->remainingDelay)
    633                         i->remainingDelay = 0;
    634                 else
    635                 {
    636                         i->remainingDelay -= sleptTime;
    637                        
    638                         // XXX Mario: Testcode, just to find a bug...
    639                         boost::posix_time::time_duration time_passed = sleepEnd - i->getScheduledTime();
    640                         logging_debug("SystemQueue(" << this << ") : Total: " <<
    641                                         i->delayTime << ", remainingDelay: " << i->remainingDelay <<
    642                                         ", time already passed: " << time_passed.total_milliseconds() );
    643                 }
    644 
    645         } // for( ; i != iend; i++ )
    646 
    647         // now we have to reorder the events
    648         // in the queue with respect to their remaining delay
    649         // the SystemQueue::operator< takes care of the
    650         // ordering with respect to the remaining delay
    651 
    652         std::sort( eventsQueue.begin(), eventsQueue.end() );
    653 
    654 }
    655 
    656 void SystemQueue::QueueThreadDelay::onNextQueueItem( const SystemEvent& event ){
    657 
    658         // sleeps will be cancelled in the
    659         // onItemInserted function when a new
    660         // event arrives during sleeping
    661 
    662         assert( !isSleeping );
    663 
    664         // the given item is the one with the least
    665         // amount of sleep time left. because all
    666         // items are reordered in onItemInserted
    667 
    668         if( event.remainingDelay > 0 ) {
    669 
    670                 const boost::system_time duration =
    671                         boost::get_system_time() +
    672                         boost::posix_time::milliseconds(event.remainingDelay);
    673 
    674                 logging_debug("SystemQueue(" << this << ") : Sleeping for: " << event.remainingDelay << " ms");
    675                
    676                 {
    677                         boost::unique_lock<boost::mutex> lock( sleepMutex );
    678 
    679                         sleepStart = boost::posix_time::microsec_clock::local_time();
    680                         isSleeping = true;
    681                        
    682                         sleepCond.timed_wait( lock, duration );
    683 
    684                         isSleeping = false;
    685                 }
    686 
    687         } // if( event.remainingDelay > 0 )
    688 
    689         // if the sleep succeeded and was not
    690         // interrupted by a new incoming item
    691         // we can now deliver this event
    692 
    693         ptime sleepEnd = boost::posix_time::microsec_clock::local_time();
    694         boost::posix_time::time_duration duration = sleepEnd - sleepStart;
    695         uint32_t sleptTime = duration.total_milliseconds();
    696 
    697         logging_debug("SystemQueue(" << this << ") : Slept for: " << sleptTime << " ms; until: " << sleepEnd);
    698        
    699         // TODO MARIO: find the bug that loses events...
    700         if (event.remainingDelay <= sleptTime)
    701         {
    702                 logging_debug("SystemQueue(" << this << ") : Transferring scheduled event into the direct queue. Scheduled time: " << event.getScheduledTime() );
    703                 transferQueue->insert( event, 0 );
    704         }
    705         else
    706         {
    707                 logging_warn("SystemQueue(" << this << ") : Scheduled event lost!! :-(   (Sleep should have been " << event.remainingDelay - sleptTime << " ms longer..)");
    708                 logging_debug("SystemQueue(" << this << ") : Total delay: " << event.delayTime << "; remaining delay: " << event.remainingDelay);
    709                
    710 //              throw std::logic_error("Scheduled event lost!! :-(");
    711         }
    712 }
    713 
    714 #endif // #ifndef UNDERLAY_OMNET
    715 
    716 //***************************************************************
     412// XXX code from old system queue
     413// void SystemQueue::QueueThread::enter(){
     414//      queueMutex.lock();
     415// }
     416//
     417// void SystemQueue::QueueThread::leave(){
     418//      queueMutex.unlock();
     419// }
    717420
    718421}} // spovnet, common
  • source/ariba/utility/system/SystemQueue.h

    r12763 r12764  
    174174typedef list<SystemEvent> EventQueue;
    175175typedef std::priority_queue<SystemEvent,
    176                             std::vector<SystemEvent>,
     176                            std::vector<SystemEvent>,  // [ TODO is vector the best underlay? ]
    177177                            std::greater<SystemEvent> > PriorityEventQueue;
    178 // typedef std::priority_queue<SystemEvent> PriorityEventQueue;
    179 // TODO is vector the best underlay?
    180                            
    181178
    182179        //********************************************************
     
    224221                volatile bool running;
    225222        volatile bool aborted;
     223        volatile bool unclean;
    226224        }; // class QueueThread
    227225
Note: See TracChangeset for help on using the changeset viewer.