source: source/ariba/utility/transport/tcpip/protlib/fastqueue.c@ 10071

Last change on this file since 10071 was 9992, checked in by Christoph Mayer, 13 years ago

-fixes on protlib for android

File size: 17.7 KB
RevLine 
[5284]1/// ----------------------------------------*- mode: C++; -*--
2/// @file fastqueue.c
3/// a simple FIFO queue with mutexes for use with pthreads
4/// ----------------------------------------------------------
5/// $Id: fastqueue.c 2549 2007-04-02 22:17:37Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/fastqueue/fastqueue.c $
7// ===========================================================
8//
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29
[9686]30/** @addtogroup protlib
[5284]31 * @{
32 */
33
34/** @file
35 * Fast and thread-safe queue.
36 */
37
38/******************************************************************************
39 * fastqueue.c -- a simple FIFO queue with mutexes for use with pthreads *
40 * -------------------------------------------------------------------------- *
41 * written by Roland Bless 1995 *
42 * all operations enqueue,dequeue are done in O(1) which means constant time *
43 ******************************************************************************/
44
45#define _GNU_SOURCE
46#include <stdio.h>
47#include <string.h>
48#include <pthread.h>
49#include <stdlib.h>
50#include <errno.h>
51#include <sys/time.h>
52#include <unistd.h>
53
54 /**** module interface ****/
55#include "fastqueue.h"
56
57 /*************** defines *****************/
/* Print the message belonging to error code errnr (a QERR_* value, used as
 * an index into queue_errmsg[]) on stderr. */
#define qerr(errnr) fprintf(stderr,"queue.c: %s\n",queue_errmsg[errnr])

/* Outside of Android builds, PTHREAD_MUTEX_NORMAL is mapped onto the
 * PTHREAD_MUTEX_TIMED_NP mutex kind. */
#ifndef ANDROID
  #define PTHREAD_MUTEX_NORMAL PTHREAD_MUTEX_TIMED_NP
#endif

/* Priority bounds; the PRI_OTHER_* names are aliases of the PRI_FG_*_NP
 * values. */
#define PRI_FG_MIN_NP 8
#define PRI_FG_MAX_NP 15
#define PRI_OTHER_MIN PRI_FG_MIN_NP
#define PRI_OTHER_MAX PRI_FG_MAX_NP

/* Time handling: every clock_gettime() call in this file is redirected to
 * eclock_gettime(), which is defined elsewhere; the clock id argument is
 * dropped by the macro. */
#define CLOCK_REALTIME 0
#define NSEC_PER_SEC 1000000000
extern int eclock_gettime(struct timespec *tp);
#define clock_gettime(clock_id, tspec) eclock_gettime(tspec)
72
73 /*************** typedefs *****************/
74
/* Error codes of the queue module.
 * Each value doubles as the index of its message in queue_errmsg[], so the
 * order here must stay in step with that table. */
enum {
  QERR_NONE = 0,       /* no error */
  QERR_NOMEM,          /* memory allocation failed */
  QERR_MUTEXINIT,      /* mutex could not be initialised */
  QERR_MUTEXLOCK,      /* mutex could not be locked */
  QERR_MUTEXUNLOCK,    /* mutex could not be unlocked */
  QERR_MUTEXDESTROY,   /* mutex could not be destroyed */
  QERR_QEMPTY,         /* queue is empty */
  QERR_QINVALID,       /* invalid (NULL) queue object */
  QERR_QNOTEMPTY,      /* queue still holds elements on destroy */
  QERR_CONDINIT,       /* condition variable could not be initialised */
  QERR_CONDWAIT,       /* waiting on the condition variable failed */
  QERR_CONDSIGNAL,     /* signalling the condition variable failed */
  QERR_CONDDESTROY     /* condition variable could not be destroyed */
};
90
/* Human readable error texts, indexed by the QERR_* error codes.
 * The position of each string must match the value of its QERR_* code. */
