// [License] // The Ariba-Underlay Copyright // // Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH) // // Institute of Telematics // Universität Karlsruhe (TH) // Zirkel 2, 76128 Karlsruhe // Germany // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: // // 1. Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // 2. Redistributions in binary form must reproduce the above copyright // notice, this list of conditions and the following disclaimer in the // documentation and/or other materials provided with the distribution. // // THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // // The views and conclusions contained in the software and documentation // are those of the authors and should not be interpreted as representing // official policies, either expressed or implied, of the Institute of // Telematics. 
#include "SystemQueue.h"

// NOTE(review): the targets of the two angle-bracket includes that follow
// "SystemQueue.h" were missing from this copy of the file. They are
// reconstructed from the standard-library names used below (assert,
// std::logic_error) -- please confirm against version control.
#include <cassert>
#include <stdexcept>

// TODO Mario:
// check if there is any debug out left to remove

namespace ariba {
namespace utility {

typedef boost::mutex::scoped_lock scoped_lock;
using boost::posix_time::microsec_clock;
using boost::posix_time::time_duration;
using boost::date_time::not_a_date_time;
using boost::scoped_ptr;

use_logging_cpp(SystemQueue);

namespace {

/// An event whose handler runs longer than this many milliseconds is
/// reported via logging_info.
/// [ TODOx how long is "too much time"? ]
const long SLOW_EVENT_THRESHOLD_MS = 50;

} // anonymous namespace


/// Creates the queue together with a fresh (not yet running) QueueThread.
SystemQueue::SystemQueue() :
    SysQ( new QueueThread() )
{
    logging_debug("Creating SystemQueue at: " << this);
}

SystemQueue::~SystemQueue()
{
}

/// Returns the process-wide SystemQueue (function-local static).
SystemQueue& SystemQueue::instance()
{
    static SystemQueue _inst;
    return _inst;
}

/// Schedules a copy of @p event for dispatch after @p delay milliseconds
/// (0 = as soon as possible).
///
/// Scheduling on a queue that is not running is permitted; it only
/// produces a debug message.
void SystemQueue::scheduleEvent( const SystemEvent& event, uint32_t delay )
{
    // assert ( SysQ->running );  // should we really enforce this?
    if ( ! SysQ->running )
    {
        logging_debug("/// WARNING: The SystemQueue is NOT RUNNING!");
    }

    // insert() takes a mutable reference and stamps the event with its
    // timing information, so work on a private copy
    SystemEvent ev(event);
    SysQ->insert(ev, delay);
}

/// Schedules a plain function call; maps to the event system internally.
///
/// @param function  callable to invoke from the queue
/// @param delay     delay in milliseconds (0 = as soon as possible)
void SystemQueue::scheduleCall( const boost::function0<void>& function, uint32_t delay )
{
    // heap copy of the function object; it travels as the event's payload
    // NOTE(review): presumably internal_function_caller releases it after
    // the call -- confirm in the header
    boost::function0<void>* function_ptr = new boost::function0<void>(function);

    // schedule special call-event
    scheduleEvent(
            SystemEvent(&internal_function_caller, SystemEventType::DEFAULT, function_ptr),
            delay );
}

/// Starts the queue thread. Must only be called on a queue that is neither
/// running nor has already run (see the assertions).
void SystemQueue::run()
{
    // TODO should these be assertion or exceptions..? (exceptions => unit test)
    assert ( ! SysQ->running );
    assert ( ! SysQ->unclean );

    // the thread function asserts this flag, so set it before starting
    SysQ->running = true;

    // start thread
    sysq_thread.reset( new boost::thread(boost::ref(*SysQ)) );
}

/// Stops the queue, waits for the queue thread to finish and replaces the
/// QueueThread with a fresh one.
///
/// @throws std::logic_error when called from within a SystemQueue event
///         (i.e. from the queue thread itself).
void SystemQueue::cancel()
{
    // CHECK: this function must not be called from within a SystemQueue-Event
    // (the join() below would wait on the calling thread itself)
    if ( sysq_thread && boost::this_thread::get_id() == sysq_thread->get_id() )
    {
        logging_warn("SystemQueue::cancel() was called from within a SystemQueue-Event. This is not allowed!");
        throw std::logic_error("SystemQueue::cancel() was called from within a SystemQueue-Event. This is not allowed!");
    }

    // signal SysQ to quit (and abort queued events)
    SysQ->cancel();

    // wait till actually completes
    // (should be fast, but the current event is allowed to finish)
    if ( sysq_thread )
    {
        logging_debug("/// ... joining SysQ thread");
        sysq_thread->join();
    }

    // delete thread object
    sysq_thread.reset();

    assert ( ! SysQ->isRunning() );

    // clean up and respawn
    logging_debug("/// respawning SysQ");
    SysQ.reset( new QueueThread() );
}

/// Signals the queue thread to quit (and abort queued events) without
/// waiting for it; pair with join().
void SystemQueue::leave()
{
    SysQ->cancel();
}

/// Blocks until the queue thread (if there is one) has finished.
void SystemQueue::join()
{
    if ( sysq_thread )
    {
        logging_debug("/// ... joining SysQ thread");
        sysq_thread->join();
    }
}

/// Not implemented yet: currently a no-op, @p mlistener is ignored.
void SystemQueue::dropAll( const SystemEventListener* mlistener )
{
    // TODO
//  directScheduler.dropAll(mlistener);
//  delayScheduler.dropAll(mlistener);
}

/// True when no event is queued and none is being processed.
bool SystemQueue::isEmpty()
{
    return SysQ->isEmpty();
}

/// True while the queue thread is marked as running.
bool SystemQueue::isRunning()
{
    return SysQ->isRunning();
}


//********************************************************


/// constructor
SystemQueue::QueueThread::QueueThread() :
    processing_event( false ),
    running( false ),
    aborted( false ),
    unclean( false )
{
}

SystemQueue::QueueThread::~QueueThread()
{
}

/// Thread entry point: processes events until the queue is cancelled.
void SystemQueue::QueueThread::operator()()
{
    logging_debug( "/// SysQ thread is alive." );

    assert( running );  // this is set before the thread starts

    // main loop
    while ( ! aborted )
    {
        // run next immediate event (only one)
        run_immediate_event();

        // maintain timed events (move to immediateEventsQ, when deadline expired)
        check_timed_queue();

        // wait for next deadline (if no immediate events pending)
        wait_for_next_deadline();
    }

    logging_debug( "/// SysQ thread is quitting." );

    // a QueueThread that has run once is marked unclean; SystemQueue::run()
    // refuses to start an unclean QueueThread
    unclean = true;
    running = false;
}


/// main loop functions

/// Dequeues and dispatches at most one immediate event.
///
/// Queue and processing_event are only touched while holding queue_mutex;
/// the listener itself is called without the lock, so an event handler may
/// schedule further events.
void SystemQueue::QueueThread::run_immediate_event()
{
    scoped_ptr<SystemEvent> currently_processed_event;

    /* dequeue event */
    // SYNCHRONIZED
    {
        scoped_lock lock( queue_mutex );

        // nothing to do
        if ( immediateEventsQ.empty() )
        {
            this->processing_event = false;
            return;
        }

        this->processing_event = true;

        // * dequeue first event *
        currently_processed_event.reset( new SystemEvent(immediateEventsQ.front()) );  // copy
        immediateEventsQ.pop_front();
    }

    /* dispatch event */
//  logging_debug("/// SysQ: dispatching event");

    // measure execution time (1/2)
    const ptime start_time = get_clock();

    // * dispatch event *
    currently_processed_event->getListener()->handleSystemEvent( *currently_processed_event );

    // measure execution time (2/2)
    const time_duration execution_time = get_clock() - start_time;

    // DEBUG OUTPUT: warning when execution takes too much time
    if ( execution_time.total_milliseconds() > SLOW_EVENT_THRESHOLD_MS )
    {
        logging_info("WARNING: Last event took " << execution_time.total_milliseconds() << " ms to complete.");
    }

    /*  [ TODOx ]
     *
     *  - we could also measure how long an event has been waiting in the queue before it's dispatched
     *  (in order to detect overload)
     *
     *  - especially for timed events, the displacement could be calculated
     *  (and, e.g., put in relation with the actual intended sleep time)
     */

    // SYNCHRONIZED: isEmpty() reads this flag under the same mutex
    {
        scoped_lock lock( queue_mutex );
        this->processing_event = false;
    }
}

/// Moves every timed event whose deadline has passed into immediateEventsQ.
void SystemQueue::QueueThread::check_timed_queue()
{
    // this whole function is SYNCHRONIZED
    scoped_lock lock( queue_mutex );

    const ptime now = get_clock();

    // NOTE(review): stops at the first unexpired event, i.e. relies on
    // timedEventsQ.top() being the event with the earliest deadline --
    // confirm the queue's ordering in the header
    while ( ! timedEventsQ.empty() )
    {
        const SystemEvent& ev = timedEventsQ.top();
        const time_duration remaining_sleep_time = ev.deadline - now;

        // BRANCH: deadline not reached
        if ( ! remaining_sleep_time.is_negative() )
        {
            // okay, that's all for now.
            break;
        }

        // BRANCH: deadline reached
        // move to immediateEventsQ (copy first, `ev` dangles after pop())
        immediateEventsQ.push_back(ev);
        timedEventsQ.pop();
    }
}

/// Puts the queue thread to sleep until there is something to do: until
/// notified when no timed event exists, otherwise at most until the deadline
/// of timedEventsQ.top(). Returns immediately when immediate events are
/// pending or the queue has been cancelled.
void SystemQueue::QueueThread::wait_for_next_deadline()
{
    // SYNCHRONIZED
    scoped_lock lock( queue_mutex );

    // work is pending: no sleep
    if ( ! immediateEventsQ.empty() )
    {
        return;
    }

    // don't sleep when the SystemQueue is already canceled
    if ( aborted )
    {
        return;
    }

    // BRANCH: no timed events: sleep "forever" (until new events are scheduled)
    if ( timedEventsQ.empty() )
    {
//      logging_debug("/// SysQ is going to sleep.");

        this->system_queue_idle.wait( lock );
    }
    // BRANCH: sleep till next timed event
    else
    {
//      logging_debug( "/// SysQ is going to sleep for "
//                  << ( timedEventsQ.top().deadline - get_clock() ).total_milliseconds()
//                  << "ms. Deadline: "
//                  << timedEventsQ.top().deadline
//                  << ", Clock: "
//                  << get_clock() );

        this->system_queue_idle.timed_wait( lock, timedEventsQ.top().deadline );
    }
}


/// uniform clock interface
ptime SystemQueue::QueueThread::get_clock()
{
    return microsec_clock::universal_time();
}


/// external interface

bool SystemQueue::QueueThread::isRunning()
{
    return running;
}

/// Marks the queue as aborted and wakes the queue thread so it can quit.
void SystemQueue::QueueThread::cancel()
{
    logging_debug("/// Cancelling system queue... ");

    // SYNCHRONIZED
    {
        scoped_lock lock(queue_mutex);
        aborted = true;

        // the queue sizes are read while the lock is still held
        logging_debug("/// SysQ: " << immediateEventsQ.size() << " immediate event(s) + "
                                  << timedEventsQ.size() << " timed event(s) left.");
    }

    system_queue_idle.notify_all();
}

/// Enqueues @p event and wakes the queue thread.
///
/// @param event  event to enqueue; its scheduledTime (and, for timed events,
///               deadline and delayTime) are written before it is copied
///               into the queue
/// @param delay  delay in milliseconds; 0 enqueues as an immediate event
void SystemQueue::QueueThread::insert( SystemEvent& event, uint32_t delay )
{
    event.scheduledTime = get_clock();

    // SYNCHRONIZED
    {
        scoped_lock lock( queue_mutex );

        // BRANCH: immediate event
        if ( delay == 0 )
        {
            immediateEventsQ.push_back(event);
        }
        // BRANCH: timed event
        else
        {
            event.deadline = event.scheduledTime + boost::posix_time::milliseconds(delay);
            event.delayTime = delay;  // ( I think this is no longer needed.. )

//          // debug output
//          logging_debug("/// inserting timed event, due at: " << event.deadline << " (in " << delay << " ms)");

            timedEventsQ.push(event);
        }
    }

    // wake SysQ thread
    system_queue_idle.notify_one();  // NOTE: there is only one thread
                                     // (so it doesn't matter whether to call notify_one, or notify_all)
}

/// True when both queues are empty and no event is currently being processed.
bool SystemQueue::QueueThread::isEmpty()
{
    // SYNCHRONIZED
    scoped_lock lock( queue_mutex );

    return immediateEventsQ.empty() && timedEventsQ.empty() && ! processing_event;
}


// FIXME
/// Unconditionally fails an assertion.
void SystemQueue::enterMethod()
{
    assert( false );
}

/// Unconditionally fails an assertion.
void SystemQueue::leaveMethod()
{
    assert( false );
}

// XXX code from old system queue
// void SystemQueue::QueueThread::enter(){
//     queueMutex.lock();
// }
//
// void SystemQueue::QueueThread::leave(){
//     queueMutex.unlock();
// }

}} // namespace ariba::utility