source: source/ariba/utility/transport/tcpip/protlib/fastqueue.c@ 9991

Last change on this file since 9991 was 9991, checked in by Christoph Mayer, 13 years ago

-fixes on protlib for android

File size: 17.6 KB
Line 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file fastqueue.c
3/// a simple FIFO queue with mutexes for use with pthreads
4/// ----------------------------------------------------------
5/// $Id: fastqueue.c 2549 2007-04-02 22:17:37Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/fastqueue/fastqueue.c $
7// ===========================================================
8//
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29
30/** @addtogroup protlib
31 * @{
32 */
33
34/** @file
35 * Fast and thread-safe queue.
36 */
37
38/******************************************************************************
39 * fastqueue.c -- a simple FIFO queue with mutexes for use with pthreads *
40 * -------------------------------------------------------------------------- *
41 * written by Roland Bless 1995 *
42 * all operations enqueue,dequeue are done in O(1) which means constant time *
43 ******************************************************************************/
44
45#define _GNU_SOURCE
46#include <stdio.h>
47#include <string.h>
48#include <pthread.h>
49#include <stdlib.h>
50#include <errno.h>
51#include <sys/time.h>
52#include <unistd.h>
53
54 /**** module interface ****/
55#include "fastqueue.h"
56
57 /*************** defines *****************/
58#define qerr(errnr) fprintf(stderr,"queue.c: %s\n",queue_errmsg[errnr])
59
60
61#define PTHREAD_MUTEX_NORMAL PTHREAD_MUTEX_TIMED_NP
62#define PRI_OTHER_MIN PRI_FG_MIN_NP
63#define PRI_OTHER_MAX PRI_FG_MAX_NP
64#define PRI_FG_MIN_NP 8
65#define PRI_FG_MAX_NP 15
66
67#define CLOCK_REALTIME 0
68#define NSEC_PER_SEC 1000000000
69extern int eclock_gettime(struct timespec *tp);
70#define clock_gettime(clock_id, tspec) eclock_gettime(tspec)
71
72 /*************** typedefs *****************/
73
/* Error codes understood by qerr(); each code is the index of its
 * message text in queue_errmsg[]. */
