source: source/ariba/utility/transport/tcpip/protlib/threads.h@ 9323

Last change on this file since 9323 was 6922, checked in by mies, 15 years ago

replaced deprecated hash_map, hash_set classes with boost
added unsigned serialization
fixed more warnings
included -Wall to Makefile.am

File size: 13.7 KB
Line 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file threads.h
3/// Thread support functions (classes thread and threadstarter) based on POSIX threads
4/// ----------------------------------------------------------
5/// $Id: threads.h 2549 2007-04-02 22:17:37Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/include/threads.h $
7// ===========================================================
8//
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29
30/**
31 * Classes to support multi-threaded programming.
32 *
33 * @ingroup thread
34 *
35 * A Thread module class must inherit from Thread. Several instances may run
36 * simultaneously but they share exactly one module object. So you must take
37 * care of this fact when writing the module code and use locks accordingly.
38 *
39 * Use lock(), unlock(), wait_cond() and signal_cond() the way you would use
40 * the corresponding POSIX thread functions.
41 *
42 * Use the ThreadStarter template class to create threads.
43 */
44
45#ifndef PROTLIB__THREADS_H
46#define PROTLIB__THREADS_H
47
48#include <vector>
49#include <pthread.h>
50#include <signal.h>
51#include <sys/times.h>
52#include <string>
53
54#include "protlib_types.h"
55#include "logfile.h"
56#include "fqueue.h"
57
58namespace protlib {
59 using namespace log;
60
61/** @addtogroup thread Threads
62 * @{
63 */
64
65
66/**
67 * Call the method start_processing of a Thread instance.
68 *
69 * @param thread_object a Thread instance
70 */
71template <class T> void *thread_starter(void *thread_object) {
72
73 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
74 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
75
76 (static_cast<T*>(thread_object))->start_processing();
77 return NULL;
78}
79
80
81/**
82 * Base class for thread object parameters.
83 *
84 * This is used by ThreadStarter to extract and store overall data like the
85 * sleep time and is also accessible to the thread object.
86 */
87class ThreadParam {
88 public:
89 ThreadParam();
90 ThreadParam(uint32 wait, const char* name,
91 uint32 minc=1, uint32 maxc=(uint32)-1);
92
93 static uint32 default_sleep_time;
94 /// sleep time
95 const uint32 sleep_time;
96 const std::string name;
97 /// minimum thread count
98 const uint32 min_count;
99 /// maximum thread count
100 const uint32 max_count;
101};
102
103
104/**
105 * This exception will be thrown if there is some trouble with threading.
106 */
107class ThreadError : public ProtLibException {
108 public:
109 enum error_t {
110 ERROR_THREAD_CREATION, ERROR_RUNNING, ERROR_STOPPING,
111 ERROR_ABORTING, ERROR_STILL_RUNNING, ERROR_UNINITIALIZED,
112 ERROR_INTERNAL, ERROR_NOT_STARTED
113 };
114
115 ThreadError(error_t e) : err(e) { }
116 virtual ~ThreadError() throw () { }
117
118 virtual const char* getstr() const;
119 virtual const char *what() const throw() { return getstr(); }
120 const error_t err;
121
122 protected:
123 static const char* const errstr[];
124};
125
126
127/**
128 * Abstract interface for thread modules.
129 *
130 * Don't confuse this Thread class with POSIX threads. A Thread class only
131 * provides a main_loop method which will be executed by one or more POSIX
132 * threads simultaneously. The Thread instance provides a central point for
133 * all those POSIX threads to store data. Don't forget to lock() the Thread
134 * instance to avoid race conditions if you want to access and/or modify
135 * the data.
136 */
137class Thread {
138 public:
139 Thread(const ThreadParam& p,
140 bool create_queue=true, bool exp_allow=true);
141 virtual ~Thread();
142
143 void *start_processing();
144 void stop_processing(bool do_lock=true);
145 void abort_processing(bool do_lock=true);
146
147 bool is_running(bool do_lock=true);
148
149 virtual void main_loop(uint32 thread_num) = 0;
150
151 void lock();
152 void unlock();
153
154 void signal_cond();
155 void broadcast_cond();
156 void wait_cond();
157 int wait_cond(const struct timespec& ts);
158 int wait_cond(int32 sec, int32 nsec=0);
159
160 /**
161 * State of a thread.
162 *
163 * The state of a thread does not really tell whether there are threads
164 * active or not. It only represents a state in the life cycle of a
165 * thread object.
166 */
167 enum state_t {
168 STATE_INIT, STATE_RUN, STATE_STOP, STATE_ABORT
169 };
170
171 state_t get_state(bool do_lock=true);
172 FastQueue* get_fqueue() { return fq; }
173
174 static void get_time_of_day(struct timespec& ts);
175
176 private:
177 /// This counter records the number of threads running on this object.
178 uint32 running_threads;
179
180 /// This counter records the number of started threads.
181 uint32 started_threads;
182
183 /**
184 * Thread-global mutex.
185 *
186 * This mutex is used to lock the thread object when data common to all
187 * threads on this object is modified.
188 */
189 pthread_mutex_t mutex;
190
191 /// thread object condition
192 pthread_cond_t cond;
193
194 /// thread state
195 state_t state;
196
197 /// Thread parameters.
198 const ThreadParam tparam;
199
200 /// The input queue where threads can get messages from.
201 FastQueue* fq;
202
203 void inc_running_threads();
204 void dec_running_threads();
205 uint32 get_running_threads() const;
206 void inc_started_threads();
207 uint32 get_started_threads() const;
208};
209
210
211inline void Thread::lock() {
212 if ( pthread_mutex_lock(&mutex) != 0 ) {
213 ERRLog(tparam.name, "Error while locking mutex");
214 }
215}
216
217inline void Thread::unlock() {
218 int ret = pthread_mutex_unlock(&mutex);
219 assert( ret == 0 );
220}
221
222inline void Thread::signal_cond() {
223 pthread_cond_signal(&cond);
224}
225
226inline void Thread::broadcast_cond() {
227 pthread_cond_broadcast(&cond);
228}
229
230inline void Thread::wait_cond() {
231 pthread_cond_wait(&cond,&mutex);
232}
233
234
235/**
236 * @param ts absolute time
237 * @return 0, ETIMEDOUT or EINTR.
238 */
239inline int Thread::wait_cond(const struct timespec& ts) {
240 return pthread_cond_timedwait(&cond, &mutex, &ts);
241}
242
243
244inline void Thread::inc_running_threads() {
245 running_threads++;
246}
247
248inline void Thread::dec_running_threads() {
249 assert( running_threads > 0 );
250 running_threads--;
251}
252
253inline uint32 Thread::get_running_threads() const {
254 return running_threads;
255}
256
257inline void Thread::inc_started_threads() {
258 started_threads++;
259}
260
261inline uint32 Thread::get_started_threads() const {
262 return started_threads;
263}
264
265
266/**
267 * A template class used to start threads.
268 *
269 * Note that the ThreadStarter template class is not thread-safe yet, so it
270 * may only be accessed by one thread at a time.
271 */
272template <class T, class TParam> class ThreadStarter {
273 public:
274 ThreadStarter(uint32 count, const TParam& param);
275 ~ThreadStarter();
276
277 void start_processing();
278 void stop_processing();
279 bool sleepuntilstop(); // deprecated!
280 void wait_until_stopped();
281 void abort_processing(bool kill=false);
282
283 /// get a pointer to the thread object
284 inline T *get_thread_object() { return &thread_object; }
285
286 /// Are all threads finished: TODO
287 inline bool is_running() const { return thread_object.is_running(); }
288
289 private:
290 /// The Thread object on which the threads run.
291 T thread_object;
292
293 /// For debugging, the name of the thread as given by TParam.
294 const TParam thread_param;
295
296 /// Contains the handles of all pthreads that we created.
297 std::vector<pthread_t> pthreads;
298};
299
300
301/**
302 * Constructor.
303 *
304 * @param count the number of threads to start
305 * @param param thread parameters
306 */
307template <class T, class TParam>
308ThreadStarter<T, TParam>::ThreadStarter(uint32 count, const TParam& param)
309 : thread_object(param), thread_param(param), pthreads(count) {
310
311 // TODO: fix all Thread subclasses that use an invalid count!
312 if ( count < param.min_count )
313 count = param.min_count;
314 else if ( count > param.max_count )
315 count = param.max_count;
316
317 assert( count >= param.min_count && count <= param.max_count );
318
319 pthreads.resize(count); // TODO: remove
320}
321
322
323/**
324 * Destructor.
325 *
326 * This cancels all running threads if there are still some.
327 */
328template <class T, class TParam> ThreadStarter<T, TParam>::~ThreadStarter() {
329
330 if ( thread_object.is_running() ) {
331 catch_all(stop_processing());
332 sleepuntilstop();
333 catch_all(abort_processing(true));
334 }
335}
336
337
338/**
339 * Start the threads.
340 */
341template <class T, class TParam>
342void ThreadStarter<T, TParam>::start_processing() {
343
344 thread_object.lock();
345
346 /*
347 * Check if this is a fresh Thread. If it is or was already running,
348 * we have detected a programming error.
349 */
350 if ( thread_object.is_running(false) ) {
351 thread_object.unlock();
352
353 ERRLog("Threads", "start_processing(): " << thread_param.name
354 << " is already running");
355
356 throw ThreadError(ThreadError::ERROR_INTERNAL);
357 }
358
359
360 /*
361 * Create the requested number of threads.
362 */
363 int res;
364 pthread_attr_t attr;
365 pthread_attr_init(&attr);
366 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
367
368 for (unsigned i = 0; i < pthreads.size(); i++) {
369 // Create a posix thread. It will start running immediately.
370 res = pthread_create(&pthreads[i], &attr,
371 &thread_starter<T>, &thread_object);
372
373 if ( res != 0 ) {
374 thread_object.unlock();
375 ERRLog("Threads", "pthread_create() failed starting a "
376 << "thread for " << thread_param.name);
377
378 throw ThreadError(ThreadError::ERROR_THREAD_CREATION);
379 }
380 }
381
382 ILog("Threads", pthreads.size() << " " << thread_param.name
383 << " thread(s) sucessfully created");
384
385 pthread_attr_destroy(&attr); // has no effect on the created threads
386
387 thread_object.unlock();
388}
389
390
391/**
392 * Ask all threads to stop (politely).
393 */
394template <class T, class TParam>
395void ThreadStarter<T, TParam>::stop_processing() {
396
397 thread_object.lock();
398
399 typename T::state_t state = thread_object.get_state(false);
400
401 switch (state) {
402
403 case T::STATE_INIT:
404 thread_object.unlock();
405 DLog("Threads", "Thread " << thread_param.name
406 << " has not been started yet.");
407 throw ThreadError(ThreadError::ERROR_NOT_STARTED);
408 break;
409
410 case T::STATE_RUN:
411 thread_object.stop_processing(false);
412 thread_object.unlock();
413
414 ILog("Threads", "Thread(s) "
415 << thread_param.name << " asked to stop");
416 break;
417
418 case T::STATE_STOP:
419 thread_object.unlock();
420 DLog("Threads", "Thread(s) "
421 << thread_param.name << " is already in state stop.");
422 throw ThreadError(ThreadError::ERROR_STOPPING);
423 break;
424
425 case T::STATE_ABORT:
426 //thread_object.unlock();
427 DLog("Threads", "Thread "
428 << thread_param.name << " is in state abort.");
429 //throw ThreadError(ThreadError::ERROR_ABORTING);
430 break;
431
432 default:
433 assert( false ); // unknown state
434
435 }
436}
437
438
439/**
440 * Wait for the thread to stop running (DEPRECATED).
441 *
442 * Sleeps until all threads have stopped running but not longer than
443 * sleep_time seconds.
444 *
445 * This method is deprecated because it suffers from a race condition:
446 * If none of the pthreads created in start_processing() has been run yet,
447 * then this method returns immediately. Use wait_until_stopped() instead.
448 *
449 * @return true if the threads have stopped
450 *
451 * @see ThreadParam
452 */
453template <class T, class TParam>
454bool ThreadStarter<T, TParam>::sleepuntilstop() {
455
456 for (uint32 i = 0; thread_object.is_running()
457 && i < thread_param.sleep_time; i++)
458 sleep(1);
459
460 return ( thread_object.is_running() ? false : true );
461}
462
463
464/**
465 * Wait until all threads have stopped running.
466 *
467 * Threads that haven't been running yet (state IDLE) are not considered
468 * as stopped!
469 */
470template <class T, class TParam>
471void ThreadStarter<T, TParam>::wait_until_stopped() {
472
473 DLog("Threads",
474 "Waiting for Thread " << thread_param.name << " to stop");
475
476 Thread::state_t state = thread_object.get_state(false);
477
478 while ( state == Thread::STATE_INIT || thread_object.is_running() ) {
479 sleep(1);
480 state = thread_object.get_state(false);
481 }
482
483 DLog("Threads", "Thread " << thread_param.name << " has stopped");
484}
485
486
487/**
488 * Stop and kill the threads.
489 *
490 * @param kill kill the threads if they do not stop.
491 */
492template <class T, class TParam>
493void ThreadStarter<T, TParam>::abort_processing(bool kill) {
494
495 thread_object.lock();
496
497 switch ( thread_object.get_state(false) ) {
498
499 case T::STATE_INIT:
500 thread_object.unlock();
501 DLog("Threads", "Thread "
502 << thread_param.name << " has not been started yet.");
503 throw ThreadError(ThreadError::ERROR_NOT_STARTED);
504 break;
505
506 case T::STATE_ABORT:
507 if ( ! kill ) {
508 //thread_object.unlock();
509 DLog("Threads", "Thread " << thread_param.name
510 << " is already in state abort.");
511
512 //throw ThreadError(ThreadError::ERROR_ABORTING);
513 }
514 break;
515
516 default:
517 break;
518 }
519
520 if ( thread_object.is_running(false) ) {
521 thread_object.stop_processing(false);
522 // unlock and sleep so the threads have a chance to stop.
523 thread_object.unlock();
524 sleepuntilstop();
525 thread_object.lock();
526 }
527
528 thread_object.abort_processing(false);
529
530 // unlock and let the thread abort
531 thread_object.unlock();
532 sleepuntilstop();
533 thread_object.lock();
534
535 if ( thread_object.is_running(false) ) {
536 // unlock and maybe kill
537 thread_object.unlock();
538 if (kill) {
539 for (unsigned i = 0; i < pthreads.size(); i++)
540 pthread_cancel( pthreads[i] );
541
542 sleepuntilstop();
543
544 for (unsigned i = 0; i < pthreads.size(); i++)
545 pthread_kill(pthreads[i], 9);
546
547 ILog("Threads", pthreads.size() << " thread(s) "
548 << thread_param.name << " killed");
549 } else {
550 ILog("Threads", pthreads.size() << " thread(s) "
551 << thread_param.name << " refused to abort");
552
553 throw ThreadError(ThreadError::ERROR_STILL_RUNNING);
554 }
555
556 } else {
557 thread_object.unlock();
558 ILog("Threads", pthreads.size() << " thread(s) "
559 << thread_param.name << " have terminated");
560 }
561}
562
563
564//@}
565
566} // namespace protlib
567
568#endif // PROTLIB__THREADS_H
Note: See TracBrowser for help on using the repository browser.