close Warning: Can't use blame annotator:
No changeset 1891 in the repository

source: source/ariba/communication/modules/transport/protlib/threads.h@ 5638

Last change on this file since 5638 was 5638, checked in by Christoph Mayer, 15 years ago

adress detection aufgeräumt, network info für bleutooth, data stream (hopeful crash fix), logging auf maemo nur warn, ...

File size: 13.7 KB
RevLine 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file threads.h
3/// Thread support functions (classes thread and threadstarter) based on POSIX threads
4/// ----------------------------------------------------------
5/// $Id: threads.h 2549 2007-04-02 22:17:37Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/include/threads.h $
7// ===========================================================
8//
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29
30/**
31 * Classes to support multi-threaded programming.
32 *
33 * @ingroup thread
34 *
35 * A Thread module class must inherit from Thread. Several instances may run
36 * simultaneously but they share exactly one module object. So you must take
37 * care of this fact when writing the module code and use locks accordingly.
38 *
39 * Use lock(), unlock(), wait_cond() and signal_cond() the way you would use
40 * the corresponding POSIX thread functions.
41 *
42 * Use the ThreadStarter template class to create threads.
43 */
44
45#ifndef PROTLIB__THREADS_H
46#define PROTLIB__THREADS_H
47
48#include <pthread.h>
49#include <signal.h>
50#include <sys/times.h>
51#include <string>
52
53#include "protlib_types.h"
54#include "logfile.h"
55#include "fqueue.h"
56
57namespace protlib {
58 using namespace log;
59
60/** @addtogroup thread Threads
61 * @{
62 */
63
64
65/**
66 * Call the method start_processing of a Thread instance.
67 *
68 * @param thread_object a Thread instance
69 */
70template <class T> void *thread_starter(void *thread_object) {
71
72 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
73 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
74
75 (static_cast<T*>(thread_object))->start_processing();
76 return NULL;
77}
78
79
80/**
81 * Base class for thread object parameters.
82 *
83 * This is used by ThreadStarter to extract and store overall data like the
84 * sleep time and is also accessible to the thread object.
85 */
86class ThreadParam {
87 public:
88 ThreadParam();
89 ThreadParam(uint32 wait, const char* name,
90 uint32 minc=1, uint32 maxc=(uint32)-1);
91
92 static uint32 default_sleep_time;
93 /// sleep time
94 const uint32 sleep_time;
95 const std::string name;
96 /// minimum thread count
97 const uint32 min_count;
98 /// maximum thread count
99 const uint32 max_count;
100};
101
102
103/**
104 * This exception will be thrown if there is some trouble with threading.
105 */
106class ThreadError : public ProtLibException {
107 public:
108 enum error_t {
109 ERROR_THREAD_CREATION, ERROR_RUNNING, ERROR_STOPPING,
110 ERROR_ABORTING, ERROR_STILL_RUNNING, ERROR_UNINITIALIZED,
111 ERROR_INTERNAL, ERROR_NOT_STARTED
112 };
113
114 ThreadError(error_t e) : err(e) { }
115 virtual ~ThreadError() throw () { }
116
117 virtual const char* getstr() const;
118 virtual const char *what() const throw() { return getstr(); }
119 const error_t err;
120
121 protected:
122 static const char* const errstr[];
123};
124
125
126/**
127 * Abstract interface for thread modules.
128 *
129 * Don't confuse this Thread class with POSIX threads. A Thread class only
130 * provides a main_loop method which will be executed by one or more POSIX
131 * threads simultaneously. The Thread instance provides a central point for
132 * all those POSIX threads to store data. Don't forget to lock() the Thread
133 * instance to avoid race conditions if you want to access and/or modify
134 * the data.
135 */
136class Thread {
137 public:
138 Thread(const ThreadParam& p,
139 bool create_queue=true, bool exp_allow=true);
140 virtual ~Thread();
141
142 void *start_processing();
143 void stop_processing(bool do_lock=true);
144 void abort_processing(bool do_lock=true);
145
146 bool is_running(bool do_lock=true);
147
148 virtual void main_loop(uint32 thread_num) = 0;
149
150 void lock();
151 void unlock();
152
153 void signal_cond();
154 void broadcast_cond();
155 void wait_cond();
156 int wait_cond(const struct timespec& ts);
157 int wait_cond(int32 sec, int32 nsec=0);
158
159 /**
160 * State of a thread.
161 *
162 * The state of a thread does not really tell whether there are threads
163 * active or not. It only represents a state in the life cycle of a
164 * thread object.
165 */
166 enum state_t {
167 STATE_INIT, STATE_RUN, STATE_STOP, STATE_ABORT
168 };
169
170 state_t get_state(bool do_lock=true);
171 FastQueue* get_fqueue() { return fq; }
172
173 static void get_time_of_day(struct timespec& ts);
174
175 private:
176 /// This counter records the number of threads running on this object.
177 uint32 running_threads;
178
179 /// This counter records the number of started threads.
180 uint32 started_threads;
181
182 /**
183 * Thread-global mutex.
184 *
185 * This mutex is used to lock the thread object when data common to all
186 * threads on this object is modified.
187 */
188 pthread_mutex_t mutex;
189
190 /// thread object condition
191 pthread_cond_t cond;
192
193 /// thread state
194 state_t state;
195
196 /// Thread parameters.
197 const ThreadParam tparam;
198
199 /// The input queue where threads can get messages from.
200 FastQueue* fq;
201
202 void inc_running_threads();
203 void dec_running_threads();
204 uint32 get_running_threads() const;
205 void inc_started_threads();
206 uint32 get_started_threads() const;
207};
208
209
210inline void Thread::lock() {
211 if ( pthread_mutex_lock(&mutex) != 0 ) {
212 ERRLog(tparam.name, "Error while locking mutex");
213 }
214}
215
216inline void Thread::unlock() {
217 int ret = pthread_mutex_unlock(&mutex);
218 assert( ret == 0 );
219}
220
221inline void Thread::signal_cond() {
222 pthread_cond_signal(&cond);
223}
224
225inline void Thread::broadcast_cond() {
226 pthread_cond_broadcast(&cond);
227}
228
229inline void Thread::wait_cond() {
230 pthread_cond_wait(&cond,&mutex);
231}
232
233
234/**
235 * @param ts absolute time
236 * @return 0, ETIMEDOUT or EINTR.
237 */
238inline int Thread::wait_cond(const struct timespec& ts) {
239 return pthread_cond_timedwait(&cond, &mutex, &ts);
240}
241
242
243inline void Thread::inc_running_threads() {
244 running_threads++;
245}
246
247inline void Thread::dec_running_threads() {
248 assert( running_threads > 0 );
249 running_threads--;
250}
251
252inline uint32 Thread::get_running_threads() const {
253 return running_threads;
254}
255
256inline void Thread::inc_started_threads() {
257 started_threads++;
258}
259
260inline uint32 Thread::get_started_threads() const {
261 return started_threads;
262}
263
264
265/**
266 * A template class used to start threads.
267 *
268 * Note that the ThreadStarter template class is not thread-safe yet, so it
269 * may only be accessed by one thread at a time.
270 */
271template <class T, class TParam> class ThreadStarter {
272 public:
273 ThreadStarter(uint32 count, const TParam& param);
274 ~ThreadStarter();
275
276 void start_processing();
277 void stop_processing();
278 bool sleepuntilstop(); // deprecated!
279 void wait_until_stopped();
280 void abort_processing(bool kill=false);
281
282 /// get a pointer to the thread object
283 inline T *get_thread_object() { return &thread_object; }
284
285 /// Are all threads finished: TODO
286 inline bool is_running() const { return thread_object.is_running(); }
287
288 private:
289 /// The Thread object on which the threads run.
290 T thread_object;
291
292 /// For debugging, the name of the thread as given by TParam.
293 const TParam thread_param;
294
295 /// Contains the handles of all pthreads that we created.
296 std::vector<pthread_t> pthreads;
297};
298
299
300/**
301 * Constructor.
302 *
303 * @param count the number of threads to start
304 * @param param thread parameters
305 */
306template <class T, class TParam>
307ThreadStarter<T, TParam>::ThreadStarter(uint32 count, const TParam& param)
308 : thread_object(param), thread_param(param), pthreads(count) {
309
310 // TODO: fix all Thread subclasses that use an invalid count!
311 if ( count < param.min_count )
312 count = param.min_count;
313 else if ( count > param.max_count )
314 count = param.max_count;
315
316 assert( count >= param.min_count && count <= param.max_count );
317
318 pthreads.resize(count); // TODO: remove
319}
320
321
322/**
323 * Destructor.
324 *
325 * This cancels all running threads if there are still some.
326 */
327template <class T, class TParam> ThreadStarter<T, TParam>::~ThreadStarter() {
328
329 if ( thread_object.is_running() ) {
330 catch_all(stop_processing());
331 sleepuntilstop();
332 catch_all(abort_processing(true));
333 }
334}
335
336
337/**
338 * Start the threads.
339 */
340template <class T, class TParam>
341void ThreadStarter<T, TParam>::start_processing() {
342
343 thread_object.lock();
344
345 /*
346 * Check if this is a fresh Thread. If it is or was already running,
347 * we have detected a programming error.
348 */
349 if ( thread_object.is_running(false) ) {
350 thread_object.unlock();
351
352 ERRLog("Threads", "start_processing(): " << thread_param.name
353 << " is already running");
354
355 throw ThreadError(ThreadError::ERROR_INTERNAL);
356 }
357
358
359 /*
360 * Create the requested number of threads.
361 */
362 int res;
363 pthread_attr_t attr;
364 pthread_attr_init(&attr);
365 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
366
367 for (unsigned i = 0; i < pthreads.size(); i++) {
368 // Create a posix thread. It will start running immediately.
369 res = pthread_create(&pthreads[i], &attr,
370 &thread_starter<T>, &thread_object);
371
372 if ( res != 0 ) {
373 thread_object.unlock();
374 ERRLog("Threads", "pthread_create() failed starting a "
375 << "thread for " << thread_param.name);
376
377 throw ThreadError(ThreadError::ERROR_THREAD_CREATION);
378 }
379 }
380
381 ILog("Threads", pthreads.size() << " " << thread_param.name
382 << " thread(s) sucessfully created");
383
384 pthread_attr_destroy(&attr); // has no effect on the created threads
385
386 thread_object.unlock();
387}
388
389
390/**
391 * Ask all threads to stop (politely).
392 */
393template <class T, class TParam>
394void ThreadStarter<T, TParam>::stop_processing() {
395
396 thread_object.lock();
397
398 typename T::state_t state = thread_object.get_state(false);
399
400 switch (state) {
401
402 case T::STATE_INIT:
403 thread_object.unlock();
404 DLog("Threads", "Thread " << thread_param.name
405 << " has not been started yet.");
406 throw ThreadError(ThreadError::ERROR_NOT_STARTED);
407 break;
408
409 case T::STATE_RUN:
410 thread_object.stop_processing(false);
411 thread_object.unlock();
412
413 ILog("Threads", "Thread(s) "
414 << thread_param.name << " asked to stop");
415 break;
416
417 case T::STATE_STOP:
418 thread_object.unlock();
419 DLog("Threads", "Thread(s) "
420 << thread_param.name << " is already in state stop.");
421 throw ThreadError(ThreadError::ERROR_STOPPING);
422 break;
423
424 case T::STATE_ABORT:
425 //thread_object.unlock();
426 DLog("Threads", "Thread "
427 << thread_param.name << " is in state abort.");
428 //throw ThreadError(ThreadError::ERROR_ABORTING);
429 break;
430
431 default:
432 assert( false ); // unknown state
433
434 }
435}
436
437
438/**
439 * Wait for the thread to stop running (DEPRECATED).
440 *
441 * Sleeps until all threads have stopped running but not longer than
442 * sleep_time seconds.
443 *
444 * This method is deprecated because it suffers from a race condition:
445 * If none of the pthreads created in start_processing() has been run yet,
446 * then this method returns immediately. Use wait_until_stopped() instead.
447 *
448 * @return true if the threads have stopped
449 *
450 * @see ThreadParam
451 */
452template <class T, class TParam>
453bool ThreadStarter<T, TParam>::sleepuntilstop() {
454
455 for (uint32 i = 0; thread_object.is_running()
456 && i < thread_param.sleep_time; i++)
457 sleep(1);
458
459 return ( thread_object.is_running() ? false : true );
460}
461
462
463/**
464 * Wait until all threads have stopped running.
465 *
466 * Threads that haven't been running yet (state IDLE) are not considered
467 * as stopped!
468 */
469template <class T, class TParam>
470void ThreadStarter<T, TParam>::wait_until_stopped() {
471
472 DLog("Threads",
473 "Waiting for Thread " << thread_param.name << " to stop");
474
475 Thread::state_t state = thread_object.get_state(false);
476
477 while ( state == Thread::STATE_INIT || thread_object.is_running() ) {
478 sleep(1);
479 state = thread_object.get_state(false);
480 }
481
482 DLog("Threads", "Thread " << thread_param.name << " has stopped");
483}
484
485
486/**
487 * Stop and kill the threads.
488 *
489 * @param kill kill the threads if they do not stop.
490 */
491template <class T, class TParam>
492void ThreadStarter<T, TParam>::abort_processing(bool kill) {
493
494 thread_object.lock();
495
496 switch ( thread_object.get_state(false) ) {
497
498 case T::STATE_INIT:
499 thread_object.unlock();
500 DLog("Threads", "Thread "
501 << thread_param.name << " has not been started yet.");
502 throw ThreadError(ThreadError::ERROR_NOT_STARTED);
503 break;
504
505 case T::STATE_ABORT:
506 if ( ! kill ) {
507 //thread_object.unlock();
508 DLog("Threads", "Thread " << thread_param.name
509 << " is already in state abort.");
510
511 //throw ThreadError(ThreadError::ERROR_ABORTING);
512 }
513 break;
514
515 default:
516 break;
517 }
518
519 if ( thread_object.is_running(false) ) {
520 thread_object.stop_processing(false);
521 // unlock and sleep so the threads have a chance to stop.
522 thread_object.unlock();
523 sleepuntilstop();
524 thread_object.lock();
525 }
526
527 thread_object.abort_processing(false);
528
529 // unlock and let the thread abort
530 thread_object.unlock();
531 sleepuntilstop();
532 thread_object.lock();
533
534 if ( thread_object.is_running(false) ) {
535 // unlock and maybe kill
536 thread_object.unlock();
537 if (kill) {
538 for (unsigned i = 0; i < pthreads.size(); i++)
539 pthread_cancel( pthreads[i] );
540
541 sleepuntilstop();
542
543 for (unsigned i = 0; i < pthreads.size(); i++)
544 pthread_kill(pthreads[i], 9);
545
546 ILog("Threads", pthreads.size() << " thread(s) "
547 << thread_param.name << " killed");
548 } else {
549 ILog("Threads", pthreads.size() << " thread(s) "
550 << thread_param.name << " refused to abort");
551
552 throw ThreadError(ThreadError::ERROR_STILL_RUNNING);
553 }
554
555 } else {
556 thread_object.unlock();
557 ILog("Threads", pthreads.size() << " thread(s) "
558 << thread_param.name << " have terminated");
559 }
560}
561
562
563//@}
564
565} // namespace protlib
566
567#endif // PROTLIB__THREADS_H
Note: See TracBrowser for help on using the repository browser.