00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00045 #ifndef PROTLIB__THREADS_H
00046 #define PROTLIB__THREADS_H
00047
00048 #include <vector>
00049 #include <pthread.h>
00050 #include <signal.h>
00051 #include <sys/times.h>
00052 #include <string>
00053
00054 #include "protlib_types.h"
00055 #include "logfile.h"
00056 #include "fqueue.h"
00057
00058 namespace protlib {
00059 using namespace log;
00060
00071 template <class T> void *thread_starter(void *thread_object) {
00072
00073 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
00074 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
00075
00076 (static_cast<T*>(thread_object))->start_processing();
00077 return NULL;
00078 }
00079
00080
00087 class ThreadParam {
00088 public:
00089 ThreadParam();
00090 ThreadParam(uint32 wait, const char* name,
00091 uint32 minc=1, uint32 maxc=(uint32)-1);
00092
00093 static uint32 default_sleep_time;
00095 const uint32 sleep_time;
00096 const std::string name;
00098 const uint32 min_count;
00100 const uint32 max_count;
00101 };
00102
00103
00107 class ThreadError : public ProtLibException {
00108 public:
00109 enum error_t {
00110 ERROR_THREAD_CREATION, ERROR_RUNNING, ERROR_STOPPING,
00111 ERROR_ABORTING, ERROR_STILL_RUNNING, ERROR_UNINITIALIZED,
00112 ERROR_INTERNAL, ERROR_NOT_STARTED
00113 };
00114
00115 ThreadError(error_t e) : err(e) { }
00116 virtual ~ThreadError() throw () { }
00117
00118 virtual const char* getstr() const;
00119 virtual const char *what() const throw() { return getstr(); }
00120 const error_t err;
00121
00122 protected:
00123 static const char* const errstr[];
00124 };
00125
00126
00137 class Thread {
00138 public:
00139 Thread(const ThreadParam& p,
00140 bool create_queue=true, bool exp_allow=true);
00141 virtual ~Thread();
00142
00143 void *start_processing();
00144 void stop_processing(bool do_lock=true);
00145 void abort_processing(bool do_lock=true);
00146
00147 bool is_running(bool do_lock=true);
00148
00149 virtual void main_loop(uint32 thread_num) = 0;
00150
00151 void lock();
00152 void unlock();
00153
00154 void signal_cond();
00155 void broadcast_cond();
00156 void wait_cond();
00157 int wait_cond(const struct timespec& ts);
00158 int wait_cond(int32 sec, int32 nsec=0);
00159
00167 enum state_t {
00168 STATE_INIT, STATE_RUN, STATE_STOP, STATE_ABORT
00169 };
00170
00171 state_t get_state(bool do_lock=true);
00172 FastQueue* get_fqueue() { return fq; }
00173
00174 static void get_time_of_day(struct timespec& ts);
00175
00176 private:
00178 uint32 running_threads;
00179
00181 uint32 started_threads;
00182
00189 pthread_mutex_t mutex;
00190
00192 pthread_cond_t cond;
00193
00195 state_t state;
00196
00198 const ThreadParam tparam;
00199
00201 FastQueue* fq;
00202
00203 void inc_running_threads();
00204 void dec_running_threads();
00205 uint32 get_running_threads() const;
00206 void inc_started_threads();
00207 uint32 get_started_threads() const;
00208 };
00209
00210
00211 inline void Thread::lock() {
00212 if ( pthread_mutex_lock(&mutex) != 0 ) {
00213 ERRLog(tparam.name, "Error while locking mutex");
00214 }
00215 }
00216
00217 inline void Thread::unlock() {
00218 int ret = pthread_mutex_unlock(&mutex);
00219 assert( ret == 0 );
00220 }
00221
00222 inline void Thread::signal_cond() {
00223 pthread_cond_signal(&cond);
00224 }
00225
00226 inline void Thread::broadcast_cond() {
00227 pthread_cond_broadcast(&cond);
00228 }
00229
00230 inline void Thread::wait_cond() {
00231 pthread_cond_wait(&cond,&mutex);
00232 }
00233
00234
00239 inline int Thread::wait_cond(const struct timespec& ts) {
00240 return pthread_cond_timedwait(&cond, &mutex, &ts);
00241 }
00242
00243
00244 inline void Thread::inc_running_threads() {
00245 running_threads++;
00246 }
00247
00248 inline void Thread::dec_running_threads() {
00249 assert( running_threads > 0 );
00250 running_threads--;
00251 }
00252
00253 inline uint32 Thread::get_running_threads() const {
00254 return running_threads;
00255 }
00256
00257 inline void Thread::inc_started_threads() {
00258 started_threads++;
00259 }
00260
00261 inline uint32 Thread::get_started_threads() const {
00262 return started_threads;
00263 }
00264
00265
00272 template <class T, class TParam> class ThreadStarter {
00273 public:
00274 ThreadStarter(uint32 count, const TParam& param);
00275 ~ThreadStarter();
00276
00277 void start_processing();
00278 void stop_processing();
00279 bool sleepuntilstop();
00280 void wait_until_stopped();
00281 void abort_processing(bool kill=false);
00282
00284 inline T *get_thread_object() { return &thread_object; }
00285
00287 inline bool is_running() const { return thread_object.is_running(); }
00288
00289 private:
00291 T thread_object;
00292
00294 const TParam thread_param;
00295
00297 std::vector<pthread_t> pthreads;
00298 };
00299
00300
00307 template <class T, class TParam>
00308 ThreadStarter<T, TParam>::ThreadStarter(uint32 count, const TParam& param)
00309 : thread_object(param), thread_param(param), pthreads(count) {
00310
00311
00312 if ( count < param.min_count )
00313 count = param.min_count;
00314 else if ( count > param.max_count )
00315 count = param.max_count;
00316
00317 assert( count >= param.min_count && count <= param.max_count );
00318
00319 pthreads.resize(count);
00320 }
00321
00322
00328 template <class T, class TParam> ThreadStarter<T, TParam>::~ThreadStarter() {
00329
00330 if ( thread_object.is_running() ) {
00331 catch_all(stop_processing());
00332 sleepuntilstop();
00333 catch_all(abort_processing(true));
00334 }
00335 }
00336
00337
00341 template <class T, class TParam>
00342 void ThreadStarter<T, TParam>::start_processing() {
00343
00344 thread_object.lock();
00345
00346
00347
00348
00349
00350 if ( thread_object.is_running(false) ) {
00351 thread_object.unlock();
00352
00353 ERRLog("Threads", "start_processing(): " << thread_param.name
00354 << " is already running");
00355
00356 throw ThreadError(ThreadError::ERROR_INTERNAL);
00357 }
00358
00359
00360
00361
00362
00363 int res;
00364 pthread_attr_t attr;
00365 pthread_attr_init(&attr);
00366 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
00367
00368 for (unsigned i = 0; i < pthreads.size(); i++) {
00369
00370 res = pthread_create(&pthreads[i], &attr,
00371 &thread_starter<T>, &thread_object);
00372
00373 if ( res != 0 ) {
00374 thread_object.unlock();
00375 ERRLog("Threads", "pthread_create() failed starting a "
00376 << "thread for " << thread_param.name);
00377
00378 throw ThreadError(ThreadError::ERROR_THREAD_CREATION);
00379 }
00380 }
00381
00382 ILog("Threads", pthreads.size() << " " << thread_param.name
00383 << " thread(s) sucessfully created");
00384
00385 pthread_attr_destroy(&attr);
00386
00387 thread_object.unlock();
00388 }
00389
00390
00394 template <class T, class TParam>
00395 void ThreadStarter<T, TParam>::stop_processing() {
00396
00397 thread_object.lock();
00398
00399 typename T::state_t state = thread_object.get_state(false);
00400
00401 switch (state) {
00402
00403 case T::STATE_INIT:
00404 thread_object.unlock();
00405 DLog("Threads", "Thread " << thread_param.name
00406 << " has not been started yet.");
00407 throw ThreadError(ThreadError::ERROR_NOT_STARTED);
00408 break;
00409
00410 case T::STATE_RUN:
00411 thread_object.stop_processing(false);
00412 thread_object.unlock();
00413
00414 ILog("Threads", "Thread(s) "
00415 << thread_param.name << " asked to stop");
00416 break;
00417
00418 case T::STATE_STOP:
00419 thread_object.unlock();
00420 DLog("Threads", "Thread(s) "
00421 << thread_param.name << " is already in state stop.");
00422 throw ThreadError(ThreadError::ERROR_STOPPING);
00423 break;
00424
00425 case T::STATE_ABORT:
00426
00427 DLog("Threads", "Thread "
00428 << thread_param.name << " is in state abort.");
00429
00430 break;
00431
00432 default:
00433 assert( false );
00434
00435 }
00436 }
00437
00438
00453 template <class T, class TParam>
00454 bool ThreadStarter<T, TParam>::sleepuntilstop() {
00455
00456 for (uint32 i = 0; thread_object.is_running()
00457 && i < thread_param.sleep_time; i++)
00458 sleep(1);
00459
00460 return ( thread_object.is_running() ? false : true );
00461 }
00462
00463
00470 template <class T, class TParam>
00471 void ThreadStarter<T, TParam>::wait_until_stopped() {
00472
00473 DLog("Threads",
00474 "Waiting for Thread " << thread_param.name << " to stop");
00475
00476 Thread::state_t state = thread_object.get_state(false);
00477
00478 while ( state == Thread::STATE_INIT || thread_object.is_running() ) {
00479 sleep(1);
00480 state = thread_object.get_state(false);
00481 }
00482
00483 DLog("Threads", "Thread " << thread_param.name << " has stopped");
00484 }
00485
00486
00492 template <class T, class TParam>
00493 void ThreadStarter<T, TParam>::abort_processing(bool kill) {
00494
00495 thread_object.lock();
00496
00497 switch ( thread_object.get_state(false) ) {
00498
00499 case T::STATE_INIT:
00500 thread_object.unlock();
00501 DLog("Threads", "Thread "
00502 << thread_param.name << " has not been started yet.");
00503 throw ThreadError(ThreadError::ERROR_NOT_STARTED);
00504 break;
00505
00506 case T::STATE_ABORT:
00507 if ( ! kill ) {
00508
00509 DLog("Threads", "Thread " << thread_param.name
00510 << " is already in state abort.");
00511
00512
00513 }
00514 break;
00515
00516 default:
00517 break;
00518 }
00519
00520 if ( thread_object.is_running(false) ) {
00521 thread_object.stop_processing(false);
00522
00523 thread_object.unlock();
00524 sleepuntilstop();
00525 thread_object.lock();
00526 }
00527
00528 thread_object.abort_processing(false);
00529
00530
00531 thread_object.unlock();
00532 sleepuntilstop();
00533 thread_object.lock();
00534
00535 if ( thread_object.is_running(false) ) {
00536
00537 thread_object.unlock();
00538 if (kill) {
00539 for (unsigned i = 0; i < pthreads.size(); i++)
00540 pthread_cancel( pthreads[i] );
00541
00542 sleepuntilstop();
00543
00544 for (unsigned i = 0; i < pthreads.size(); i++)
00545 pthread_kill(pthreads[i], 9);
00546
00547 ILog("Threads", pthreads.size() << " thread(s) "
00548 << thread_param.name << " killed");
00549 } else {
00550 ILog("Threads", pthreads.size() << " thread(s) "
00551 << thread_param.name << " refused to abort");
00552
00553 throw ThreadError(ThreadError::ERROR_STILL_RUNNING);
00554 }
00555
00556 } else {
00557 thread_object.unlock();
00558 ILog("Threads", pthreads.size() << " thread(s) "
00559 << thread_param.name << " have terminated");
00560 }
00561 }
00562
00563
00565
00566 }
00567
00568 #endif // PROTLIB__THREADS_H