source: trash/old-modules/transport/protlib/fastqueue.c@ 12754

Last change on this file since 12754 was 5641, checked in by Christoph Mayer, 15 years ago
File size: 18.0 KB
RevLine 
[5641]1/// ----------------------------------------*- mode: C++; -*--
2/// @file fastqueue.c
3/// a simple FIFO queue with mutexes for use with pthreads
4/// ----------------------------------------------------------
5/// $Id: fastqueue.c 2549 2007-04-02 22:17:37Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/fastqueue/fastqueue.c $
7// ===========================================================
8//
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29
30/** @addtogroup fastqueue Fast Queue
31 * @{
32 */
33
34/** @file
35 * Fast and thread-safe queue.
36 */
37
38/******************************************************************************
39 * fastqueue.c -- a simple FIFO queue with mutexes for use with pthreads *
40 * -------------------------------------------------------------------------- *
41 * written by Roland Bless 1995 *
42 * all operations enqueue,dequeue are done in O(1) which means constant time *
43 ******************************************************************************/
44
45#define _GNU_SOURCE
46#include <stdio.h>
47#include <string.h>
48#include <pthread.h>
49#include <stdlib.h>
50#include <errno.h>
51#include <sys/time.h>
52#include <unistd.h>
53
54 /**** module interface ****/
55#include "fastqueue.h"
56
57 /*************** defines *****************/
/* Print the message that belongs to error code `errnr` to stderr.
 * `errnr` is used directly as an index into queue_errmsg[]. */
#define qerr(errnr) fprintf(stderr,"queue.c: %s\n",queue_errmsg[errnr])

#ifdef __linux__

// not needed for Linux
//#define pthread_mutexattr_settype pthread_mutexattr_setkind_np

/* On Linux the "normal" mutex kind is mapped onto the _NP timed kind and
 * the PRI_* priority names are given fixed numeric values. */
#define PTHREAD_MUTEX_NORMAL PTHREAD_MUTEX_TIMED_NP
#define PRI_OTHER_MIN PRI_FG_MIN_NP
#define PRI_OTHER_MAX PRI_FG_MAX_NP
#define PRI_FG_MIN_NP 8
#define PRI_FG_MAX_NP 15

/* clock_gettime() is redirected to eclock_gettime(), which is only declared
 * here and defined elsewhere; the clock id argument is dropped by the macro.
 * NOTE(review): presumably eclock_gettime() stores the current wall-clock
 * time in *tp -- confirm against its definition. */
#define CLOCK_REALTIME 0
#define NSEC_PER_SEC 1000000000
extern int eclock_gettime(struct timespec *tp);
#define clock_gettime(clock_id, tspec) eclock_gettime(tspec)

/* NOTE(review): an undefined _DECTHREADS_VERSION evaluates to 0 inside #if,
 * so this branch is also taken on every non-Linux platform that does not
 * define it -- confirm that this is intended. */
#elif _DECTHREADS_VERSION < 314126L

/* Map the standard names onto the _np/_NP spellings. */
#define pthread_mutexattr_settype pthread_mutexattr_settype_np
#define PTHREAD_MUTEX_NORMAL PTHREAD_MUTEX_NORMAL_NP
#define PTHREAD_MUTEX_ERRORCHECK PTHREAD_MUTEX_ERRORCHECK_NP
#endif
82 /*************** typedefs *****************/
83
/* Error codes of this module.  Each value is used by qerr() as the index
 * into queue_errmsg[], so the enumerators and the message table below
 * must be kept in the same order. */
enum {
  QERR_NONE,
  QERR_NOMEM,
  QERR_MUTEXINIT,
  QERR_MUTEXLOCK,
  QERR_MUTEXUNLOCK,
  QERR_MUTEXDESTROY,
  QERR_QEMPTY,
  QERR_QINVALID,
  QERR_QNOTEMPTY,
  QERR_CONDINIT,
  QERR_CONDWAIT,
  QERR_CONDSIGNAL,
  QERR_CONDDESTROY
};

