00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 #include "SystemQueue.h"
00039
00040 namespace ariba {
00041 namespace utility {
00042
00043 use_logging_cpp(SystemQueue);
00044
00045 SystemQueue::SystemQueue()
00046 #ifndef UNDERLAY_OMNET
00047 : delayScheduler( &directScheduler ), systemQueueRunning( false )
00048 #endif
00049 {
00050 }
00051
00052 SystemQueue::~SystemQueue() {
00053 }
00054
00055 void SystemQueue::scheduleEvent( const SystemEvent& event, uint32_t delay ) {
00056 #ifndef UNDERLAY_OMNET
00057 if( delay == 0 ) directScheduler.insert( event, delay );
00058 else delayScheduler.insert ( event, delay );
00059 #else
00060 Enter_Method_Silent();
00061 cMessage* msg = new cMessage();
00062 msg->setContextPointer( new SystemEvent(event) );
00063
00064 if( delay == 0 )
00065 cSimpleModule::scheduleAt( cSimpleModule::simTime(), msg );
00066 else
00067 cSimpleModule::scheduleAt( cSimpleModule::simTime()+((double)delay/1000.0), msg );
00068 #endif
00069 }
00070
00071 #ifdef UNDERLAY_OMNET
00072 void SystemQueue::handleMessage(cMessage* msg){
00073 SystemEvent* event = (SystemEvent*)msg->contextPointer();
00074
00075 event->listener->handleSystemEvent( *event );
00076
00077 delete event; delete msg;
00078 }
00079 #endif
00080
00081 void SystemQueue::run() {
00082 #ifndef UNDERLAY_OMNET
00083 systemQueueRunning = true;
00084 directScheduler.run();
00085 delayScheduler.run();
00086 #endif
00087 }
00088
00089 void SystemQueue::cancel() {
00090 #ifndef UNDERLAY_OMNET
00091 systemQueueRunning = false;
00092 directScheduler.cancel();
00093 delayScheduler.cancel();
00094 #endif
00095 }
00096
00097 bool SystemQueue::isEmpty() {
00098 #ifndef UNDERLAY_OMNET
00099 return directScheduler.isEmpty() || delayScheduler.isEmpty();
00100 #else
00101 return false;
00102 #endif
00103 }
00104
00105 bool SystemQueue::isRunning() {
00106 #ifndef UNDERLAY_OMNET
00107 return systemQueueRunning;
00108 #else
00109 return true;
00110 #endif
00111 }
00112
00113
00114 #ifndef UNDERLAY_OMNET
00115
00116 SystemQueue::QueueThread::QueueThread(QueueThread* _transferQueue)
00117 : running( false ), transferQueue( _transferQueue ) {
00118 }
00119
00120 SystemQueue::QueueThread::~QueueThread(){
00121 }
00122
00123 void SystemQueue::QueueThread::run(){
00124 running = true;
00125 queueThread = new boost::thread(
00126 boost::bind(&QueueThread::threadFunc, this) );
00127 }
00128
00129 void SystemQueue::QueueThread::cancel(){
00130
00131
00132 running = false;
00133 itemsAvailable.notify_all();
00134
00135
00136
00137
00138 queueThread->join();
00139 delete queueThread;
00140 queueThread = NULL;
00141
00142 while( eventsQueue.size() > 0 ){
00143 eventsQueue.erase( eventsQueue.begin() );
00144 }
00145 }
00146
00147 bool SystemQueue::QueueThread::isEmpty(){
00148 boost::mutex::scoped_lock lock( queueMutex );
00149 return eventsQueue.empty();
00150 }
00151
00152 void SystemQueue::QueueThread::insert( const SystemEvent& event, uint32_t delay ){
00153 {
00154 boost::mutex::scoped_lock lock( queueMutex );
00155
00156 eventsQueue.push_back( event );
00157 eventsQueue.back().scheduledTime = boost::posix_time::microsec_clock::local_time();
00158 eventsQueue.back().delayTime = delay;
00159 eventsQueue.back().remainingDelay = delay;
00160
00161 onItemInserted( event );
00162 itemsAvailable.notify_all();
00163 }
00164 }
00165
00166 void SystemQueue::QueueThread::threadFunc( QueueThread* obj ) {
00167
00168 boost::mutex::scoped_lock lock( obj->queueMutex );
00169
00170 while( obj->running ) {
00171
00172
00173
00174
00175
00176
00177 while ( obj->eventsQueue.empty() && obj->running ){
00178
00179 const boost::system_time duration =
00180 boost::get_system_time() +
00181 boost::posix_time::milliseconds(40);
00182
00183 obj->itemsAvailable.wait( lock );
00184
00185 }
00186
00187
00188
00189 while( !obj->eventsQueue.empty() ) {
00190
00191
00192
00193 SystemEvent ev = obj->eventsQueue.front();
00194 obj->eventsQueue.erase( obj->eventsQueue.begin() );
00195
00196 {
00197
00198
00199
00200 obj->onNextQueueItem( ev );
00201
00202 }
00203
00204 }
00205 }
00206 }
00207
00208
00209
00210 SystemQueue::QueueThreadDirect::QueueThreadDirect(){
00211 }
00212
00213 SystemQueue::QueueThreadDirect::~QueueThreadDirect(){
00214 }
00215
00216 void SystemQueue::QueueThreadDirect::onItemInserted( const SystemEvent& event ){
00217
00218 }
00219
00220 void SystemQueue::QueueThreadDirect::onNextQueueItem( const SystemEvent& event ){
00221
00222 event.getListener()->handleSystemEvent( event );
00223 }
00224
00225
00226
00227 SystemQueue::QueueThreadDelay::QueueThreadDelay(QueueThread* _transferQueue)
00228 : QueueThread( _transferQueue ), isSleeping( false ) {
00229
00230 assert( _transferQueue != NULL );
00231 }
00232
00233 SystemQueue::QueueThreadDelay::~QueueThreadDelay(){
00234 }
00235
00236 void SystemQueue::QueueThreadDelay::onItemInserted( const SystemEvent& event ){
00237
00238 if( !isSleeping) return;
00239
00240
00241
00242
00243
00244 assert( !eventsQueue.empty());
00245 sleepCond.notify_all();
00246
00247 ptime sleepEnd = boost::posix_time::microsec_clock::local_time();
00248 boost::posix_time::time_duration duration = sleepEnd - sleepStart;
00249 uint32_t sleptTime = duration.total_milliseconds();
00250
00251 EventQueue::iterator i = eventsQueue.begin();
00252 EventQueue::iterator iend = eventsQueue.end();
00253
00254 for( ; i != iend; i++ ) {
00255
00256 if( sleptTime >= i->remainingDelay)
00257 i->remainingDelay = 0;
00258 else
00259 i->remainingDelay -= sleptTime;
00260
00261 }
00262
00263
00264
00265
00266
00267
00268 std::sort( eventsQueue.begin(), eventsQueue.end() );
00269
00270 }
00271
00272 void SystemQueue::QueueThreadDelay::onNextQueueItem( const SystemEvent& event ){
00273
00274
00275
00276
00277
00278 assert( !isSleeping );
00279
00280
00281
00282
00283
00284 if( event.remainingDelay > 0 ) {
00285
00286 const boost::system_time duration =
00287 boost::get_system_time() +
00288 boost::posix_time::milliseconds(event.remainingDelay);
00289
00290 {
00291 boost::unique_lock<boost::mutex> lock( sleepMutex );
00292
00293 sleepStart = boost::posix_time::microsec_clock::local_time();
00294 isSleeping = true;
00295
00296 sleepCond.timed_wait( lock, duration );
00297
00298 isSleeping = false;
00299 }
00300
00301 }
00302
00303
00304
00305
00306
00307 ptime sleepEnd = boost::posix_time::microsec_clock::local_time();
00308 boost::posix_time::time_duration duration = sleepEnd - sleepStart;
00309 uint32_t sleptTime = duration.total_milliseconds();
00310
00311 if (event.remainingDelay <= sleptTime)
00312 transferQueue->insert( event, 0 );
00313 }
00314
00315 #endif // #ifndef UNDERLAY_OMNET
00316
00317
00318
00319 }}