enum {
  QERR_NONE         = 0,
  QERR_NOMEM        = 1,
  QERR_MUTEXINIT    = 2,
  QERR_MUTEXLOCK    = 3,
  QERR_MUTEXUNLOCK  = 4,
  QERR_MUTEXDESTROY = 5,
  QERR_QEMPTY       = 6,
  QERR_QINVALID     = 7,
  QERR_QNOTEMPTY    = 8,
  QERR_CONDINIT     = 9,
  QERR_CONDWAIT     = 10,
  QERR_CONDSIGNAL   = 11,
  QERR_CONDDESTROY  = 12
};
89
/* Message texts printed by qerr(), indexed by the QERR_* error code. */
const char *const queue_errmsg[] = {
  "all ok",                                /* QERR_NONE */
  "can't get enough memory",               /* QERR_NOMEM */
  "initializing mutex",                    /* QERR_MUTEXINIT */
  "locking mutex",                         /* QERR_MUTEXLOCK */
  "unlocking mutex",                       /* QERR_MUTEXUNLOCK */
  "destroying mutex",                      /* QERR_MUTEXDESTROY */
  "queue empty",                           /* QERR_QEMPTY */
  "invalid queueobject",                   /* QERR_QINVALID */
  "destroying queue - queue not empty",    /* QERR_QNOTEMPTY */
  "initializing queue condition variable", /* QERR_CONDINIT */
  "waiting on condition",                  /* QERR_CONDWAIT */
  "signalling condition",                  /* QERR_CONDSIGNAL */
  "destroying condition"                   /* QERR_CONDDESTROY */
};
107
108
109
110queue_t *create_queue(const char* name)
111/* initialization routine for a queue.
112 * returns: NULL if an error occured, or a queue-object which is actually
113 * a queueheader structure
114 * arguments: none
115 */
116{
117 queue_t *queuehead;
118
119 /* Allocate memory for queue head */
120 if ((queuehead= (queue_t *) malloc(sizeof(queue_t)))!=NULL)
121 {
122 /* Set mutex kind */
123 pthread_mutexattr_init(&queuehead->mutex_attr);
124#ifdef _DEBUG
125 pthread_mutexattr_settype(&queuehead->mutex_attr,PTHREAD_MUTEX_ERRORCHECK);
126#else
127 pthread_mutexattr_settype(&queuehead->mutex_attr,PTHREAD_MUTEX_NORMAL);
128#endif
129 /* use PTHREAD_MUTEX_ERRORCHECK or PTHREAD_MUTEX_ERRORCHECK_NP for testing */
130
131 /* Initialize mutex */
132 if (pthread_mutex_init(&queuehead->mutex, &queuehead->mutex_attr)==0)
133 {
134 /* Create Condition variable for command queue */
135 if (pthread_cond_init(&queuehead->cond, NULL)==0)
136 {
137 queuehead->nr_of_elements= 0UL;
138 queuehead->exp_nr_of_elements= 0UL;
139 queuehead->exp_enabled = 0;
140 queuehead->first_block= (queue_elblock_t *) malloc(sizeof(queue_elblock_t));
141 queuehead->exp_first_block= (queue_elblock_t *) malloc(sizeof(queue_elblock_t));
142 if ((queuehead->first_block == NULL) || (queuehead->exp_first_block == NULL))
143 qerr(QERR_NOMEM);
144 else
145 {
146 queuehead->first_block->read= 0;
147 queuehead->first_block->write= 0;
148 queuehead->first_block->next_block= NULL;
149 queuehead->last_block= queuehead->first_block;
150 queuehead->exp_first_block->read= 0;
151 queuehead->exp_first_block->write= 0;
152 queuehead->exp_first_block->next_block= NULL;
153 queuehead->exp_last_block= queuehead->exp_first_block;
154 if (name)
155 {
156 if (strlen(name) <= MAX_QUEUENAME_LENGTH)
157 strcpy(queuehead->name, name);
158 else
159 {
160 memcpy(queuehead->name, name, MAX_QUEUENAME_LENGTH);
161 queuehead->name[MAX_QUEUENAME_LENGTH + 1] = '\0';
162 }
163 }
164 else
165 queuehead->name[0] = '\0';
166 /* Now it's simple to enqueue elements, esp. the first one */
167 }
168//#ifdef QUEUELEN
169 queuehead->queue_maxlength= 0;
170//#endif
171 }
172 else
173 qerr(QERR_CONDINIT);
174 }
175 else /* error during initialize */
176 qerr(QERR_MUTEXINIT);
177 }
178 else
179 qerr(QERR_NOMEM);
180
181 return queuehead;
182}
183
184int enqueue_element_signal(queue_t *queuehead, void *element)
185{
186 return enqueue_element_expedited_signal(queuehead,element,0);
187}
188
189int enqueue_element_expedited_signal(queue_t *queuehead, void *element, int exp)
190/* add a new element into the queue. Memory for the element must be
191 * allocated anywhere else. This routine signals other waiting threads.
192 * returns: -1 if an error occured, or 0 is action could be performed
193 * arguments: pointer to queue_t object, pointer to an element
194 */
195{
196 queue_elblock_t *newelement, *lastblockp;
197
198 if (queuehead==NULL)
199 {
200 qerr(QERR_QINVALID);
201 return -1;
202 }
203
204 if (pthread_mutex_lock(&queuehead->mutex)!=0)
205 {
206 qerr(QERR_MUTEXLOCK);
207 return -1;
208 }
209 /* begin critical section */
210
211 if (exp && queuehead->exp_enabled) exp = 1; else exp = 0;
212 /* Allocate new element structure when necessary */
213 /* Note: queuehead->last_block must always contain a valid value */
214 lastblockp = (exp ? (queuehead->exp_last_block) : (queuehead->last_block));
215 if (lastblockp->write == ELEMENT_BLOCKSIZE)
216 { /* last block is full, so allocate a new block */
217 if ((newelement= (queue_elblock_t *) malloc(sizeof(queue_elblock_t)))==NULL)
218 {
219 qerr(QERR_NOMEM);
220 return -1;
221 }
222
223 /* initialize new structure */
224 newelement->element[0]= element;
225 newelement->read = 0;
226 newelement->write = 1;
227 newelement->next_block= NULL;
228
229 /* append new element to the end */
230 lastblockp->next_block= newelement;
231 /* new element becomes last element */
232 if (exp) queuehead->exp_last_block = newelement;
233 else queuehead->last_block = newelement;
234 }
235 else /* last block was not full */
236 {
237 lastblockp->element[lastblockp->write]= element;
238 lastblockp->write++;
239 }
240
241 if (exp) queuehead->exp_nr_of_elements++;
242 queuehead->nr_of_elements++;
243//#ifdef QUEUELEN
244 if (queuehead->nr_of_elements > queuehead->queue_maxlength)
245 queuehead->queue_maxlength= queuehead->nr_of_elements;
246//#endif
247 /* Condition should be set while mutex is locked.
248 Recommended by libc manual.
249 */
250 if (pthread_cond_signal(&queuehead->cond)!=0)
251 qerr(QERR_CONDSIGNAL);
252 /* end critical section */
253 if (pthread_mutex_unlock(&queuehead->mutex)!=0)
254 {
255 qerr(QERR_MUTEXUNLOCK);
256 return -1;
257 }
258 // see above
259// if (pthread_cond_signal(&queuehead->cond)!=0)
260// qerr(QERR_CONDSIGNAL);
261
262 return 0;
263}
264
265
266void *dequeue_element_wait(queue_t *queuehead)
267/* wait for the queue to contain an element.
268 * if it contains an element return and remove it.
269 * returns: NULL if an error occured, the pointer to the element otherwise
270 * arguments: pointer to queue_t object
271 */
272{
273 void *element;
274 queue_elblock_t *blockp;
275 int exp = 0;
276 element= NULL;
277 int retcode= 0;
278
279 if (queuehead != NULL)
280 {
281 /* Wait for an element in the queue */
282 /* Before waiting on a condition, the associated mutex must be locked */
283 if (pthread_mutex_lock(&queuehead->mutex)!=0)
284 {
285 qerr(QERR_MUTEXLOCK); return NULL;
286 }
287
288 while(queuehead->nr_of_elements==0) /* while there is no work to do, wait */
289 { /* for a safe state the predicate must be checked in a loop! */
290 /* cond_wait() unlocks the mutex and might return sometimes without
291 getting a signal! */
292 if ((retcode= pthread_cond_wait(&queuehead->cond, &queuehead->mutex)) != 0)
293 {
294 if (retcode!=EINTR && retcode!=ETIMEDOUT)
295 {
296 qerr(QERR_CONDWAIT);
297 }
298 }
299 }
300
301 /* begin critical section */
302 exp = (queuehead->exp_nr_of_elements!=0);
303 blockp = (exp ? (queuehead->exp_first_block) : (queuehead->first_block));
304 if (blockp != NULL)
305 {
306 /* get the first element */
307 element= blockp->element[blockp->read];
308 blockp->read++;
309
310 if (blockp->next_block == NULL) /* this is the last block */
311 {
312 if (blockp->read == blockp->write)
313 { /* block is completely dequeued, so reset values */
314 /* the last block always remains allocated! */
315 blockp->read= 0;
316 blockp->write= 0;
317 }
318 }
319 else /* this is not the last block */
320 {
321 /* if block was completely dequeued, remove it */
322 if (blockp->read == ELEMENT_BLOCKSIZE)
323 {
324 if (exp) queuehead->exp_first_block= blockp->next_block;
325 else queuehead->first_block= blockp->next_block;
326 free(blockp);
327 }
328 }
329 if (exp) queuehead->exp_nr_of_elements--;
330 queuehead->nr_of_elements--;
331 }
332 else
333 qerr(QERR_QEMPTY);
334
335 /* end critical section */
336 if (pthread_mutex_unlock(&queuehead->mutex)!=0)
337 {
338 qerr(QERR_MUTEXUNLOCK);
339 return NULL;
340 }
341 }
342 else
343 qerr(QERR_QINVALID);
344
345 return element;
346}
347
348void *dequeue_element_timedwait(queue_t *queuehead, const struct timespec *tspec)
349/* wait for the queue to contain an element.
350 * if it contains an element return and remove it.
351 * returns: NULL if an error occured, the pointer to the element otherwise
352 * arguments: pointer to queue_t object
353 * tpsec is the time interval to wait (not an absolute time!)
354 */
355{
356 void *element;
357 queue_elblock_t *blockp;
358 int result;
359 struct timespec abs_tspec;
360 int exp = 0;
361 element= NULL;
362
363 if (queuehead != NULL)
364 {
365 /* Wait for an element in the queue */
366 /* Before waiting on a condition, the associated mutex must be locked */
367 if (pthread_mutex_lock(&queuehead->mutex)!=0)
368 {
369 qerr(QERR_MUTEXLOCK); return NULL;
370 }
371
372 while(queuehead->nr_of_elements==0) /* while there is no work to do, wait */
373 { /* for a safe state the predicate must be checked in a loop! */
374 /* cond_wait() unlocks the mutex and might return sometimes without
375 getting a signal! */
376 clock_gettime(CLOCK_REALTIME, &abs_tspec);
377 abs_tspec.tv_nsec+= tspec->tv_nsec;
378 abs_tspec.tv_sec+= tspec->tv_sec;
379 if (abs_tspec.tv_nsec >= NSEC_PER_SEC)
380 {
381 abs_tspec.tv_nsec%= NSEC_PER_SEC;
382 abs_tspec.tv_sec++;
383 };
384
385 if ((result = pthread_cond_timedwait(&queuehead->cond,
386 &queuehead->mutex, &abs_tspec))!=0)
387 {
388 if ( (result != ETIMEDOUT) && (result != EINTR) && (result != EINVAL) )
389 {
390 qerr(QERR_CONDWAIT);
391 }
392 else
393 { /* timeout */
394 if (pthread_mutex_unlock(&queuehead->mutex)!=0)
395 {
396 qerr(QERR_MUTEXUNLOCK);
397 return NULL;
398 }
399 return NULL;
400 }
401 }
402 }
403
404 /* begin critical section */
405 exp = (queuehead->exp_nr_of_elements!=0);
406 blockp = (exp ? (queuehead->exp_first_block) : (queuehead->first_block));
407 if (blockp != NULL)
408 {
409 /* get the first element */
410 element= blockp->element[blockp->read];
411 blockp->read++;
412
413 if (blockp->next_block == NULL) /* this is the last block */
414 {
415 if (blockp->read == blockp->write)
416 { /* block is completely dequeued, so reset values */
417 /* the last block always remains allocated! */
418 blockp->read= 0;
419 blockp->write= 0;
420 }
421 }
422 else /* this is not the last block */
423 {
424 /* if block was completely dequeued, remove it */
425 if (blockp->read == ELEMENT_BLOCKSIZE)
426 {
427 if (exp) queuehead->exp_first_block= blockp->next_block;
428 else queuehead->first_block= blockp->next_block;
429 free(blockp);
430 }
431 }
432 if (exp) queuehead->exp_nr_of_elements--;
433 queuehead->nr_of_elements--;
434 }
435 else
436 qerr(QERR_QEMPTY);
437
438 /* end critical section */
439 if (pthread_mutex_unlock(&queuehead->mutex)!=0)
440 {
441 qerr(QERR_MUTEXUNLOCK);
442 return NULL;
443 }
444 }
445 else
446 qerr(QERR_QINVALID);
447
448 return element;
449}
450
451int destroy_queue(queue_t *queuehead)
452/* destroys the queue and frees all resources, except the elements!
453 * the queue must be empty to destroy it.
454 * returns: -1 if an error occured, 0 otherwise
455 * arguments: pointer to queue_t object
456 */
457{
458 if (queuehead!=NULL)
459 {
460 /* queue not empty? */
461 if (queuehead->nr_of_elements != 0)
462 qerr(QERR_QNOTEMPTY);
463 else
464 {
465 /* destroy condition variable */
466 if (pthread_cond_destroy(&queuehead->cond)!=0) qerr(QERR_CONDDESTROY);
467 /* destroy mutex */
468 if (pthread_mutex_destroy(&queuehead->mutex)!=0) qerr(QERR_MUTEXDESTROY);
469
470 pthread_mutexattr_destroy(&queuehead->mutex_attr);
471
472 /* free memory for queuehead */
473#ifdef QUEUELEN
474 fprintf(stderr,"queue.c: length of queue (%s) growed up to %lu elements\n",
475 queuehead->name, queuehead->queue_maxlength);
476#endif
477 free(queuehead->exp_last_block);
478 free(queuehead->last_block);
479 free(queuehead);
480 }
481
482 return 0;
483 }
484 else
485 qerr(QERR_QINVALID);
486
487 return -1;
488}
489
490void *dequeue_element_nonblocking(queue_t *queuehead)
491/* if queue contains an element return and remove it.
492 * returns: NULL if an error occured or queue was empty, the pointer to
493 * the element otherwise.
494 * arguments: pointer to queue_t object
495 */
496{
497 void *element;
498 queue_elblock_t *blockp;
499 int exp = 0;
500 element= NULL;
501
502 if (queuehead != NULL)
503 {
504 if (pthread_mutex_lock(&queuehead->mutex)!=0)
505 {
506 qerr(QERR_MUTEXLOCK); return NULL;
507 }
508
509 /* begin critical section */
510
511 if (queuehead->nr_of_elements==0)
512 {
513 if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
514 return NULL;
515 }
516
517 exp = (queuehead->exp_nr_of_elements!=0);
518 blockp = (exp ? (queuehead->exp_first_block) : (queuehead->first_block));
519 if (blockp != NULL)
520 {
521 /* get the first element */
522 element= blockp->element[blockp->read];
523 blockp->read++;
524
525 if (blockp->next_block == NULL) /* this is the last block */
526 {
527 if (blockp->read == blockp->write)
528 { /* block is completely dequeued, so reset values */
529 /* the last block always remains allocated! */
530 blockp->read= 0;
531 blockp->write= 0;
532 }
533 }
534 else /* this is not the last block */
535 {
536 /* if block was completely dequeued, remove it */
537 if (blockp->read == ELEMENT_BLOCKSIZE)
538 {
539 if (exp) queuehead->exp_first_block= blockp->next_block;
540 else queuehead->first_block= blockp->next_block;
541 free(blockp);
542 }
543 }
544 if (exp) queuehead->exp_nr_of_elements--;
545 queuehead->nr_of_elements--;
546 }
547 else
548 qerr(QERR_QEMPTY);
549
550 /* end critical section */
551 if (pthread_mutex_unlock(&queuehead->mutex)!=0)
552 {
553 qerr(QERR_MUTEXUNLOCK);
554 return NULL;
555 }
556 }
557 else
558 qerr(QERR_QINVALID);
559
560 return element;
561}
562
563unsigned long queue_nr_of_elements(queue_t *queuehead)
564/** Get number fo elements in queue. */
565{
566 unsigned long result = 0;
567
568 if (queuehead != NULL)
569 {
570 if (pthread_mutex_lock(&queuehead->mutex)!=0)
571 {
572 qerr(QERR_MUTEXLOCK);
573 return 0;
574 }
575 /* begin critical section */
576
577 result = queuehead->nr_of_elements;
578
579 /* end critical section */
580 if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
581 }
582 else
583 qerr(QERR_QINVALID);
584
585 return result;
586}
587
588int queue_is_expedited_enabled(queue_t *queuehead)
589/** Get exp_enabled flag. */
590{
591 int result = 0;
592
593 if (queuehead != NULL)
594 {
595 if (pthread_mutex_lock(&queuehead->mutex)!=0)
596 {
597 qerr(QERR_MUTEXLOCK);
598 return 0;
599 }
600 /* begin critical section */
601
602 result = queuehead->exp_enabled;
603
604 /* end critical section */
605 if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
606 }
607 else
608 qerr(QERR_QINVALID);
609
610 return result;
611}
612
613int queue_enable_expedited(queue_t *queuehead, int exp)
614/** Set exp_enabled flag and return old value. */
615{
616 int result = 0;
617
618 if (queuehead != NULL)
619 {
620 if (pthread_mutex_lock(&queuehead->mutex)!=0)
621 {
622 qerr(QERR_MUTEXLOCK);
623 return 0;
624 }
625 /* begin critical section */
626
627 result = queuehead->exp_enabled;
628 if (exp) queuehead->exp_enabled = 1;
629 else queuehead->exp_enabled = 0;
630
631 /* end critical section */
632 if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
633 }
634 else
635 qerr(QERR_QINVALID);
636
637 return result;
638}
639
640//@}
Note: See TracBrowser for help on using the repository browser.