source: source/ariba/utility/transport/tcpip/protlib/threads.h@ 10006

Last change on this file since 10006 was 9991, checked in by Christoph Mayer, 13 years ago

-fixes on protlib for android

File size: 13.8 KB
RevLine 
[5284]1/// ----------------------------------------*- mode: C++; -*--
2/// @file threads.h
3/// Thread support functions (classes thread and threadstarter) based on POSIX threads
4/// ----------------------------------------------------------
5/// $Id: threads.h 2549 2007-04-02 22:17:37Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/include/threads.h $
7// ===========================================================
8//
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29
30/**
31 * Classes to support multi-threaded programming.
32 *
[9686]33 * @ingroup protlib
[5284]34 *
35 * A Thread module class must inherit from Thread. Several instances may run
36 * simultaneously but they share exactly one module object. So you must take
37 * care of this fact when writing the module code and use locks accordingly.
38 *
39 * Use lock(), unlock(), wait_cond() and signal_cond() the way you would use
40 * the corresponding POSIX thread functions.
41 *
42 * Use the ThreadStarter template class to create threads.
43 */
44
45#ifndef PROTLIB__THREADS_H
46#define PROTLIB__THREADS_H
47
[6922]48#include <vector>
[5284]49#include <pthread.h>
50#include <signal.h>
51#include <sys/times.h>
52#include <string>
53
54#include "protlib_types.h"
55#include "logfile.h"
56#include "fqueue.h"
57
58namespace protlib {
59 using namespace log;
60
[9686]61/** @addtogroup protlib
[5284]62 * @{
63 */
64
65
/**
 * POSIX thread entry point that runs a Thread module.
 *
 * Switches the calling thread to enabled, deferred cancellation (on
 * platforms that define the corresponding constants) and then invokes
 * start_processing() on the given object.
 *
 * @param thread_object pointer to an instance of T, passed as void*
 * @return always NULL
 */
template <class T> void *thread_starter(void *thread_object) {
    T *const module = static_cast<T*>(thread_object);

    // Not every platform offers thread cancellation, so the cancel
    // settings are only touched where the constants exist.
#ifdef PTHREAD_CANCEL_ENABLE
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
#endif
#ifdef PTHREAD_CANCEL_DEFERRED
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
#endif

    module->start_processing();
    return NULL;
}
83
84
85/**
86 * Base class for thread object parameters.
87 *
88 * This is used by ThreadStarter to extract and store overall data like the
89 * sleep time and is also accessible to the thread object.
90 */
91class ThreadParam {
92 public:
93 ThreadParam();
94 ThreadParam(uint32 wait, const char* name,
95 uint32 minc=1, uint32 maxc=(uint32)-1);
96
97 static uint32 default_sleep_time;
98 /// sleep time
99 const uint32 sleep_time;
100 const std::string name;
101 /// minimum thread count
102 const uint32 min_count;
103 /// maximum thread count
104 const uint32 max_count;
105};
106
107
108/**
109 * This exception will be thrown if there is some trouble with threading.
110 */
111class ThreadError : public ProtLibException {
112 public:
113 enum error_t {
114 ERROR_THREAD_CREATION, ERROR_RUNNING, ERROR_STOPPING,
115 ERROR_ABORTING, ERROR_STILL_RUNNING, ERROR_UNINITIALIZED,
116 ERROR_INTERNAL, ERROR_NOT_STARTED
117 };
118
119 ThreadError(error_t e) : err(e) { }
120 virtual ~ThreadError() throw () { }
121
122 virtual const char* getstr() const;
123 virtual const char *what() const throw() { return getstr(); }
124 const error_t err;
125
126 protected:
127 static const char* const errstr[];
128};
129
130
131/**
132 * Abstract interface for thread modules.
133 *
134 * Don't confuse this Thread class with POSIX threads. A Thread class only
135 * provides a main_loop method which will be executed by one or more POSIX
136 * threads simultaneously. The Thread instance provides a central point for
137 * all those POSIX threads to store data. Don't forget to lock() the Thread
138 * instance to avoid race conditions if you want to access and/or modify
139 * the data.
140 */
141class Thread {
142 public:
143 Thread(const ThreadParam& p,
144 bool create_queue=true, bool exp_allow=true);
145 virtual ~Thread();
146
147 void *start_processing();
148 void stop_processing(bool do_lock=true);
149 void abort_processing(bool do_lock=true);
150
151 bool is_running(bool do_lock=true);
152
153 virtual void main_loop(uint32 thread_num) = 0;
154
155 void lock();
156 void unlock();
157
158 void signal_cond();
159 void broadcast_cond();
160 void wait_cond();
161 int wait_cond(const struct timespec& ts);
162 int wait_cond(int32 sec, int32 nsec=0);
163
164 /**
165 * State of a thread.
166 *
167 * The state of a thread does not really tell whether there are threads
168 * active or not. It only represents a state in the life cycle of a
169 * thread object.
170 */
171 enum state_t {
172 STATE_INIT, STATE_RUN, STATE_STOP, STATE_ABORT
173 };
174
175 state_t get_state(bool do_lock=true);
176 FastQueue* get_fqueue() { return fq; }
177
178 static void get_time_of_day(struct timespec& ts);
179
180 private:
181 /// This counter records the number of threads running on this object.
182 uint32 running_threads;
183
184 /// This counter records the number of started threads.
185 uint32 started_threads;
186
187 /**
188 * Thread-global mutex.
189 *
190 * This mutex is used to lock the thread object when data common to all
191 * threads on this object is modified.
192 */
193 pthread_mutex_t mutex;
194
195 /// thread object condition
196 pthread_cond_t cond;
197
198 /// thread state
199 state_t state;
200
201 /// Thread parameters.
202 const ThreadParam tparam;
203
204 /// The input queue where threads can get messages from.
205 FastQueue* fq;
206
207 void inc_running_threads();
208 void dec_running_threads();
209 uint32 get_running_threads() const;
210 void inc_started_threads();
211 uint32 get_started_threads() const;
212};
213
214
215inline void Thread::lock() {
216 if ( pthread_mutex_lock(&mutex) != 0 ) {
217 ERRLog(tparam.name, "Error while locking mutex");
218 }
219}
220
221inline void Thread::unlock() {
222 int ret = pthread_mutex_unlock(&mutex);
223 assert( ret == 0 );
224}
225
226inline void Thread::signal_cond() {
227 pthread_cond_signal(&cond);
228}
229
230inline void Thread::broadcast_cond() {
231 pthread_cond_broadcast(&cond);
232}
233
234inline void Thread::wait_cond() {
235 pthread_cond_wait(&cond,&mutex);
236}
237
238
239/**
240 * @param ts absolute time
241 * @return 0, ETIMEDOUT or EINTR.
242 */
243inline int Thread::wait_cond(const struct timespec& ts) {
244 return pthread_cond_timedwait(&cond, &mutex, &ts);
245}
246
247
248inline void Thread::inc_running_threads() {
249 running_threads++;
250}
251
252inline void Thread::dec_running_threads() {
253 assert( running_threads > 0 );
254 running_threads--;
255}
256
257inline uint32 Thread::get_running_threads() const {
258 return running_threads;
259}
260
261inline void Thread::inc_started_threads() {
262 started_threads++;
263}
264
265inline uint32 Thread::get_started_threads() const {
266 return started_threads;
267}
268
269
270/**
271 * A template class used to start threads.
272 *
273 * Note that the ThreadStarter template class is not thread-safe yet, so it
274 * may only be accessed by one thread at a time.
275 */
276template <class T, class TParam> class ThreadStarter {
277 public:
278 ThreadStarter(uint32 count, const TParam& param);
279 ~ThreadStarter();
280
281 void start_processing();
282 void stop_processing();
283 bool sleepuntilstop(); // deprecated!
284 void wait_until_stopped();
285 void abort_processing(bool kill=false);
286
287 /// get a pointer to the thread object
288 inline T *get_thread_object() { return &thread_object; }
289
290 /// Are all threads finished: TODO
291 inline bool is_running() const { return thread_object.is_running(); }
292
293 private:
294 /// The Thread object on which the threads run.
295 T thread_object;
296
297 /// For debugging, the name of the thread as given by TParam.
298 const TParam thread_param;
299
300 /// Contains the handles of all pthreads that we created.
301 std::vector<pthread_t> pthreads;
302};
303
304
305/**
306 * Constructor.
307 *
308 * @param count the number of threads to start
309 * @param param thread parameters
310 */
311template <class T, class TParam>
312ThreadStarter<T, TParam>::ThreadStarter(uint32 count, const TParam& param)
313 : thread_object(param), thread_param(param), pthreads(count) {
314
315 // TODO: fix all Thread subclasses that use an invalid count!
316 if ( count < param.min_count )
317 count = param.min_count;
318 else if ( count > param.max_count )
319 count = param.max_count;
320
321 assert( count >= param.min_count && count <= param.max_count );
322
323 pthreads.resize(count); // TODO: remove
324}
325
326
327/**
328 * Destructor.
329 *
330 * This cancels all running threads if there are still some.
331 */
332template <class T, class TParam> ThreadStarter<T, TParam>::~ThreadStarter() {
333
334 if ( thread_object.is_running() ) {
335 catch_all(stop_processing());
336 sleepuntilstop();
337 catch_all(abort_processing(true));
338 }
339}
340
341
342/**
343 * Start the threads.
344 */
345template <class T, class TParam>
346void ThreadStarter<T, TParam>::start_processing() {
347
348 thread_object.lock();
349
350 /*
351 * Check if this is a fresh Thread. If it is or was already running,
352 * we have detected a programming error.
353 */
354 if ( thread_object.is_running(false) ) {
355 thread_object.unlock();
356
357 ERRLog("Threads", "start_processing(): " << thread_param.name
358 << " is already running");
359
360 throw ThreadError(ThreadError::ERROR_INTERNAL);
361 }
362
363
364 /*
365 * Create the requested number of threads.
366 */
367 int res;
368 pthread_attr_t attr;
369 pthread_attr_init(&attr);
370 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
371
372 for (unsigned i = 0; i < pthreads.size(); i++) {
373 // Create a posix thread. It will start running immediately.
374 res = pthread_create(&pthreads[i], &attr,
375 &thread_starter<T>, &thread_object);
376
377 if ( res != 0 ) {
378 thread_object.unlock();
379 ERRLog("Threads", "pthread_create() failed starting a "
380 << "thread for " << thread_param.name);
381
382 throw ThreadError(ThreadError::ERROR_THREAD_CREATION);
383 }
384 }
385
386 ILog("Threads", pthreads.size() << " " << thread_param.name
387 << " thread(s) sucessfully created");
388
389 pthread_attr_destroy(&attr); // has no effect on the created threads
390
391 thread_object.unlock();
392}
393
394
395/**
396 * Ask all threads to stop (politely).
397 */
398template <class T, class TParam>
399void ThreadStarter<T, TParam>::stop_processing() {
400
401 thread_object.lock();
402
403 typename T::state_t state = thread_object.get_state(false);
404
405 switch (state) {
406
407 case T::STATE_INIT:
408 thread_object.unlock();
409 DLog("Threads", "Thread " << thread_param.name
410 << " has not been started yet.");
411 throw ThreadError(ThreadError::ERROR_NOT_STARTED);
412 break;
413
414 case T::STATE_RUN:
415 thread_object.stop_processing(false);
416 thread_object.unlock();
417
418 ILog("Threads", "Thread(s) "
419 << thread_param.name << " asked to stop");
420 break;
421
422 case T::STATE_STOP:
423 thread_object.unlock();
424 DLog("Threads", "Thread(s) "
425 << thread_param.name << " is already in state stop.");
426 throw ThreadError(ThreadError::ERROR_STOPPING);
427 break;
428
429 case T::STATE_ABORT:
430 //thread_object.unlock();
431 DLog("Threads", "Thread "
432 << thread_param.name << " is in state abort.");
433 //throw ThreadError(ThreadError::ERROR_ABORTING);
434 break;
435
436 default:
437 assert( false ); // unknown state
438
439 }
440}
441
442
443/**
444 * Wait for the thread to stop running (DEPRECATED).
445 *
446 * Sleeps until all threads have stopped running but not longer than
447 * sleep_time seconds.
448 *
449 * This method is deprecated because it suffers from a race condition:
450 * If none of the pthreads created in start_processing() has been run yet,
451 * then this method returns immediately. Use wait_until_stopped() instead.
452 *
453 * @return true if the threads have stopped
454 *
455 * @see ThreadParam
456 */
457template <class T, class TParam>
458bool ThreadStarter<T, TParam>::sleepuntilstop() {
459
460 for (uint32 i = 0; thread_object.is_running()
461 && i < thread_param.sleep_time; i++)
462 sleep(1);
463
464 return ( thread_object.is_running() ? false : true );
465}
466
467
468/**
469 * Wait until all threads have stopped running.
470 *
471 * Threads that haven't been running yet (state IDLE) are not considered
472 * as stopped!
473 */
474template <class T, class TParam>
475void ThreadStarter<T, TParam>::wait_until_stopped() {
476
477 DLog("Threads",
478 "Waiting for Thread " << thread_param.name << " to stop");
479
480 Thread::state_t state = thread_object.get_state(false);
481
482 while ( state == Thread::STATE_INIT || thread_object.is_running() ) {
483 sleep(1);
484 state = thread_object.get_state(false);
485 }
486
487 DLog("Threads", "Thread " << thread_param.name << " has stopped");
488}
489
490
491/**
492 * Stop and kill the threads.
493 *
494 * @param kill kill the threads if they do not stop.
495 */
496template <class T, class TParam>
497void ThreadStarter<T, TParam>::abort_processing(bool kill) {
498
499 thread_object.lock();
500
501 switch ( thread_object.get_state(false) ) {
502
503 case T::STATE_INIT:
504 thread_object.unlock();
505 DLog("Threads", "Thread "
506 << thread_param.name << " has not been started yet.");
507 throw ThreadError(ThreadError::ERROR_NOT_STARTED);
508 break;
509
510 case T::STATE_ABORT:
511 if ( ! kill ) {
512 //thread_object.unlock();
513 DLog("Threads", "Thread " << thread_param.name
514 << " is already in state abort.");
515
516 //throw ThreadError(ThreadError::ERROR_ABORTING);
517 }
518 break;
519
520 default:
521 break;
522 }
523
524 if ( thread_object.is_running(false) ) {
525 thread_object.stop_processing(false);
526 // unlock and sleep so the threads have a chance to stop.
527 thread_object.unlock();
528 sleepuntilstop();
529 thread_object.lock();
530 }
531
532 thread_object.abort_processing(false);
533
534 // unlock and let the thread abort
535 thread_object.unlock();
536 sleepuntilstop();
537 thread_object.lock();
538
539 if ( thread_object.is_running(false) ) {
540 // unlock and maybe kill
541 thread_object.unlock();
542 if (kill) {
[9991]543
544#ifdef PTHREAD_CANCEL_ENABLE
[5284]545 for (unsigned i = 0; i < pthreads.size(); i++)
546 pthread_cancel( pthreads[i] );
547
548 sleepuntilstop();
[9991]549#endif
[5284]550
551 for (unsigned i = 0; i < pthreads.size(); i++)
552 pthread_kill(pthreads[i], 9);
553
554 ILog("Threads", pthreads.size() << " thread(s) "
555 << thread_param.name << " killed");
556 } else {
557 ILog("Threads", pthreads.size() << " thread(s) "
558 << thread_param.name << " refused to abort");
559
560 throw ThreadError(ThreadError::ERROR_STILL_RUNNING);
561 }
562
563 } else {
564 thread_object.unlock();
565 ILog("Threads", pthreads.size() << " thread(s) "
566 << thread_param.name << " have terminated");
567 }
568}
569
570
571//@}
572
573} // namespace protlib
574
575#endif // PROTLIB__THREADS_H
Note: See TracBrowser for help on using the repository browser.