source: trash/old-modules/transport/protlib/threads.cpp@ 10640

Last change on this file since 10640 was 5641, checked in by Christoph Mayer, 15 years ago
File size: 7.1 KB
Line 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file threads.cpp
3/// A Thread class for POSIX threads
4/// ----------------------------------------------------------
5/// $Id: threads.cpp 2872 2008-02-18 10:58:03Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/src/threads.cpp $
7// ===========================================================
8//
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29
30#include <sys/time.h>
31
32#include "threads.h"
33
34
35namespace protlib {
36 using namespace log;
37
38/** @addtogroup thread Threads
39 * @{
40 */
41
42
43/** This is the default sleep time and can be used as default value in
44 * constructors.
45 */
46uint32 ThreadParam::default_sleep_time = 5;
47
48
49/**
50 * Initializes a ThreadParam object with a default wait time and a
51 * a thread (group) name string.
52 *
53 * @param wait wait time between stopping and aborting the thread inside
54 * ThreadStarter::abort_processing
55 * @param n name of the threads.
56 * @param minc minimal number of threads
57 * @param maxc maximal number of threads
58 */
59ThreadParam::ThreadParam(uint32 wait, const char *n, uint32 minc, uint32 maxc)
60 : sleep_time(wait), name(n ? n : "UNKNOWN"),
61 min_count(minc), max_count(maxc) {
62
63 assert( minc > 0 );
64 assert( maxc >= minc );
65}
66
67
68/**
69 * Constructor.
70 *
71 * @param p thread parameters
72 * @param create_queue if true, create one internal queue
73 * @param exp_allow if true, allow reception of expedited messages on the queue
74 */
75Thread::Thread(const ThreadParam &p, bool create_queue, bool exp_allow)
76 : running_threads(0), started_threads(0), state(STATE_INIT), tparam(p),
77 fq(create_queue ? new FastQueue(p.name.c_str(), exp_allow) : 0) {
78
79 pthread_mutexattr_t mutex_attr;
80
81 pthread_mutexattr_init(&mutex_attr);
82
83#ifdef _DEBUG
84 pthread_mutexattr_settype(&mutex_attr,PTHREAD_MUTEX_ERRORCHECK);
85#else
86 pthread_mutexattr_settype(&mutex_attr,PTHREAD_MUTEX_NORMAL);
87#endif
88
89 pthread_mutex_init(&mutex, &mutex_attr);
90 pthread_cond_init(&cond,NULL);
91
92 pthread_mutexattr_destroy(&mutex_attr);
93}
94
95
96/**
97 * Destructor.
98 *
99 * Currently throws an exception if there are still running threads.
100 */
101Thread::~Thread() {
102 if ( get_running_threads() )
103 throw ThreadError(ThreadError::ERROR_STILL_RUNNING);
104
105 delete fq; // delete queue, no-op if fq is NULL
106
107 pthread_cond_destroy(&cond);
108 pthread_mutex_destroy(&mutex);
109}
110
111
112/**
113 * Called for each thread when processing is started.
114 *
115 * The thread must not be locked because this is done inside this method.
116 * Cancellation is enabled and set to synchronous mode. So you only need to
117 * install cleanup handlers when there is a cancellation point between
118 * calls to lock() and unlock().
119 */
120void *Thread::start_processing() {
121
122 lock();
123
124 switch (state) {
125 case STATE_INIT:
126 state=STATE_RUN;
127 break;
128 case STATE_RUN:
129 break;
130 case STATE_STOP:
131 case STATE_ABORT:
132 unlock();
133 return NULL;
134 }
135
136 inc_running_threads();
137 inc_started_threads();
138
139 int thread_num = get_started_threads();
140
141 unlock();
142
143 /*
144 * Catch exceptions for logging, but don't rethrow them as this would
145 * lead to undefined behaviour (probably crashing the ThreadStarter).
146 *
147 * All exceptions should be handled in main_loop(), it is a programming
148 * error if they are propagated up to this point!
149 */
150 try {
151 main_loop(thread_num);
152 }
153 catch ( ProtLibException &e ) {
154 ERRLog("Threads", "Unhandled ProtLibException in thread "
155 << tparam.name << ", num " << thread_num << ", error ["
156 << e.getstr() << ']');
157 }
158 catch ( bad_alloc & ) {
159 ERRLog("Threads", tparam.name << ", num " << thread_num
160 << ": [out of memory]");
161 }
162 catch ( ... ) {
163 ERRLog("Threads", "Unhandled non-ProtLibException in thread "
164 << tparam.name << ", num " << thread_num);
165 }
166
167 lock();
168 dec_running_threads();
169 unlock();
170
171 return NULL;
172}
173
174
175/**
176 * Called when the thread is asked to stop processing.
177 *
178 * The thread object may do some cleanup or work on until it has completed
179 * a task.
180 *
181 * @param do_lock if true the thread mutex is used
182 */
183void Thread::stop_processing(bool do_lock) {
184 if ( do_lock )
185 lock();
186
187 if (state==STATE_RUN) {
188 state = STATE_STOP;
189 signal_cond();
190 }
191
192 if ( do_lock )
193 unlock();
194}
195
196
197/**
198 * This is called just before a running thread is killed.
199 *
200 * @param do_lock if true the thread mutex is used
201 */
202void Thread::abort_processing(bool do_lock) {
203 if ( do_lock )
204 lock();
205
206 if ( state == STATE_RUN || state == STATE_STOP ) {
207 state = STATE_ABORT;
208 signal_cond();
209 }
210
211 if ( do_lock )
212 unlock();
213}
214
215
216/**
217 * Checks whether there is still a running thread.
218 *
219 * @param do_lock if true the thread mutex is used
220 */
221bool Thread::is_running(bool do_lock) {
222
223 if ( do_lock )
224 lock();
225
226 bool res = ( get_running_threads() > 0 );
227
228 if ( do_lock )
229 unlock();
230
231 return res;
232}
233
234
235/**
236 * Wait for the condition.
237 *
238 * @param sec relative time (seconds)
239 * @param nsec relative time (nanoseconds)
240 * @return 0, ETIMEDOUT or EINTR.
241 */
242int Thread::wait_cond(int32 sec, int32 nsec) {
243 struct timeval tv;
244 struct timespec ts;
245
246 if ( sec < 0 )
247 sec = 0;
248 if ( nsec < 0 )
249 nsec = 0;
250
251 gettimeofday(&tv, NULL);
252 ts.tv_sec = tv.tv_sec+sec;
253 ts.tv_nsec = tv.tv_usec*1000+nsec;
254
255 // TODO: This is weird.
256 while ( ts.tv_nsec > 1000000000) {
257 ts.tv_sec++;
258 ts.tv_nsec -= 1000000000;
259 }
260
261 if ( ts.tv_sec < 0 )
262 ts.tv_sec = 0;
263 if ( ts.tv_nsec < 0 )
264 ts.tv_nsec = 0;
265
266 return pthread_cond_timedwait(&cond, &mutex, &ts);
267}
268
269
270/**
271 * Returns the thread's state.
272 *
273 * @param do_lock if true the thread mutex is used
274 * @return the thread's current state
275 *
276 * @see enum state_t for more information on what a thread state is.
277 */
278Thread::state_t Thread::get_state(bool do_lock) {
279 if ( do_lock )
280 lock();
281
282 state_t s = state;
283
284 if ( do_lock )
285 unlock();
286
287 return s;
288}
289
290/// get time of day as timespec
291void Thread::get_time_of_day(struct timespec& ts) {
292 struct timeval tv;
293 gettimeofday(&tv,NULL);
294 ts.tv_sec = tv.tv_sec;
295 ts.tv_nsec = tv.tv_usec*1000;
296}
297
298const char* ThreadError::getstr() const {
299 return errstr[(int)err];
300}
301
302const char* const ThreadError::errstr[] = {
303 "Cannot create POSIX Threads.",
304 "Thread is running.",
305 "Thread is going to stop.",
306 "Thread is aborting.",
307 "Still running threads left.",
308 "ThreadStarter is not initialized correctly."
309 "Internal ThreadStarter or Thread error.",
310 "Thread has not been started yet."
311};
312
313//@}
314
315} // end namespace protlib
Note: See TracBrowser for help on using the repository browser.