const char *const queue_errmsg[] = {
  /*  0 QERR_NONE         */ "all ok",
  /*  1 QERR_NOMEM        */ "can't get enough memory",
  /*  2 QERR_MUTEXINIT    */ "initializing mutex",
  /*  3 QERR_MUTEXLOCK    */ "locking mutex",
  /*  4 QERR_MUTEXUNLOCK  */ "unlocking mutex",
  /*  5 QERR_MUTEXDESTROY */ "destroying mutex",
  /*  6 QERR_QEMPTY       */ "queue empty",
  /*  7 QERR_QINVALID     */ "invalid queueobject",
  /*  8 QERR_QNOTEMPTY    */ "destroying queue - queue not empty",
  /*  9 QERR_CONDINIT     */ "initializing queue condition variable",
  /* 10 QERR_CONDWAIT     */ "waiting on condition",
  /* 11 QERR_CONDSIGNAL   */ "signalling condition",
  /* 12 QERR_CONDDESTROY  */ "destroying condition"
};
108
109
110
111queue_t *create_queue(const char* name)
112/* initialization routine for a queue.
113 * returns: NULL if an error occured, or a queue-object which is actually
114 * a queueheader structure
115 * arguments: none
116 */
117{
118 queue_t *queuehead;
119
120 /* Allocate memory for queue head */
121 if ((queuehead= (queue_t *) malloc(sizeof(queue_t)))!=NULL)
122 {
123 /* Set mutex kind */
124 pthread_mutexattr_init(&queuehead->mutex_attr);
125#ifdef _DEBUG
126 pthread_mutexattr_settype(&queuehead->mutex_attr,PTHREAD_MUTEX_ERRORCHECK);
127#else
128 pthread_mutexattr_settype(&queuehead->mutex_attr,PTHREAD_MUTEX_NORMAL);
129#endif
130 /* use PTHREAD_MUTEX_ERRORCHECK or PTHREAD_MUTEX_ERRORCHECK_NP for testing */
131
132 /* Initialize mutex */
133 if (pthread_mutex_init(&queuehead->mutex, &queuehead->mutex_attr)==0)
134 {
135 /* Create Condition variable for command queue */
136 if (pthread_cond_init(&queuehead->cond, NULL)==0)
137 {
138 queuehead->nr_of_elements= 0UL;
139 queuehead->exp_nr_of_elements= 0UL;
140 queuehead->exp_enabled = 0;
141 queuehead->first_block= (queue_elblock_t *) malloc(sizeof(queue_elblock_t));
142 queuehead->exp_first_block= (queue_elblock_t *) malloc(sizeof(queue_elblock_t));
143 if ((queuehead->first_block == NULL) || (queuehead->exp_first_block == NULL))
144 qerr(QERR_NOMEM);
145 else
146 {
147 queuehead->first_block->read= 0;
148 queuehead->first_block->write= 0;
149 queuehead->first_block->next_block= NULL;
150 queuehead->last_block= queuehead->first_block;
151 queuehead->exp_first_block->read= 0;
152 queuehead->exp_first_block->write= 0;
153 queuehead->exp_first_block->next_block= NULL;
154 queuehead->exp_last_block= queuehead->exp_first_block;
155 if (name)
156 {
157 if (strlen(name) <= MAX_QUEUENAME_LENGTH)
158 strcpy(queuehead->name, name);
159 else
160 {
161 memcpy(queuehead->name, name, MAX_QUEUENAME_LENGTH);
162 queuehead->name[MAX_QUEUENAME_LENGTH + 1] = '\0';
163 }
164 }
165 else
166 queuehead->name[0] = '\0';
167 /* Now it's simple to enqueue elements, esp. the first one */
168 }
169//#ifdef QUEUELEN
170 queuehead->queue_maxlength= 0;
171//#endif
172 }
173 else
174 qerr(QERR_CONDINIT);
175 }
176 else /* error during initialize */
177 qerr(QERR_MUTEXINIT);
178 }
179 else
180 qerr(QERR_NOMEM);
181
182 return queuehead;
183}
184
185int enqueue_element_signal(queue_t *queuehead, void *element)
186{
187 return enqueue_element_expedited_signal(queuehead,element,0);
188}
189
190int enqueue_element_expedited_signal(queue_t *queuehead, void *element, int exp)
191/* add a new element into the queue. Memory for the element must be
192 * allocated anywhere else. This routine signals other waiting threads.
193 * returns: -1 if an error occured, or 0 is action could be performed
194 * arguments: pointer to queue_t object, pointer to an element
195 */
196{
197 queue_elblock_t *newelement, *lastblockp;
198
199 if (queuehead==NULL)
200 {
201 qerr(QERR_QINVALID);
202 return -1;
203 }
204
205 if (pthread_mutex_lock(&queuehead->mutex)!=0)
206 {
207 qerr(QERR_MUTEXLOCK);
208 return -1;
209 }
210 /* begin critical section */
211
212 if (exp && queuehead->exp_enabled) exp = 1; else exp = 0;
213 /* Allocate new element structure when necessary */
214 /* Note: queuehead->last_block must always contain a valid value */
215 lastblockp = (exp ? (queuehead->exp_last_block) : (queuehead->last_block));
216 if (lastblockp->write == ELEMENT_BLOCKSIZE)
217 { /* last block is full, so allocate a new block */
218 if ((newelement= (queue_elblock_t *) malloc(sizeof(queue_elblock_t)))==NULL)
219 {
220 qerr(QERR_NOMEM);
221 return -1;
222 }
223
224 /* initialize new structure */
225 newelement->element[0]= element;
226 newelement->read = 0;
227 newelement->write = 1;
228 newelement->next_block= NULL;
229
230 /* append new element to the end */
231 lastblockp->next_block= newelement;
232 /* new element becomes last element */
233 if (exp) queuehead->exp_last_block = newelement;
234 else queuehead->last_block = newelement;
235 }
236 else /* last block was not full */
237 {
238 lastblockp->element[lastblockp->write]= element;
239 lastblockp->write++;
240 }
241
242 if (exp) queuehead->exp_nr_of_elements++;
243 queuehead->nr_of_elements++;
244//#ifdef QUEUELEN
245 if (queuehead->nr_of_elements > queuehead->queue_maxlength)
246 queuehead->queue_maxlength= queuehead->nr_of_elements;
247//#endif
248 /* Condition should be set while mutex is locked.
249 Recommended by libc manual.
250 */
251 if (pthread_cond_signal(&queuehead->cond)!=0)
252 qerr(QERR_CONDSIGNAL);
253 /* end critical section */
254 if (pthread_mutex_unlock(&queuehead->mutex)!=0)
255 {
256 qerr(QERR_MUTEXUNLOCK);
257 return -1;
258 }
259 // see above
260// if (pthread_cond_signal(&queuehead->cond)!=0)
261// qerr(QERR_CONDSIGNAL);
262
263 return 0;
264}
265
266
267void *dequeue_element_wait(queue_t *queuehead)
268/* wait for the queue to contain an element.
269 * if it contains an element return and remove it.
270 * returns: NULL if an error occured, the pointer to the element otherwise
271 * arguments: pointer to queue_t object
272 */
273{
274 void *element;
275 queue_elblock_t *blockp;
276 int exp = 0;
277 element= NULL;
278 int retcode= 0;
279
280 if (queuehead != NULL)
281 {
282 /* Wait for an element in the queue */
283 /* Before waiting on a condition, the associated mutex must be locked */
284 if (pthread_mutex_lock(&queuehead->mutex)!=0)
285 {
286 qerr(QERR_MUTEXLOCK); return NULL;
287 }
288
289 while(queuehead->nr_of_elements==0) /* while there is no work to do, wait */
290 { /* for a safe state the predicate must be checked in a loop! */
291 /* cond_wait() unlocks the mutex and might return sometimes without
292 getting a signal! */
293 if ((retcode= pthread_cond_wait(&queuehead->cond, &queuehead->mutex)) != 0)
294 {
295 if (retcode!=EINTR && retcode!=ETIMEDOUT)
296 {
297 qerr(QERR_CONDWAIT);
298 }
299 }
300 }
301
302 /* begin critical section */
303 exp = (queuehead->exp_nr_of_elements!=0);
304 blockp = (exp ? (queuehead->exp_first_block) : (queuehead->first_block));
305 if (blockp != NULL)
306 {
307 /* get the first element */
308 element= blockp->element[blockp->read];
309 blockp->read++;
310
311 if (blockp->next_block == NULL) /* this is the last block */
312 {
313 if (blockp->read == blockp->write)
314 { /* block is completely dequeued, so reset values */
315 /* the last block always remains allocated! */
316 blockp->read= 0;
317 blockp->write= 0;
318 }
319 }
320 else /* this is not the last block */
321 {
322 /* if block was completely dequeued, remove it */
323 if (blockp->read == ELEMENT_BLOCKSIZE)
324 {
325 if (exp) queuehead->exp_first_block= blockp->next_block;
326 else queuehead->first_block= blockp->next_block;
327 free(blockp);
328 }
329 }
330 if (exp) queuehead->exp_nr_of_elements--;
331 queuehead->nr_of_elements--;
332 }
333 else
334 qerr(QERR_QEMPTY);
335
336 /* end critical section */
337 if (pthread_mutex_unlock(&queuehead->mutex)!=0)
338 {
339 qerr(QERR_MUTEXUNLOCK);
340 return NULL;
341 }
342 }
343 else
344 qerr(QERR_QINVALID);
345
346 return element;
347}
348
349void *dequeue_element_timedwait(queue_t *queuehead, const struct timespec *tspec)
350/* wait for the queue to contain an element.
351 * if it contains an element return and remove it.
352 * returns: NULL if an error occured, the pointer to the element otherwise
353 * arguments: pointer to queue_t object
354 * tpsec is the time interval to wait (not an absolute time!)
355 */
356{
357 void *element;
358 queue_elblock_t *blockp;
359 int result;
360 struct timespec abs_tspec;
361 int exp = 0;
362 element= NULL;
363
364 if (queuehead != NULL)
365 {
366 /* Wait for an element in the queue */
367 /* Before waiting on a condition, the associated mutex must be locked */
368 if (pthread_mutex_lock(&queuehead->mutex)!=0)
369 {
370 qerr(QERR_MUTEXLOCK); return NULL;
371 }
372
373 while(queuehead->nr_of_elements==0) /* while there is no work to do, wait */
374 { /* for a safe state the predicate must be checked in a loop! */
375 /* cond_wait() unlocks the mutex and might return sometimes without
376 getting a signal! */
377 clock_gettime(CLOCK_REALTIME, &abs_tspec);
378 abs_tspec.tv_nsec+= tspec->tv_nsec;
379 abs_tspec.tv_sec+= tspec->tv_sec;
380 if (abs_tspec.tv_nsec >= NSEC_PER_SEC)
381 {
382 abs_tspec.tv_nsec%= NSEC_PER_SEC;
383 abs_tspec.tv_sec++;
384 };
385
386 if ((result = pthread_cond_timedwait(&queuehead->cond,
387 &queuehead->mutex, &abs_tspec))!=0)
388 {
389 if ( (result != ETIMEDOUT) && (result != EINTR) && (result != EINVAL) )
390 {
391 qerr(QERR_CONDWAIT);
392 }
393 else
394 { /* timeout */
395 if (pthread_mutex_unlock(&queuehead->mutex)!=0)
396 {
397 qerr(QERR_MUTEXUNLOCK);
398 return NULL;
399 }
400 return NULL;
401 }
402 }
403 }
404
405 /* begin critical section */
406 exp = (queuehead->exp_nr_of_elements!=0);
407 blockp = (exp ? (queuehead->exp_first_block) : (queuehead->first_block));
408 if (blockp != NULL)
409 {
410 /* get the first element */
411 element= blockp->element[blockp->read];
412 blockp->read++;
413
414 if (blockp->next_block == NULL) /* this is the last block */
415 {
416 if (blockp->read == blockp->write)
417 { /* block is completely dequeued, so reset values */
418 /* the last block always remains allocated! */
419 blockp->read= 0;
420 blockp->write= 0;
421 }
422 }
423 else /* this is not the last block */
424 {
425 /* if block was completely dequeued, remove it */
426 if (blockp->read == ELEMENT_BLOCKSIZE)
427 {
428 if (exp) queuehead->exp_first_block= blockp->next_block;
429 else queuehead->first_block= blockp->next_block;
430 free(blockp);
431 }
432 }
433 if (exp) queuehead->exp_nr_of_elements--;
434 queuehead->nr_of_elements--;
435 }
436 else
437 qerr(QERR_QEMPTY);
438
439 /* end critical section */
440 if (pthread_mutex_unlock(&queuehead->mutex)!=0)
441 {
442 qerr(QERR_MUTEXUNLOCK);
443 return NULL;
444 }
445 }
446 else
447 qerr(QERR_QINVALID);
448
449 return element;
450}
451
452int destroy_queue(queue_t *queuehead)
453/* destroys the queue and frees all resources, except the elements!
454 * the queue must be empty to destroy it.
455 * returns: -1 if an error occured, 0 otherwise
456 * arguments: pointer to queue_t object
457 */
458{
459 if (queuehead!=NULL)
460 {
461 /* queue not empty? */
462 if (queuehead->nr_of_elements != 0)
463 qerr(QERR_QNOTEMPTY);
464 else
465 {
466 /* destroy condition variable */
467 if (pthread_cond_destroy(&queuehead->cond)!=0) qerr(QERR_CONDDESTROY);
468 /* destroy mutex */
469 if (pthread_mutex_destroy(&queuehead->mutex)!=0) qerr(QERR_MUTEXDESTROY);
470
471 pthread_mutexattr_destroy(&queuehead->mutex_attr);
472
473 /* free memory for queuehead */
474#ifdef QUEUELEN
475 fprintf(stderr,"queue.c: length of queue (%s) growed up to %lu elements\n",
476 queuehead->name, queuehead->queue_maxlength);
477#endif
478 free(queuehead->exp_last_block);
479 free(queuehead->last_block);
480 free(queuehead);
481 }
482
483 return 0;
484 }
485 else
486 qerr(QERR_QINVALID);
487
488 return -1;
489}
490
491void *dequeue_element_nonblocking(queue_t *queuehead)
492/* if queue contains an element return and remove it.
493 * returns: NULL if an error occured or queue was empty, the pointer to
494 * the element otherwise.
495 * arguments: pointer to queue_t object
496 */
497{
498 void *element;
499 queue_elblock_t *blockp;
500 int exp = 0;
501 element= NULL;
502
503 if (queuehead != NULL)
504 {
505 if (pthread_mutex_lock(&queuehead->mutex)!=0)
506 {
507 qerr(QERR_MUTEXLOCK); return NULL;
508 }
509
510 /* begin critical section */
511
512 if (queuehead->nr_of_elements==0)
513 {
514 if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
515 return NULL;
516 }
517
518 exp = (queuehead->exp_nr_of_elements!=0);
519 blockp = (exp ? (queuehead->exp_first_block) : (queuehead->first_block));
520 if (blockp != NULL)
521 {
522 /* get the first element */
523 element= blockp->element[blockp->read];
524 blockp->read++;
525
526 if (blockp->next_block == NULL) /* this is the last block */
527 {
528 if (blockp->read == blockp->write)
529 { /* block is completely dequeued, so reset values */
530 /* the last block always remains allocated! */
531 blockp->read= 0;
532 blockp->write= 0;
533 }
534 }
535 else /* this is not the last block */
536 {
537 /* if block was completely dequeued, remove it */
538 if (blockp->read == ELEMENT_BLOCKSIZE)
539 {
540 if (exp) queuehead->exp_first_block= blockp->next_block;
541 else queuehead->first_block= blockp->next_block;
542 free(blockp);
543 }
544 }
545 if (exp) queuehead->exp_nr_of_elements--;
546 queuehead->nr_of_elements--;
547 }
548 else
549 qerr(QERR_QEMPTY);
550
551 /* end critical section */
552 if (pthread_mutex_unlock(&queuehead->mutex)!=0)
553 {
554 qerr(QERR_MUTEXUNLOCK);
555 return NULL;
556 }
557 }
558 else
559 qerr(QERR_QINVALID);
560
561 return element;
562}
563
564unsigned long queue_nr_of_elements(queue_t *queuehead)
565/** Get number fo elements in queue. */
566{
567 unsigned long result = 0;
568
569 if (queuehead != NULL)
570 {
571 if (pthread_mutex_lock(&queuehead->mutex)!=0)
572 {
573 qerr(QERR_MUTEXLOCK);
574 return 0;
575 }
576 /* begin critical section */
577
578 result = queuehead->nr_of_elements;
579
580 /* end critical section */
581 if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
582 }
583 else
584 qerr(QERR_QINVALID);
585
586 return result;
587}
588
589int queue_is_expedited_enabled(queue_t *queuehead)
590/** Get exp_enabled flag. */
591{
592 int result = 0;
593
594 if (queuehead != NULL)
595 {
596 if (pthread_mutex_lock(&queuehead->mutex)!=0)
597 {
598 qerr(QERR_MUTEXLOCK);
599 return 0;
600 }
601 /* begin critical section */
602
603 result = queuehead->exp_enabled;
604
605 /* end critical section */
606 if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
607 }
608 else
609 qerr(QERR_QINVALID);
610
611 return result;
612}
613
614int queue_enable_expedited(queue_t *queuehead, int exp)
615/** Set exp_enabled flag and return old value. */
616{
617 int result = 0;
618
619 if (queuehead != NULL)
620 {
621 if (pthread_mutex_lock(&queuehead->mutex)!=0)
622 {
623 qerr(QERR_MUTEXLOCK);
624 return 0;
625 }
626 /* begin critical section */
627
628 result = queuehead->exp_enabled;
629 if (exp) queuehead->exp_enabled = 1;
630 else queuehead->exp_enabled = 0;
631
632 /* end critical section */
633 if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
634 }
635 else
636 qerr(QERR_QINVALID);
637
638 return result;
639}
640
641//@}
Note: See TracBrowser for help on using the repository browser.