// [License] // The Ariba-Underlay Copyright // // Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH) // // Institute of Telematics // Universität Karlsruhe (TH) // Zirkel 2, 76128 Karlsruhe // Germany // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: // // 1. Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // 2. Redistributions in binary form must reproduce the above copyright // notice, this list of conditions and the following disclaimer in the // documentation and/or other materials provided with the distribution. // // THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // // The views and conclusions contained in the software and documentation // are those of the authors and should not be interpreted as representing // official policies, either expressed or implied, of the Institute of // Telematics. #include "SystemQueue.h" #include #include namespace ariba { namespace utility { typedef boost::mutex::scoped_lock scoped_lock; using boost::posix_time::microsec_clock; using boost::posix_time::time_duration; using boost::date_time::not_a_date_time; using boost::scoped_ptr; use_logging_cpp(SystemQueue); SystemQueue::SystemQueue() : SysQ( new QueueThread() ) { logging_debug("Creating SystemQueue at: " << this); } SystemQueue::~SystemQueue() { } SystemQueue& SystemQueue::instance() { static SystemQueue _inst; return _inst; } void SystemQueue::scheduleEvent( const SystemEvent& event, uint32_t delay ) { // copy SystemEvent ev(event); SysQ->insert(ev, delay); } // maps to function call internally to the Event-system void SystemQueue::scheduleCall( const boost::function0& function, uint32_t delay) { // copy function object boost::function0* function_ptr = new boost::function0(); (*function_ptr) = function; // schedule special call-event scheduleEvent( SystemEvent(&internal_function_caller, SystemEventType::DEFAULT, function_ptr), delay ); } void SystemQueue::run() { assert ( ! SysQ->running ); SysQ->running = true; // start thread sysq_thread.reset( new boost::thread(boost::ref(*SysQ)) ); } void SystemQueue::cancel() { // signal SysQ to quit (and abort queued events) SysQ->cancel(); // wait till actually completes // (should be fast, but the current event is allowed to finish) if ( sysq_thread ) { logging_debug("/// ... joining SysQ thread"); sysq_thread->join(); } // delete thread object sysq_thread.reset(); assert ( ! SysQ->isRunning() ); // clean up and respawn logging_debug("/// respawning SysQ"); SysQ.reset( new QueueThread() ); } void SystemQueue::dropAll( const SystemEventListener* mlistener) { // XXX // directScheduler.dropAll(mlistener); // delayScheduler.dropAll(mlistener); } bool SystemQueue::isEmpty() { return SysQ->isEmpty(); } bool SystemQueue::isRunning() { return SysQ->isRunning(); } // XXX // void SystemQueue::enterMethod(){ // // TODO: omnet case and delay scheduler // directScheduler.enter(); // } // // void SystemQueue::leaveMethod(){ // // TODO: omnet case and delay scheduler // directScheduler.leave(); // } SystemQueue::QueueThread::QueueThread() : // now( not_a_date_time ), // next_deadline( not_a_date_time ), processing_event( false ), running( false ), aborted( false ) { } SystemQueue::QueueThread::~QueueThread(){ } void SystemQueue::QueueThread::operator()() { logging_debug( "/// SysQ thread is alive." ); assert( running ); // this is set before the thread starts // main loop while ( ! aborted ) { // run next immediate event (only one) run_immediate_event(); // maintain timed events (move to immediateEventsQ, when deadline expired) check_timed_queue(); // wait for next deadline (if no immediate events pending) wait_for_next_deadline(); } logging_debug( "/// SysQ thread is quitting." ); running = false; } /// main loop functions void SystemQueue::QueueThread::run_immediate_event() { // get next event and process it if ( ! immediateEventsQ.empty() ) { scoped_ptr currently_processed_event; /* dequeue event */ // SYNCHRONIZED { scoped_lock lock( queue_mutex ); this->processing_event = true; // * dequeue first event * currently_processed_event.reset( new SystemEvent(immediateEventsQ.front()) ); // copy immediateEventsQ.pop_front(); } /* dispatch event */ logging_debug("/// SysQ: dispatching event"); // measure execution time (1/2) ptime start_time = get_clock(); // * dispatch event * currently_processed_event->getListener()->handleSystemEvent( *currently_processed_event ); // measure execution time (2/2) time_duration execution_time = get_clock() - start_time; // DEBUG OUTPUT: warning when execution takes too much time // [ TODO how long is "too much time"? ] if ( execution_time.total_milliseconds() > 50 ) { logging_info("WARNING: Last event took " << execution_time.total_milliseconds() << " to complete."); } /* [ TODO ] * * - we could also measure how long an event has been waiting in the queue before it's dispatched * (in order to detect overload) * * - especially for timed events, the displacement could be calculated * (and, e.g., put in relation with the actual intended sleep time) */ } this->processing_event = false; } void SystemQueue::QueueThread::check_timed_queue() { // this whole function is SYNCHRONIZED scoped_lock lock( queue_mutex ); ptime now = get_clock(); bool not_expired_events_reached = false; // move all expired events into the immediateEventsQ while ( ! timedEventsQ.empty() && ! not_expired_events_reached ) { const SystemEvent& ev = timedEventsQ.top(); time_duration remaining_sleep_time = ev.deadline - now; // BRANCH: deadline reached if ( remaining_sleep_time.is_negative() ) { // move to immediateEventsQ immediateEventsQ.push_back(ev); timedEventsQ.pop(); } // BRANCH: deadline not reached else { // okay, that's all for now. not_expired_events_reached = true; } } // while } void SystemQueue::QueueThread::wait_for_next_deadline() { // SYNCHRONIZED boost::mutex::scoped_lock lock(queue_mutex); if ( immediateEventsQ.empty() ) { // don't sleep when the SystemQueue is not already canceled if ( aborted ) return; // BRANCH: no timed events: sleep "forever" (until new events are scheduled) if ( timedEventsQ.empty() ) { logging_debug("/// SysQ is going to sleep."); this->system_queue_idle.wait( lock ); } // BRANCH: sleep till next timed event else { logging_debug( "/// SysQ is going to sleep for " << ( timedEventsQ.top().deadline - get_clock() ).total_milliseconds() << "ms. Deadline: " << timedEventsQ.top().deadline << ", Clock: " << get_clock() ); this->system_queue_idle.timed_wait( lock, timedEventsQ.top().deadline ); } } } /// uniform clock interface ptime SystemQueue::QueueThread::get_clock() { // return microsec_clock::universal_time(); return microsec_clock::local_time(); } /// external interface bool SystemQueue::QueueThread::isRunning() { return running; } void SystemQueue::QueueThread::cancel() { logging_debug("/// Cancelling system queue... "); // SYNCHRONIZED { scoped_lock lock(queue_mutex); aborted = true; } logging_debug("/// SysQ: " << immediateEventsQ.size() << " immediate event(s) + " << timedEventsQ.size() << " timed event(s) left."); system_queue_idle.notify_all(); } void SystemQueue::QueueThread::insert( SystemEvent& event, uint32_t delay ) { event.scheduledTime = get_clock(); // SYNCHRONIZED { scoped_lock lock( queue_mutex ); // BRANCH: immediate event if ( delay == 0 ) { immediateEventsQ.push_back(event); } // BRANCH: timed event else { event.deadline = event.scheduledTime + boost::posix_time::milliseconds(delay); event.delayTime = delay; // XXX I think this is no longer needed.. // XXX debug logging_debug("/// inserting timed event, due at: " << event.deadline << " (in " << delay << " ms)"); timedEventsQ.push(event); // TODO push sorted.. (use sorted queue..) // timedEventsQ.pu /* * std::priority_queue * * but it orders high-to-low (must reverse order..) * * ... ah cool, da ist direkt eine anleitung für reverse order: * http://www.cplusplus.com/reference/queue/priority_queue/priority_queue/ */ } } // wake SysQ thread system_queue_idle.notify_one(); // NOTE: there is only one thread // (so it doesn't matter whether to call notify_one, or notify_all) } bool SystemQueue::QueueThread::isEmpty() { // SYNCHRONIZED scoped_lock lock( queue_mutex ); return immediateEventsQ.empty() && timedEventsQ.empty() && ! processing_event; } // FIXME void SystemQueue::enterMethod() { assert( false ); } void SystemQueue::leaveMethod() { assert( false ); } //*************************************************************** //// XXX old SystemQueue subclasses // (needed as reference during development of the replacement) // #ifndef UNDERLAY_OMNET #define NOOLDSYSQ #ifndef NOOLDSYSQ void SystemQueue::QueueThread::run(){ running = true; queueThread = new boost::thread( boost::bind(&QueueThread::threadFunc, this) ); } void SystemQueue::QueueThread::cancel(){ logging_debug("cancelling system queue"); // cause the thread to exit { // get the lock, when we got the lock the // queue thread must be in itemsAvailable.wait() boost::mutex::scoped_lock lock(queueMutex); // set the running indicator and signal to run on // this will run the thread and quit it running = false; itemsAvailable.notify_all(); } // wait until the thread has exited logging_debug("joining system queue thread"); queueThread->join(); // delete pending events logging_debug("deleting pending system queue events"); while( eventsQueue.size() > 0 ){ eventsQueue.erase( eventsQueue.begin() ); } // delete the thread, so that a subsuquent run() can be called delete queueThread; queueThread = NULL; } bool SystemQueue::QueueThread::isEmpty(){ boost::mutex::scoped_lock lock( queueMutex ); return eventsQueue.empty(); } void SystemQueue::QueueThread::insert( const SystemEvent& event, uint32_t delay ){ // if this is called from a module that is currently handling // a thread (called from SystemQueue::onNextQueueItem), the // thread is the same anyway and the mutex will be already // aquired, otherwise we aquire it now boost::mutex::scoped_lock lock( queueMutex ); if ( delay > 0 ) { logging_debug("SystemQueue(" << this << ") : Schedule event in: " << delay << " ms; Events in queue (before insert): " << eventsQueue.size() ); } eventsQueue.push_back( event ); eventsQueue.back().scheduledTime = boost::posix_time::microsec_clock::local_time(); eventsQueue.back().delayTime = delay; eventsQueue.back().remainingDelay = delay; if ( delay > 0 ) { logging_debug("SystemQueue(" << this << ") : Events in queue (after insert): " << eventsQueue.size() ); } onItemInserted( event ); itemsAvailable.notify_all(); } void SystemQueue::QueueThread::dropAll( const SystemEventListener* mlistener) { boost::mutex::scoped_lock lock( queueMutex ); bool deleted; do{ deleted = false; EventQueue::iterator i = eventsQueue.begin(); EventQueue::iterator iend = eventsQueue.end(); for( ; i != iend; i++){ if((*i).getListener() == mlistener){ eventsQueue.erase(i); deleted = true; break; } } }while(deleted); } void SystemQueue::QueueThread::threadFunc( QueueThread* obj ) { boost::mutex::scoped_lock lock( obj->queueMutex ); while( obj->running ) { // wait until an item is in the queue or we are notified // to quit the thread. in case the thread is about to // quit, the queueThreadRunning variable will indicate // this and cause the thread to exit while ( obj->running && obj->eventsQueue.empty() ){ // const boost::system_time duration = // boost::get_system_time() + // boost::posix_time::milliseconds(100); // obj->itemsAvailable.timed_wait( lock, duration ); obj->itemsAvailable.wait( lock ); } // // work all the items that are currently in the queue // while( obj->running && (!obj->eventsQueue.empty()) ) { // fetch the first item in the queue // and deliver it to the queue handler SystemEvent ev = obj->eventsQueue.front(); // XXX debugging the delay-scheduler.. if ( ev.delayTime > 0 ) logging_debug("SystemQueue(" << obj << ") : Events in queue (before execution): " << obj->eventsQueue.size()); obj->eventsQueue.erase( obj->eventsQueue.begin() ); // call the queue and this will // call the actual event handler obj->queueMutex.unlock(); obj->onNextQueueItem( ev ); obj->queueMutex.lock(); // XXX debugging the delay-scheduler.. if ( ev.delayTime > 0 ) logging_debug("SystemQueue(" << obj << ") : Remaining events in queue (after execution): " << obj->eventsQueue.size()); } // !obj->eventsQueue.empty() ) } // while (obj->running) logging_debug("system queue exited"); } void SystemQueue::QueueThread::enter(){ queueMutex.lock(); } void SystemQueue::QueueThread::leave(){ queueMutex.unlock(); } //*************************************************************** SystemQueue::QueueThreadDirect::QueueThreadDirect(){ } SystemQueue::QueueThreadDirect::~QueueThreadDirect(){ } void SystemQueue::QueueThreadDirect::onItemInserted( const SystemEvent& event ){ // do nothing here } void SystemQueue::QueueThreadDirect::onNextQueueItem( const SystemEvent& event ){ // directly deliver the item to the event.getListener()->handleSystemEvent( event ); } //*************************************************************** SystemQueue::QueueThreadDelay::QueueThreadDelay(QueueThread* _transferQueue) : QueueThread( _transferQueue ), isSleeping( false ) { assert( _transferQueue != NULL ); } SystemQueue::QueueThreadDelay::~QueueThreadDelay(){ } void SystemQueue::QueueThreadDelay::onItemInserted( const SystemEvent& event ){ if( !isSleeping) { logging_warn("SystemQueue(" << this << ") : No, I'm not asleep!! New item inserted."); return; // TODO Mario: shouldn't we sort anyway..? } // break an existing sleep and // remember the time that was actually slept for // and change it for every event in the queue assert( !eventsQueue.empty()); sleepCond.notify_all(); ptime sleepEnd = boost::posix_time::microsec_clock::local_time(); boost::posix_time::time_duration duration = sleepEnd - sleepStart; uint32_t sleptTime = duration.total_milliseconds(); EventQueue::iterator i = eventsQueue.begin(); EventQueue::iterator iend = eventsQueue.end(); logging_debug("SystemQueue(" << this << ") : Adjusting remaining delays:"); // TODO Mario: What about the just inserted event..? for( ; i != iend; i++ ) { if( sleptTime >= i->remainingDelay) i->remainingDelay = 0; else { i->remainingDelay -= sleptTime; // XXX Mario: Testcode, just to find a bug... boost::posix_time::time_duration time_passed = sleepEnd - i->getScheduledTime(); logging_debug("SystemQueue(" << this << ") : Total: " << i->delayTime << ", remainingDelay: " << i->remainingDelay << ", time already passed: " << time_passed.total_milliseconds() ); } } // for( ; i != iend; i++ ) // now we have to reorder the events // in the queue with respect to their remaining delay // the SystemQueue::operator< takes care of the // ordering with respect to the remaining delay std::sort( eventsQueue.begin(), eventsQueue.end() ); } void SystemQueue::QueueThreadDelay::onNextQueueItem( const SystemEvent& event ){ // sleeps will be cancelled in the // onItemInserted function when a new // event arrives during sleeping assert( !isSleeping ); // the given item is the one with the least // amount of sleep time left. because all // items are reordered in onItemInserted if( event.remainingDelay > 0 ) { const boost::system_time duration = boost::get_system_time() + boost::posix_time::milliseconds(event.remainingDelay); logging_debug("SystemQueue(" << this << ") : Sleeping for: " << event.remainingDelay << " ms"); { boost::unique_lock lock( sleepMutex ); sleepStart = boost::posix_time::microsec_clock::local_time(); isSleeping = true; sleepCond.timed_wait( lock, duration ); isSleeping = false; } } // if( event.remainingDelay > 0 ) // if the sleep succeeded and was not // interrupted by a new incoming item // we can now deliver this event ptime sleepEnd = boost::posix_time::microsec_clock::local_time(); boost::posix_time::time_duration duration = sleepEnd - sleepStart; uint32_t sleptTime = duration.total_milliseconds(); logging_debug("SystemQueue(" << this << ") : Slept for: " << sleptTime << " ms; until: " << sleepEnd); // TODO MARIO: find the bug that loses events... if (event.remainingDelay <= sleptTime) { logging_debug("SystemQueue(" << this << ") : Transferring scheduled event into the direct queue. Scheduled time: " << event.getScheduledTime() ); transferQueue->insert( event, 0 ); } else { logging_warn("SystemQueue(" << this << ") : Scheduled event lost!! :-( (Sleep should have been " << event.remainingDelay - sleptTime << " ms longer..)"); logging_debug("SystemQueue(" << this << ") : Total delay: " << event.delayTime << "; remaining delay: " << event.remainingDelay); // throw std::logic_error("Scheduled event lost!! :-("); } } #endif // #ifndef UNDERLAY_OMNET //*************************************************************** }} // spovnet, common