An Overlay-based
Virtual Network Substrate
SpoVNet

source: source/ariba/utility/transport/tcpip/protlib/threads.h @ 6922

Last change on this file since 6922 was 6922, checked in by mies, 14 years ago

replaced deprecated hash_map, hash_set classes with boost
added unsigned serialization
fixed more warnings
included -Wall to Makefile.am

File size: 13.7 KB
Line 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file threads.h
3/// Thread support functions (classes thread and threadstarter) based on POSIX threads
4/// ----------------------------------------------------------
5/// $Id: threads.h 2549 2007-04-02 22:17:37Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/include/threads.h $
7// ===========================================================
8//                     
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//                     
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29
30/**
31 * Classes to support multi-threaded programming.
32 *
33 * @ingroup thread
34 *
35 * A Thread module class must inherit from Thread. Several instances may run
36 * simultaneously but they share exactly one module object. So you must take
37 * care of this fact when writing the module code and use locks accordingly.
38 *
39 * Use lock(), unlock(), wait_cond() and signal_cond() the way you would use
40 * the corresponding POSIX thread functions.
41 *
42 * Use the ThreadStarter template class to create threads.
43 */
44
45#ifndef PROTLIB__THREADS_H
46#define PROTLIB__THREADS_H
47
48#include <vector>
49#include <pthread.h>
50#include <signal.h>
51#include <sys/times.h>
52#include <string>
53
54#include "protlib_types.h"
55#include "logfile.h"
56#include "fqueue.h"
57
58namespace protlib {
59  using namespace log;
60
61/** @addtogroup thread Threads
62 * @{
63 */
64
65
/**
 * POSIX thread entry point that runs the processing routine of a thread
 * object.
 *
 * The calling thread is made cancellable (deferred cancellation) and then
 * T::start_processing() is invoked on the given object.
 *
 * @tparam T type of the thread object; must provide start_processing()
 * @param thread_object pointer to an instance of T, passed as void*
 * @return always NULL
 */
template <class T> void *thread_starter(void *thread_object) {

        // Accept cancellation requests, but only act on them at
        // cancellation points.
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
        pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);

        T* const worker = static_cast<T*>(thread_object);
        worker->start_processing();

        return NULL;
}
79
80
81/**
82 * Base class for thread object parameters.
83 *
84 * This is used by ThreadStarter to extract and store overall data like the
85 * sleep time and is also accessible to the thread object.
86 */
87class ThreadParam {
88  public:
89        ThreadParam();
90        ThreadParam(uint32 wait, const char* name,
91                uint32 minc=1, uint32 maxc=(uint32)-1);
92
93        static uint32 default_sleep_time;
94        /// sleep time
95        const uint32 sleep_time;
96        const std::string name;
97        /// minimum thread count
98        const uint32 min_count;
99        /// maximum thread count
100        const uint32 max_count;
101};
102
103
104/**
105 * This exception will be thrown if there is some trouble with threading.
106 */
107class ThreadError : public ProtLibException {
108  public:
109        enum error_t {
110                ERROR_THREAD_CREATION, ERROR_RUNNING, ERROR_STOPPING,
111                ERROR_ABORTING, ERROR_STILL_RUNNING, ERROR_UNINITIALIZED,
112                ERROR_INTERNAL, ERROR_NOT_STARTED
113        };
114
115        ThreadError(error_t e) : err(e) { }
116        virtual ~ThreadError() throw () { }
117
118        virtual const char* getstr() const;
119        virtual const char *what() const throw() { return getstr(); }
120        const error_t err;
121
122  protected:
123        static const char* const errstr[];
124};
125
126
127/**
128 * Abstract interface for thread modules.
129 *
130 * Don't confuse this Thread class with POSIX threads. A Thread class only
131 * provides a main_loop method which will be executed by one or more POSIX
132 * threads simultaneously. The Thread instance provides a central point for
133 * all those POSIX threads to store data. Don't forget to lock() the Thread
134 * instance to avoid race conditions if you want to access and/or modify
135 * the data.
136 */
137class Thread {
138  public:
139        Thread(const ThreadParam& p,
140                bool create_queue=true, bool exp_allow=true);
141        virtual ~Thread();
142
143        void *start_processing();
144        void stop_processing(bool do_lock=true);
145        void abort_processing(bool do_lock=true);
146
147        bool is_running(bool do_lock=true);
148
149        virtual void main_loop(uint32 thread_num) = 0;
150
151        void lock();
152        void unlock();
153
154        void signal_cond();
155        void broadcast_cond();
156        void wait_cond();
157        int wait_cond(const struct timespec& ts);
158        int wait_cond(int32 sec, int32 nsec=0);
159
160        /**
161        * State of a thread.
162        *
163        * The state of a thread does not really tell whether there are threads
164        * active or not. It only represents a state in the life cycle of a
165        * thread object.
166        */
167        enum state_t {
168                STATE_INIT, STATE_RUN, STATE_STOP, STATE_ABORT
169        };
170
171        state_t get_state(bool do_lock=true);
172        FastQueue* get_fqueue() { return fq; }
173
174        static void get_time_of_day(struct timespec& ts);
175
176  private:
177        /// This counter records the number of threads running on this object.
178        uint32 running_threads;
179
180        /// This counter records the number of started threads.
181        uint32 started_threads;
182
183        /**
184        * Thread-global mutex.
185        *
186        * This mutex is used to lock the thread object when data common to all
187        * threads on this object is modified.
188        */
189        pthread_mutex_t mutex;
190
191        /// thread object condition
192        pthread_cond_t cond;
193
194        /// thread state
195        state_t state;
196
197        /// Thread parameters.
198        const ThreadParam tparam;
199
200        /// The input queue where threads can get messages from.
201        FastQueue* fq;
202
203        void inc_running_threads();
204        void dec_running_threads();
205        uint32 get_running_threads() const;
206        void inc_started_threads();
207        uint32 get_started_threads() const;
208};
209   
210
211inline void Thread::lock() {
212        if ( pthread_mutex_lock(&mutex) != 0 ) {
213                ERRLog(tparam.name, "Error while locking mutex");
214        }
215}
216
217inline void Thread::unlock() {
218        int ret = pthread_mutex_unlock(&mutex);
219        assert( ret == 0 );
220}
221
222inline void Thread::signal_cond() {
223        pthread_cond_signal(&cond);
224}
225
226inline void Thread::broadcast_cond() {
227        pthread_cond_broadcast(&cond);
228}
229
230inline void Thread::wait_cond() {
231        pthread_cond_wait(&cond,&mutex);
232}
233
234
235/**
236 * @param ts absolute time
237 * @return 0, ETIMEDOUT or EINTR.
238 */
239inline int Thread::wait_cond(const struct timespec& ts) {
240        return pthread_cond_timedwait(&cond, &mutex, &ts);
241}
242
243
244inline void Thread::inc_running_threads() {
245        running_threads++;
246}
247
248inline void Thread::dec_running_threads() {
249        assert( running_threads > 0 );
250        running_threads--;
251}
252
253inline uint32 Thread::get_running_threads() const {
254        return running_threads;
255}
256
257inline void Thread::inc_started_threads() {
258        started_threads++;
259}
260
261inline uint32 Thread::get_started_threads() const {
262        return started_threads;
263}
264
265
266/**
267 * A template class used to start threads.
268 *
269 * Note that the ThreadStarter template class is not thread-safe yet, so it
270 * may only be accessed by one thread at a time.
271 */
272template <class T, class TParam> class ThreadStarter {
273  public:
274        ThreadStarter(uint32 count, const TParam& param);
275        ~ThreadStarter();
276
277        void start_processing();
278        void stop_processing();
279        bool sleepuntilstop();          // deprecated!
280        void wait_until_stopped();
281        void abort_processing(bool kill=false);
282
283        /// get a pointer to the thread object
284        inline T *get_thread_object() { return &thread_object; }
285
286        /// Are all threads finished: TODO
287        inline bool is_running() const { return thread_object.is_running(); }
288
289  private:
290        /// The Thread object on which the threads run.
291        T thread_object;
292
293        /// For debugging, the name of the thread as given by TParam.
294        const TParam thread_param;
295
296        /// Contains the handles of all pthreads that we created.
297        std::vector<pthread_t> pthreads;
298};
299
300
301/**
302 * Constructor.
303 *
304 * @param count the number of threads to start
305 * @param param thread parameters
306 */
307template <class T, class TParam>
308ThreadStarter<T, TParam>::ThreadStarter(uint32 count, const TParam& param)
309                : thread_object(param), thread_param(param), pthreads(count) {
310
311        // TODO: fix all Thread subclasses that use an invalid count!
312        if ( count < param.min_count )
313                count = param.min_count;
314        else if ( count > param.max_count )
315                count = param.max_count;
316
317        assert( count >= param.min_count && count <= param.max_count );
318
319        pthreads.resize(count); // TODO: remove
320}
321
322
323/**
324 * Destructor.
325 *
326 * This cancels all running threads if there are still some.
327 */
328template <class T, class TParam> ThreadStarter<T, TParam>::~ThreadStarter() {
329
330        if ( thread_object.is_running() ) {
331                catch_all(stop_processing());
332                sleepuntilstop();
333                catch_all(abort_processing(true));
334        }
335}
336
337
338/**
339 * Start the threads.
340 */
341template <class T, class TParam>
342void ThreadStarter<T, TParam>::start_processing() {
343
344        thread_object.lock();
345
346        /*
347         * Check if this is a fresh Thread. If it is or was already running,
348         * we have detected a programming error.
349         */
350        if ( thread_object.is_running(false) ) {
351                thread_object.unlock();
352
353                ERRLog("Threads", "start_processing(): " << thread_param.name
354                        << " is already running");
355
356                throw ThreadError(ThreadError::ERROR_INTERNAL);
357        }
358
359
360        /*
361         * Create the requested number of threads.
362         */
363        int res;
364        pthread_attr_t attr;
365        pthread_attr_init(&attr);
366        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
367
368        for (unsigned i = 0; i < pthreads.size(); i++) {
369                // Create a posix thread. It will start running immediately.
370                res = pthread_create(&pthreads[i], &attr,
371                                        &thread_starter<T>, &thread_object);
372
373                if ( res != 0 ) {
374                        thread_object.unlock();
375                        ERRLog("Threads", "pthread_create() failed starting a "
376                                << "thread for " << thread_param.name);
377
378                        throw ThreadError(ThreadError::ERROR_THREAD_CREATION);
379                }
380        }
381
382        ILog("Threads", pthreads.size() << " " << thread_param.name
383                << " thread(s) sucessfully created");
384
385        pthread_attr_destroy(&attr); // has no effect on the created threads
386
387        thread_object.unlock();
388}
389
390
391/**
392 * Ask all threads to stop (politely).
393 */
394template <class T, class TParam>
395void ThreadStarter<T, TParam>::stop_processing() {
396
397        thread_object.lock();
398
399        typename T::state_t state = thread_object.get_state(false);
400
401        switch (state) {
402
403          case T::STATE_INIT:
404                thread_object.unlock();
405                DLog("Threads", "Thread " << thread_param.name
406                        << " has not been started yet.");
407                throw ThreadError(ThreadError::ERROR_NOT_STARTED);
408                break;
409
410          case T::STATE_RUN:
411                thread_object.stop_processing(false);
412                thread_object.unlock();
413
414                ILog("Threads", "Thread(s) "
415                        << thread_param.name << " asked to stop");
416                break;
417
418          case T::STATE_STOP:
419                thread_object.unlock();
420                DLog("Threads", "Thread(s) "
421                        << thread_param.name << " is already in state stop.");
422                throw ThreadError(ThreadError::ERROR_STOPPING);
423                break;
424
425          case T::STATE_ABORT:
426                //thread_object.unlock();
427                DLog("Threads", "Thread "
428                        << thread_param.name << " is in state abort.");
429                //throw ThreadError(ThreadError::ERROR_ABORTING);
430                break;
431
432          default:
433                assert( false ); // unknown state
434
435        }
436}
437
438
439/**
440 * Wait for the thread to stop running (DEPRECATED).
441 *
442 * Sleeps until all threads have stopped running but not longer than
443 * sleep_time seconds.
444 *
445 * This method is deprecated because it suffers from a race condition:
446 * If none of the pthreads created in start_processing() has been run yet,
447 * then this method returns immediately. Use wait_until_stopped() instead.
448 *
449 * @return true if the threads have stopped
450 *
451 * @see ThreadParam
452 */
453template <class T, class TParam>
454bool ThreadStarter<T, TParam>::sleepuntilstop() {
455       
456        for (uint32 i = 0; thread_object.is_running()
457                        && i < thread_param.sleep_time; i++)
458                sleep(1);
459
460        return ( thread_object.is_running() ? false : true );
461}
462
463
464/**
465 * Wait until all threads have stopped running.
466 *
467 * Threads that haven't been running yet (state IDLE) are not considered
468 * as stopped!
469 */
470template <class T, class TParam>
471void ThreadStarter<T, TParam>::wait_until_stopped() {
472
473        DLog("Threads",
474                "Waiting for Thread " << thread_param.name << " to stop");
475
476        Thread::state_t state = thread_object.get_state(false);
477
478        while ( state == Thread::STATE_INIT || thread_object.is_running() ) {
479                sleep(1);
480                state = thread_object.get_state(false);
481        }
482
483        DLog("Threads", "Thread " << thread_param.name << " has stopped");
484}
485
486
487/**
488 * Stop and kill the threads.
489 *
490 * @param kill kill the threads if they do not stop.
491 */
492template <class T, class TParam>
493void ThreadStarter<T, TParam>::abort_processing(bool kill) {
494
495        thread_object.lock();
496
497        switch ( thread_object.get_state(false) ) {
498
499          case T::STATE_INIT:
500                thread_object.unlock();
501                DLog("Threads", "Thread "
502                        << thread_param.name << " has not been started yet.");
503                throw ThreadError(ThreadError::ERROR_NOT_STARTED);
504                break;
505
506          case T::STATE_ABORT: 
507                if ( ! kill ) {
508                        //thread_object.unlock();
509                        DLog("Threads", "Thread " << thread_param.name
510                                << " is already in state abort.");
511
512                        //throw ThreadError(ThreadError::ERROR_ABORTING);
513                }
514                break;
515
516          default:
517                break;
518        }
519
520        if ( thread_object.is_running(false) ) {
521                thread_object.stop_processing(false); 
522                // unlock and sleep so the threads have a chance to stop.
523                thread_object.unlock();
524                sleepuntilstop();
525                thread_object.lock();
526        }
527
528        thread_object.abort_processing(false);
529
530        // unlock and let the thread abort
531        thread_object.unlock();
532        sleepuntilstop();
533        thread_object.lock();
534
535        if ( thread_object.is_running(false) ) {
536                // unlock and maybe kill
537                thread_object.unlock();
538                if (kill) {
539                        for (unsigned i = 0; i < pthreads.size(); i++) 
540                                pthread_cancel( pthreads[i] );
541
542                        sleepuntilstop();
543
544                        for (unsigned i = 0; i < pthreads.size(); i++) 
545                                pthread_kill(pthreads[i], 9);
546
547                        ILog("Threads", pthreads.size() << " thread(s) "
548                                << thread_param.name << " killed");
549                } else {
550                        ILog("Threads", pthreads.size() << " thread(s) "
551                                << thread_param.name << " refused to abort");
552
553                        throw ThreadError(ThreadError::ERROR_STILL_RUNNING);
554                }
555
556        } else {
557                thread_object.unlock();
558                ILog("Threads", pthreads.size() << " thread(s) "
559                        << thread_param.name << " have terminated");
560        }
561}
562
563
564//@}
565
566} // namespace protlib
567
568#endif // PROTLIB__THREADS_H
Note: See TracBrowser for help on using the repository browser.