/* Human readable text for every QERR_* code, indexed by the code itself. */
const
char *const queue_errmsg[]=
{
  "all ok",
  "can't get enough memory",
  "initializing mutex",
  "locking mutex",
  "unlocking mutex",
  "destroying mutex",
  "queue empty",
  "invalid queueobject",
  "destroying queue - queue not empty",
  "initializing queue condition variable",
  "waiting on condition",
  "signalling condition",
  "destroying condition"
};
117
118
119
120queue_t *create_queue(const char* name)
121/* initialization routine for a queue.
122 * returns: NULL if an error occured, or a queue-object which is actually
123 * a queueheader structure
124 * arguments: none
125 */
126{
127 queue_t *queuehead;
128
129 /* Allocate memory for queue head */
130 if ((queuehead= (queue_t *) malloc(sizeof(queue_t)))!=NULL)
131 {
132 /* Set mutex kind */
133 pthread_mutexattr_init(&queuehead->mutex_attr);
134#ifdef _DEBUG
135 pthread_mutexattr_settype(&queuehead->mutex_attr,PTHREAD_MUTEX_ERRORCHECK);
136#else
137 pthread_mutexattr_settype(&queuehead->mutex_attr,PTHREAD_MUTEX_NORMAL);
138#endif
139 /* use PTHREAD_MUTEX_ERRORCHECK or PTHREAD_MUTEX_ERRORCHECK_NP for testing */
140
141 /* Initialize mutex */
142 if (pthread_mutex_init(&queuehead->mutex, &queuehead->mutex_attr)==0)
143 {
144 /* Create Condition variable for command queue */
145 if (pthread_cond_init(&queuehead->cond, NULL)==0)
146 {
147 queuehead->nr_of_elements= 0UL;
148 queuehead->exp_nr_of_elements= 0UL;
149 queuehead->exp_enabled = 0;
150 queuehead->first_block= (queue_elblock_t *) malloc(sizeof(queue_elblock_t));
151 queuehead->exp_first_block= (queue_elblock_t *) malloc(sizeof(queue_elblock_t));
152 if ((queuehead->first_block == NULL) || (queuehead->exp_first_block == NULL))
153 qerr(QERR_NOMEM);
154 else
155 {
156 queuehead->first_block->read= 0;
157 queuehead->first_block->write= 0;
158 queuehead->first_block->next_block= NULL;
159 queuehead->last_block= queuehead->first_block;
160 queuehead->exp_first_block->read= 0;
161 queuehead->exp_first_block->write= 0;
162 queuehead->exp_first_block->next_block= NULL;
163 queuehead->exp_last_block= queuehead->exp_first_block;
164 if (name)
165 {
166 if (strlen(name) <= MAX_QUEUENAME_LENGTH)
167 strcpy(queuehead->name, name);
168 else
169 {
170 memcpy(queuehead->name, name, MAX_QUEUENAME_LENGTH);
171 queuehead->name[MAX_QUEUENAME_LENGTH + 1] = '\0';
172 }
173 }
174 else
175 queuehead->name[0] = '\0';
176 /* Now it's simple to enqueue elements, esp. the first one */
177 }
178//#ifdef QUEUELEN
179 queuehead->queue_maxlength= 0;
180//#endif
181 }
182 else
183 qerr(QERR_CONDINIT);
184 }
185 else /* error during initialize */
186 qerr(QERR_MUTEXINIT);
187 }
188 else
189 qerr(QERR_NOMEM);
190
191 return queuehead;
192}
193
194int enqueue_element_signal(queue_t *queuehead, void *element)
195{
196 return enqueue_element_expedited_signal(queuehead,element,0);
197}
198
199int enqueue_element_expedited_signal(queue_t *queuehead, void *element, int exp)
200/* add a new element into the queue. Memory for the element must be
201 * allocated anywhere else. This routine signals other waiting threads.
202 * returns: -1 if an error occured, or 0 is action could be performed
203 * arguments: pointer to queue_t object, pointer to an element
204 */
205{
206 queue_elblock_t *newelement, *lastblockp;
207
208 if (queuehead==NULL)
209 {
210 qerr(QERR_QINVALID);
211 return -1;
212 }
213
214 if (pthread_mutex_lock(&queuehead->mutex)!=0)
215 {
216 qerr(QERR_MUTEXLOCK);
217 return -1;
218 }
219 /* begin critical section */
220
221 if (exp && queuehead->exp_enabled) exp = 1; else exp = 0;
222 /* Allocate new element structure when necessary */
223 /* Note: queuehead->last_block must always contain a valid value */
224 lastblockp = (exp ? (queuehead->exp_last_block) : (queuehead->last_block));
225 if (lastblockp->write == ELEMENT_BLOCKSIZE)
226 { /* last block is full, so allocate a new block */
227 if ((newelement= (queue_elblock_t *) malloc(sizeof(queue_elblock_t)))==NULL)
228 {
229 qerr(QERR_NOMEM);
230 return -1;
231 }
232
233 /* initialize new structure */
234 newelement->element[0]= element;
235 newelement->read = 0;
236 newelement->write = 1;
237 newelement->next_block= NULL;
238
239 /* append new element to the end */
240 lastblockp->next_block= newelement;
241 /* new element becomes last element */
242 if (exp) queuehead->exp_last_block = newelement;
243 else queuehead->last_block = newelement;
244 }
245 else /* last block was not full */
246 {
247 lastblockp->element[lastblockp->write]= element;
248 lastblockp->write++;
249 }
250
251 if (exp) queuehead->exp_nr_of_elements++;
252 queuehead->nr_of_elements++;
253//#ifdef QUEUELEN
254 if (queuehead->nr_of_elements > queuehead->queue_maxlength)
255 queuehead->queue_maxlength= queuehead->nr_of_elements;
256//#endif
257 /* Condition should be set while mutex is locked.
258 Recommended by libc manual.
259 */
260 if (pthread_cond_signal(&queuehead->cond)!=0)
261 qerr(QERR_CONDSIGNAL);
262 /* end critical section */
263 if (pthread_mutex_unlock(&queuehead->mutex)!=0)
264 {
265 qerr(QERR_MUTEXUNLOCK);
266 return -1;
267 }
268 // see above
269// if (pthread_cond_signal(&queuehead->cond)!=0)
270// qerr(QERR_CONDSIGNAL);
271
272 return 0;
273}
274
275
276void *dequeue_element_wait(queue_t *queuehead)
277/* wait for the queue to contain an element.
278 * if it contains an element return and remove it.
279 * returns: NULL if an error occured, the pointer to the element otherwise
280 * arguments: pointer to queue_t object
281 */
282{
283 void *element;
284 queue_elblock_t *blockp;
285 int exp = 0;
286 element= NULL;
287 int retcode= 0;
288
289 if (queuehead != NULL)
290 {
291 /* Wait for an element in the queue */
292 /* Before waiting on a condition, the associated mutex must be locked */
293 if (pthread_mutex_lock(&queuehead->mutex)!=0)
294 {
295 qerr(QERR_MUTEXLOCK); return NULL;
296 }
297
298 while(queuehead->nr_of_elements==0) /* while there is no work to do, wait */
299 { /* for a safe state the predicate must be checked in a loop! */
300 /* cond_wait() unlocks the mutex and might return sometimes without
301 getting a signal! */
302 if ((retcode= pthread_cond_wait(&queuehead->cond, &queuehead->mutex)) != 0)
303 {
304 if (retcode!=EINTR && retcode!=ETIMEDOUT)
305 {
306 qerr(QERR_CONDWAIT);
307 }
308 }
309 }
310
311 /* begin critical section */
312 exp = (queuehead->exp_nr_of_elements!=0);
313 blockp = (exp ? (queuehead->exp_first_block) : (queuehead->first_block));
314 if (blockp != NULL)
315 {
316 /* get the first element */
317 element= blockp->element[blockp->read];
318 blockp->read++;
319
320 if (blockp->next_block == NULL) /* this is the last block */
321 {
322 if (blockp->read == blockp->write)
323 { /* block is completely dequeued, so reset values */
324 /* the last block always remains allocated! */
325 blockp->read= 0;
326 blockp->write= 0;
327 }
328 }
329 else /* this is not the last block */
330 {
331 /* if block was completely dequeued, remove it */
332 if (blockp->read == ELEMENT_BLOCKSIZE)
333 {
334 if (exp) queuehead->exp_first_block= blockp->next_block;
335 else queuehead->first_block= blockp->next_block;
336 free(blockp);
337 }
338 }
339 if (exp) queuehead->exp_nr_of_elements--;
340 queuehead->nr_of_elements--;
341 }
342 else
343 qerr(QERR_QEMPTY);
344
345 /* end critical section */
346 if (pthread_mutex_unlock(&queuehead->mutex)!=0)
347 {
348 qerr(QERR_MUTEXUNLOCK);
349 return NULL;
350 }
351 }
352 else
353 qerr(QERR_QINVALID);
354
355 return element;
356}
357
358void *dequeue_element_timedwait(queue_t *queuehead, const struct timespec *tspec)
359/* wait for the queue to contain an element.
360 * if it contains an element return and remove it.
361 * returns: NULL if an error occured, the pointer to the element otherwise
362 * arguments: pointer to queue_t object
363 * tpsec is the time interval to wait (not an absolute time!)
364 */
365{
366 void *element;
367 queue_elblock_t *blockp;
368 int result;
369 struct timespec abs_tspec;
370 int exp = 0;
371 element= NULL;
372
373 if (queuehead != NULL)
374 {
375 /* Wait for an element in the queue */
376 /* Before waiting on a condition, the associated mutex must be locked */
377 if (pthread_mutex_lock(&queuehead->mutex)!=0)
378 {
379 qerr(QERR_MUTEXLOCK); return NULL;
380 }
381
382 while(queuehead->nr_of_elements==0) /* while there is no work to do, wait */
383 { /* for a safe state the predicate must be checked in a loop! */
384 /* cond_wait() unlocks the mutex and might return sometimes without
385 getting a signal! */
386 clock_gettime(CLOCK_REALTIME, &abs_tspec);
387 abs_tspec.tv_nsec+= tspec->tv_nsec;
388 abs_tspec.tv_sec+= tspec->tv_sec;
389 if (abs_tspec.tv_nsec >= NSEC_PER_SEC)
390 {
391 abs_tspec.tv_nsec%= NSEC_PER_SEC;
392 abs_tspec.tv_sec++;
393 };
394
395 if ((result = pthread_cond_timedwait(&queuehead->cond,
396 &queuehead->mutex, &abs_tspec))!=0)
397 {
398 if ( (result != ETIMEDOUT) && (result != EINTR) && (result != EINVAL) )
399 {
400 qerr(QERR_CONDWAIT);
401 }
402 else
403 { /* timeout */
404 if (pthread_mutex_unlock(&queuehead->mutex)!=0)
405 {
406 qerr(QERR_MUTEXUNLOCK);
407 return NULL;
408 }
409 return NULL;
410 }
411 }
412 }
413
414 /* begin critical section */
415 exp = (queuehead->exp_nr_of_elements!=0);
416 blockp = (exp ? (queuehead->exp_first_block) : (queuehead->first_block));
417 if (blockp != NULL)
418 {
419 /* get the first element */
420 element= blockp->element[blockp->read];
421 blockp->read++;
422
423 if (blockp->next_block == NULL) /* this is the last block */
424 {
425 if (blockp->read == blockp->write)
426 { /* block is completely dequeued, so reset values */
427 /* the last block always remains allocated! */
428 blockp->read= 0;
429 blockp->write= 0;
430 }
431 }
432 else /* this is not the last block */
433 {
434 /* if block was completely dequeued, remove it */
435 if (blockp->read == ELEMENT_BLOCKSIZE)
436 {
437 if (exp) queuehead->exp_first_block= blockp->next_block;
438 else queuehead->first_block= blockp->next_block;
439 free(blockp);
440 }
441 }
442 if (exp) queuehead->exp_nr_of_elements--;
443 queuehead->nr_of_elements--;
444 }
445 else
446 qerr(QERR_QEMPTY);
447
448 /* end critical section */
449 if (pthread_mutex_unlock(&queuehead->mutex)!=0)
450 {
451 qerr(QERR_MUTEXUNLOCK);
452 return NULL;
453 }
454 }
455 else
456 qerr(QERR_QINVALID);
457
458 return element;
459}
460
461int destroy_queue(queue_t *queuehead)
462/* destroys the queue and frees all resources, except the elements!
463 * the queue must be empty to destroy it.
464 * returns: -1 if an error occured, 0 otherwise
465 * arguments: pointer to queue_t object
466 */
467{
468 if (queuehead!=NULL)
469 {
470 /* queue not empty? */
471 if (queuehead->nr_of_elements != 0)
472 qerr(QERR_QNOTEMPTY);
473 else
474 {
475 /* destroy condition variable */
476 if (pthread_cond_destroy(&queuehead->cond)!=0) qerr(QERR_CONDDESTROY);
477 /* destroy mutex */
478 if (pthread_mutex_destroy(&queuehead->mutex)!=0) qerr(QERR_MUTEXDESTROY);
479
480 pthread_mutexattr_destroy(&queuehead->mutex_attr);
481
482 /* free memory for queuehead */
483#ifdef QUEUELEN
484 fprintf(stderr,"queue.c: length of queue (%s) growed up to %lu elements\n",
485 queuehead->name, queuehead->queue_maxlength);
486#endif
487 free(queuehead->exp_last_block);
488 free(queuehead->last_block);
489 free(queuehead);
490 }
491
492 return 0;
493 }
494 else
495 qerr(QERR_QINVALID);
496
497 return -1;
498}
499
500void *dequeue_element_nonblocking(queue_t *queuehead)
501/* if queue contains an element return and remove it.
502 * returns: NULL if an error occured or queue was empty, the pointer to
503 * the element otherwise.
504 * arguments: pointer to queue_t object
505 */
506{
507 void *element;
508 queue_elblock_t *blockp;
509 int exp = 0;
510 element= NULL;
511
512 if (queuehead != NULL)
513 {
514 if (pthread_mutex_lock(&queuehead->mutex)!=0)
515 {
516 qerr(QERR_MUTEXLOCK); return NULL;
517 }
518
519 /* begin critical section */
520
521 if (queuehead->nr_of_elements==0)
522 {
523 if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
524 return NULL;
525 }
526
527 exp = (queuehead->exp_nr_of_elements!=0);
528 blockp = (exp ? (queuehead->exp_first_block) : (queuehead->first_block));
529 if (blockp != NULL)
530 {
531 /* get the first element */
532 element= blockp->element[blockp->read];
533 blockp->read++;
534
535 if (blockp->next_block == NULL) /* this is the last block */
536 {
537 if (blockp->read == blockp->write)
538 { /* block is completely dequeued, so reset values */
539 /* the last block always remains allocated! */
540 blockp->read= 0;
541 blockp->write= 0;
542 }
543 }
544 else /* this is not the last block */
545 {
546 /* if block was completely dequeued, remove it */
547 if (blockp->read == ELEMENT_BLOCKSIZE)
548 {
549 if (exp) queuehead->exp_first_block= blockp->next_block;
550 else queuehead->first_block= blockp->next_block;
551 free(blockp);
552 }
553 }
554 if (exp) queuehead->exp_nr_of_elements--;
555 queuehead->nr_of_elements--;
556 }
557 else
558 qerr(QERR_QEMPTY);
559
560 /* end critical section */
561 if (pthread_mutex_unlock(&queuehead->mutex)!=0)
562 {
563 qerr(QERR_MUTEXUNLOCK);
564 return NULL;
565 }
566 }
567 else
568 qerr(QERR_QINVALID);
569
570 return element;
571}
572
573unsigned long queue_nr_of_elements(queue_t *queuehead)
574/** Get number fo elements in queue. */
575{
576 unsigned long result = 0;
577
578 if (queuehead != NULL)
579 {
580 if (pthread_mutex_lock(&queuehead->mutex)!=0)
581 {
582 qerr(QERR_MUTEXLOCK);
583 return 0;
584 }
585 /* begin critical section */
586
587 result = queuehead->nr_of_elements;
588
589 /* end critical section */
590 if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
591 }
592 else
593 qerr(QERR_QINVALID);
594
595 return result;
596}
597
598int queue_is_expedited_enabled(queue_t *queuehead)
599/** Get exp_enabled flag. */
600{
601 int result = 0;
602
603 if (queuehead != NULL)
604 {
605 if (pthread_mutex_lock(&queuehead->mutex)!=0)
606 {
607 qerr(QERR_MUTEXLOCK);
608 return 0;
609 }
610 /* begin critical section */
611
612 result = queuehead->exp_enabled;
613
614 /* end critical section */
615 if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
616 }
617 else
618 qerr(QERR_QINVALID);
619
620 return result;
621}
622
623int queue_enable_expedited(queue_t *queuehead, int exp)
624/** Set exp_enabled flag and return old value. */
625{
626 int result = 0;
627
628 if (queuehead != NULL)
629 {
630 if (pthread_mutex_lock(&queuehead->mutex)!=0)
631 {
632 qerr(QERR_MUTEXLOCK);
633 return 0;
634 }
635 /* begin critical section */
636
637 result = queuehead->exp_enabled;
638 if (exp) queuehead->exp_enabled = 1;
639 else queuehead->exp_enabled = 0;
640
641 /* end critical section */
642 if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
643 }
644 else
645 qerr(QERR_QINVALID);
646
647 return result;
648}
649
650//@}
Note: See TracBrowser for help on using the repository browser.