00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00045 #ifndef PROTLIB__THREADS_H
00046 #define PROTLIB__THREADS_H
00047
00048 #include <pthread.h>
00049 #include <signal.h>
00050 #include <sys/times.h>
00051 #include <string>
00052
00053 #include "protlib_types.h"
00054 #include "logfile.h"
00055 #include "fqueue.h"
00056
00057 namespace protlib {
00058 using namespace log;
00059
00070 template <class T> void *thread_starter(void *thread_object) {
00071
00072 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
00073 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
00074
00075 (static_cast<T*>(thread_object))->start_processing();
00076 return NULL;
00077 }
00078
00079
00086 class ThreadParam {
00087 public:
00088 ThreadParam();
00089 ThreadParam(uint32 wait, const char* name,
00090 uint32 minc=1, uint32 maxc=(uint32)-1);
00091
00092 static uint32 default_sleep_time;
00094 const uint32 sleep_time;
00095 const std::string name;
00097 const uint32 min_count;
00099 const uint32 max_count;
00100 };
00101
00102
00106 class ThreadError : public ProtLibException {
00107 public:
00108 enum error_t {
00109 ERROR_THREAD_CREATION, ERROR_RUNNING, ERROR_STOPPING,
00110 ERROR_ABORTING, ERROR_STILL_RUNNING, ERROR_UNINITIALIZED,
00111 ERROR_INTERNAL, ERROR_NOT_STARTED
00112 };
00113
00114 ThreadError(error_t e) : err(e) { }
00115 virtual ~ThreadError() throw () { }
00116
00117 virtual const char* getstr() const;
00118 virtual const char *what() const throw() { return getstr(); }
00119 const error_t err;
00120
00121 protected:
00122 static const char* const errstr[];
00123 };
00124
00125
00136 class Thread {
00137 public:
00138 Thread(const ThreadParam& p,
00139 bool create_queue=true, bool exp_allow=true);
00140 virtual ~Thread();
00141
00142 void *start_processing();
00143 void stop_processing(bool do_lock=true);
00144 void abort_processing(bool do_lock=true);
00145
00146 bool is_running(bool do_lock=true);
00147
00148 virtual void main_loop(uint32 thread_num) = 0;
00149
00150 void lock();
00151 void unlock();
00152
00153 void signal_cond();
00154 void broadcast_cond();
00155 void wait_cond();
00156 int wait_cond(const struct timespec& ts);
00157 int wait_cond(int32 sec, int32 nsec=0);
00158
00166 enum state_t {
00167 STATE_INIT, STATE_RUN, STATE_STOP, STATE_ABORT
00168 };
00169
00170 state_t get_state(bool do_lock=true);
00171 FastQueue* get_fqueue() { return fq; }
00172
00173 static void get_time_of_day(struct timespec& ts);
00174
00175 private:
00177 uint32 running_threads;
00178
00180 uint32 started_threads;
00181
00188 pthread_mutex_t mutex;
00189
00191 pthread_cond_t cond;
00192
00194 state_t state;
00195
00197 const ThreadParam tparam;
00198
00200 FastQueue* fq;
00201
00202 void inc_running_threads();
00203 void dec_running_threads();
00204 uint32 get_running_threads() const;
00205 void inc_started_threads();
00206 uint32 get_started_threads() const;
00207 };
00208
00209
00210 inline void Thread::lock() {
00211 if ( pthread_mutex_lock(&mutex) != 0 ) {
00212 ERRLog(tparam.name, "Error while locking mutex");
00213 }
00214 }
00215
00216 inline void Thread::unlock() {
00217 int ret = pthread_mutex_unlock(&mutex);
00218 assert( ret == 0 );
00219 }
00220
00221 inline void Thread::signal_cond() {
00222 pthread_cond_signal(&cond);
00223 }
00224
00225 inline void Thread::broadcast_cond() {
00226 pthread_cond_broadcast(&cond);
00227 }
00228
00229 inline void Thread::wait_cond() {
00230 pthread_cond_wait(&cond,&mutex);
00231 }
00232
00233
00238 inline int Thread::wait_cond(const struct timespec& ts) {
00239 return pthread_cond_timedwait(&cond, &mutex, &ts);
00240 }
00241
00242
00243 inline void Thread::inc_running_threads() {
00244 running_threads++;
00245 }
00246
00247 inline void Thread::dec_running_threads() {
00248 assert( running_threads > 0 );
00249 running_threads--;
00250 }
00251
00252 inline uint32 Thread::get_running_threads() const {
00253 return running_threads;
00254 }
00255
00256 inline void Thread::inc_started_threads() {
00257 started_threads++;
00258 }
00259
00260 inline uint32 Thread::get_started_threads() const {
00261 return started_threads;
00262 }
00263
00264
00271 template <class T, class TParam> class ThreadStarter {
00272 public:
00273 ThreadStarter(uint32 count, const TParam& param);
00274 ~ThreadStarter();
00275
00276 void start_processing();
00277 void stop_processing();
00278 bool sleepuntilstop();
00279 void wait_until_stopped();
00280 void abort_processing(bool kill=false);
00281
00283 inline T *get_thread_object() { return &thread_object; }
00284
00286 inline bool is_running() const { return thread_object.is_running(); }
00287
00288 private:
00290 T thread_object;
00291
00293 const TParam thread_param;
00294
00296 std::vector<pthread_t> pthreads;
00297 };
00298
00299
00306 template <class T, class TParam>
00307 ThreadStarter<T, TParam>::ThreadStarter(uint32 count, const TParam& param)
00308 : thread_object(param), thread_param(param), pthreads(count) {
00309
00310
00311 if ( count < param.min_count )
00312 count = param.min_count;
00313 else if ( count > param.max_count )
00314 count = param.max_count;
00315
00316 assert( count >= param.min_count && count <= param.max_count );
00317
00318 pthreads.resize(count);
00319 }
00320
00321
00327 template <class T, class TParam> ThreadStarter<T, TParam>::~ThreadStarter() {
00328
00329 if ( thread_object.is_running() ) {
00330 catch_all(stop_processing());
00331 sleepuntilstop();
00332 catch_all(abort_processing(true));
00333 }
00334 }
00335
00336
00340 template <class T, class TParam>
00341 void ThreadStarter<T, TParam>::start_processing() {
00342
00343 thread_object.lock();
00344
00345
00346
00347
00348
00349 if ( thread_object.is_running(false) ) {
00350 thread_object.unlock();
00351
00352 ERRLog("Threads", "start_processing(): " << thread_param.name
00353 << " is already running");
00354
00355 throw ThreadError(ThreadError::ERROR_INTERNAL);
00356 }
00357
00358
00359
00360
00361
00362 int res;
00363 pthread_attr_t attr;
00364 pthread_attr_init(&attr);
00365 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
00366
00367 for (unsigned i = 0; i < pthreads.size(); i++) {
00368
00369 res = pthread_create(&pthreads[i], &attr,
00370 &thread_starter<T>, &thread_object);
00371
00372 if ( res != 0 ) {
00373 thread_object.unlock();
00374 ERRLog("Threads", "pthread_create() failed starting a "
00375 << "thread for " << thread_param.name);
00376
00377 throw ThreadError(ThreadError::ERROR_THREAD_CREATION);
00378 }
00379 }
00380
00381 ILog("Threads", pthreads.size() << " " << thread_param.name
00382 << " thread(s) sucessfully created");
00383
00384 pthread_attr_destroy(&attr);
00385
00386 thread_object.unlock();
00387 }
00388
00389
00393 template <class T, class TParam>
00394 void ThreadStarter<T, TParam>::stop_processing() {
00395
00396 thread_object.lock();
00397
00398 typename T::state_t state = thread_object.get_state(false);
00399
00400 switch (state) {
00401
00402 case T::STATE_INIT:
00403 thread_object.unlock();
00404 DLog("Threads", "Thread " << thread_param.name
00405 << " has not been started yet.");
00406 throw ThreadError(ThreadError::ERROR_NOT_STARTED);
00407 break;
00408
00409 case T::STATE_RUN:
00410 thread_object.stop_processing(false);
00411 thread_object.unlock();
00412
00413 ILog("Threads", "Thread(s) "
00414 << thread_param.name << " asked to stop");
00415 break;
00416
00417 case T::STATE_STOP:
00418 thread_object.unlock();
00419 DLog("Threads", "Thread(s) "
00420 << thread_param.name << " is already in state stop.");
00421 throw ThreadError(ThreadError::ERROR_STOPPING);
00422 break;
00423
00424 case T::STATE_ABORT:
00425
00426 DLog("Threads", "Thread "
00427 << thread_param.name << " is in state abort.");
00428
00429 break;
00430
00431 default:
00432 assert( false );
00433
00434 }
00435 }
00436
00437
00452 template <class T, class TParam>
00453 bool ThreadStarter<T, TParam>::sleepuntilstop() {
00454
00455 for (uint32 i = 0; thread_object.is_running()
00456 && i < thread_param.sleep_time; i++)
00457 sleep(1);
00458
00459 return ( thread_object.is_running() ? false : true );
00460 }
00461
00462
00469 template <class T, class TParam>
00470 void ThreadStarter<T, TParam>::wait_until_stopped() {
00471
00472 DLog("Threads",
00473 "Waiting for Thread " << thread_param.name << " to stop");
00474
00475 Thread::state_t state = thread_object.get_state(false);
00476
00477 while ( state == Thread::STATE_INIT || thread_object.is_running() ) {
00478 sleep(1);
00479 state = thread_object.get_state(false);
00480 }
00481
00482 DLog("Threads", "Thread " << thread_param.name << " has stopped");
00483 }
00484
00485
00491 template <class T, class TParam>
00492 void ThreadStarter<T, TParam>::abort_processing(bool kill) {
00493
00494 thread_object.lock();
00495
00496 switch ( thread_object.get_state(false) ) {
00497
00498 case T::STATE_INIT:
00499 thread_object.unlock();
00500 DLog("Threads", "Thread "
00501 << thread_param.name << " has not been started yet.");
00502 throw ThreadError(ThreadError::ERROR_NOT_STARTED);
00503 break;
00504
00505 case T::STATE_ABORT:
00506 if ( ! kill ) {
00507
00508 DLog("Threads", "Thread " << thread_param.name
00509 << " is already in state abort.");
00510
00511
00512 }
00513 break;
00514
00515 default:
00516 break;
00517 }
00518
00519 if ( thread_object.is_running(false) ) {
00520 thread_object.stop_processing(false);
00521
00522 thread_object.unlock();
00523 sleepuntilstop();
00524 thread_object.lock();
00525 }
00526
00527 thread_object.abort_processing(false);
00528
00529
00530 thread_object.unlock();
00531 sleepuntilstop();
00532 thread_object.lock();
00533
00534 if ( thread_object.is_running(false) ) {
00535
00536 thread_object.unlock();
00537 if (kill) {
00538 for (unsigned i = 0; i < pthreads.size(); i++)
00539 pthread_cancel( pthreads[i] );
00540
00541 sleepuntilstop();
00542
00543 for (unsigned i = 0; i < pthreads.size(); i++)
00544 pthread_kill(pthreads[i], 9);
00545
00546 ILog("Threads", pthreads.size() << " thread(s) "
00547 << thread_param.name << " killed");
00548 } else {
00549 ILog("Threads", pthreads.size() << " thread(s) "
00550 << thread_param.name << " refused to abort");
00551
00552 throw ThreadError(ThreadError::ERROR_STILL_RUNNING);
00553 }
00554
00555 } else {
00556 thread_object.unlock();
00557 ILog("Threads", pthreads.size() << " thread(s) "
00558 << thread_param.name << " have terminated");
00559 }
00560 }
00561
00562
00564
00565 }
00566
00567 #endif // PROTLIB__THREADS_H