00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include <sys/time.h>
00031
00032 #include "threads.h"
00033
00034
00035 namespace protlib {
00036 using namespace log;
00037
00046 uint32 ThreadParam::default_sleep_time = 5;
00047
00048
00059 ThreadParam::ThreadParam(uint32 wait, const char *n, uint32 minc, uint32 maxc)
00060 : sleep_time(wait), name(n ? n : "UNKNOWN"),
00061 min_count(minc), max_count(maxc) {
00062
00063 assert( minc > 0 );
00064 assert( maxc >= minc );
00065 }
00066
00067
00075 Thread::Thread(const ThreadParam &p, bool create_queue, bool exp_allow)
00076 : running_threads(0), started_threads(0), state(STATE_INIT), tparam(p),
00077 fq(create_queue ? new FastQueue(p.name.c_str(), exp_allow) : 0) {
00078
00079 pthread_mutexattr_t mutex_attr;
00080
00081 pthread_mutexattr_init(&mutex_attr);
00082
00083 #ifdef _DEBUG
00084 pthread_mutexattr_settype(&mutex_attr,PTHREAD_MUTEX_ERRORCHECK);
00085 #else
00086 pthread_mutexattr_settype(&mutex_attr,PTHREAD_MUTEX_NORMAL);
00087 #endif
00088
00089 pthread_mutex_init(&mutex, &mutex_attr);
00090 pthread_cond_init(&cond,NULL);
00091
00092 pthread_mutexattr_destroy(&mutex_attr);
00093 }
00094
00095
00101 Thread::~Thread() {
00102 if ( get_running_threads() )
00103 throw ThreadError(ThreadError::ERROR_STILL_RUNNING);
00104
00105 delete fq;
00106
00107 pthread_cond_destroy(&cond);
00108 pthread_mutex_destroy(&mutex);
00109 }
00110
00111
00120 void *Thread::start_processing() {
00121
00122 lock();
00123
00124 switch (state) {
00125 case STATE_INIT:
00126 state=STATE_RUN;
00127 break;
00128 case STATE_RUN:
00129 break;
00130 case STATE_STOP:
00131 case STATE_ABORT:
00132 unlock();
00133 return NULL;
00134 }
00135
00136 inc_running_threads();
00137 inc_started_threads();
00138
00139 int thread_num = get_started_threads();
00140
00141 unlock();
00142
00143
00144
00145
00146
00147
00148
00149
00150 try {
00151 main_loop(thread_num);
00152 }
00153 catch ( ProtLibException &e ) {
00154 ERRLog("Threads", "Unhandled ProtLibException in thread "
00155 << tparam.name << ", num " << thread_num << ", error ["
00156 << e.getstr() << ']');
00157 }
00158 catch ( bad_alloc & ) {
00159 ERRLog("Threads", tparam.name << ", num " << thread_num
00160 << ": [out of memory]");
00161 }
00162 catch ( ... ) {
00163 ERRLog("Threads", "Unhandled non-ProtLibException in thread "
00164 << tparam.name << ", num " << thread_num);
00165 }
00166
00167 lock();
00168 dec_running_threads();
00169 unlock();
00170
00171 return NULL;
00172 }
00173
00174
00183 void Thread::stop_processing(bool do_lock) {
00184 if ( do_lock )
00185 lock();
00186
00187 if (state==STATE_RUN) {
00188 state = STATE_STOP;
00189 signal_cond();
00190 }
00191
00192 if ( do_lock )
00193 unlock();
00194 }
00195
00196
00202 void Thread::abort_processing(bool do_lock) {
00203 if ( do_lock )
00204 lock();
00205
00206 if ( state == STATE_RUN || state == STATE_STOP ) {
00207 state = STATE_ABORT;
00208 signal_cond();
00209 }
00210
00211 if ( do_lock )
00212 unlock();
00213 }
00214
00215
00221 bool Thread::is_running(bool do_lock) {
00222
00223 if ( do_lock )
00224 lock();
00225
00226 bool res = ( get_running_threads() > 0 );
00227
00228 if ( do_lock )
00229 unlock();
00230
00231 return res;
00232 }
00233
00234
00242 int Thread::wait_cond(int32 sec, int32 nsec) {
00243 struct timeval tv;
00244 struct timespec ts;
00245
00246 if ( sec < 0 )
00247 sec = 0;
00248 if ( nsec < 0 )
00249 nsec = 0;
00250
00251 gettimeofday(&tv, NULL);
00252 ts.tv_sec = tv.tv_sec+sec;
00253 ts.tv_nsec = tv.tv_usec*1000+nsec;
00254
00255
00256 while ( ts.tv_nsec > 1000000000) {
00257 ts.tv_sec++;
00258 ts.tv_nsec -= 1000000000;
00259 }
00260
00261 if ( ts.tv_sec < 0 )
00262 ts.tv_sec = 0;
00263 if ( ts.tv_nsec < 0 )
00264 ts.tv_nsec = 0;
00265
00266 return pthread_cond_timedwait(&cond, &mutex, &ts);
00267 }
00268
00269
00278 Thread::state_t Thread::get_state(bool do_lock) {
00279 if ( do_lock )
00280 lock();
00281
00282 state_t s = state;
00283
00284 if ( do_lock )
00285 unlock();
00286
00287 return s;
00288 }
00289
00291 void Thread::get_time_of_day(struct timespec& ts) {
00292 struct timeval tv;
00293 gettimeofday(&tv,NULL);
00294 ts.tv_sec = tv.tv_sec;
00295 ts.tv_nsec = tv.tv_usec*1000;
00296 }
00297
00298 const char* ThreadError::getstr() const {
00299 return errstr[(int)err];
00300 }
00301
00302 const char* const ThreadError::errstr[] = {
00303 "Cannot create POSIX Threads.",
00304 "Thread is running.",
00305 "Thread is going to stop.",
00306 "Thread is aborting.",
00307 "Still running threads left.",
00308 "ThreadStarter is not initialized correctly."
00309 "Internal ThreadStarter or Thread error.",
00310 "Thread has not been started yet."
00311 };
00312
00314
00315 }