source: source/ariba/communication/modules/transport/protlib/fastqueue.c@ 5638

Last change on this file since 5638 was 5638, checked in by Christoph Mayer, 15 years ago

adress detection aufgeräumt, network info für bleutooth, data stream (hopeful crash fix), logging auf maemo nur warn, ...

File size: 18.0 KB
Line 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file fastqueue.c
3/// a simple FIFO queue with mutexes for use with pthreads
4/// ----------------------------------------------------------
5/// $Id: fastqueue.c 2549 2007-04-02 22:17:37Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/fastqueue/fastqueue.c $
7// ===========================================================
8//
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29
30/** @addtogroup fastqueue Fast Queue
31 * @{
32 */
33
34/** @file
35 * Fast and thread-safe queue.
36 */
37
38/******************************************************************************
39 * fastqueue.c -- a simple FIFO queue with mutexes for use with pthreads *
40 * -------------------------------------------------------------------------- *
41 * written by Roland Bless 1995 *
42 * all operations enqueue,dequeue are done in O(1) which means constant time *
43 ******************************************************************************/
44
45#define _GNU_SOURCE
46#include <stdio.h>
47#include <string.h>
48#include <pthread.h>
49#include <stdlib.h>
50#include <errno.h>
51#include <sys/time.h>
52#include <unistd.h>
53
54 /**** module interface ****/
55#include "fastqueue.h"
56
57 /*************** defines *****************/
58#define qerr(errnr) fprintf(stderr,"queue.c: %s\n",queue_errmsg[errnr])
59
60#ifdef __linux__
61
62// not needed for Linux
63//#define pthread_mutexattr_settype pthread_mutexattr_setkind_np
64
65#define PTHREAD_MUTEX_NORMAL PTHREAD_MUTEX_TIMED_NP
66#define PRI_OTHER_MIN PRI_FG_MIN_NP
67#define PRI_OTHER_MAX PRI_FG_MAX_NP
68#define PRI_FG_MIN_NP 8
69#define PRI_FG_MAX_NP 15
70
71#define CLOCK_REALTIME 0
72#define NSEC_PER_SEC 1000000000
73extern int eclock_gettime(struct timespec *tp);
74#define clock_gettime(clock_id, tspec) eclock_gettime(tspec)
75
76#elif _DECTHREADS_VERSION < 314126L
77
78#define pthread_mutexattr_settype pthread_mutexattr_settype_np
79#define PTHREAD_MUTEX_NORMAL PTHREAD_MUTEX_NORMAL_NP
80#define PTHREAD_MUTEX_ERRORCHECK PTHREAD_MUTEX_ERRORCHECK_NP
81#endif
82 /*************** typedefs *****************/
83
84enum {
85 QERR_NONE,
86 QERR_NOMEM,
87 QERR_MUTEXINIT,
88 QERR_MUTEXLOCK,
89 QERR_MUTEXUNLOCK,
90 QERR_MUTEXDESTROY,
91 QERR_QEMPTY,
92 QERR_QINVALID,
93 QERR_QNOTEMPTY,
94 QERR_CONDINIT,
95 QERR_CONDWAIT,
96 QERR_CONDSIGNAL,
97 QERR_CONDDESTROY
98};
99
100const
101char *const queue_errmsg[]=
102{
103 "all ok",
104 "can't get enough memory",
105 "initializing mutex",
106 "locking mutex",
107 "unlocking mutex",
108 "destroying mutex",
109 "queue empty",
110 "invalid queueobject",
111 "destroying queue - queue not empty",
112 "initializing queue condition variable",
113 "waiting on condition",
114 "signalling condition",
115 "destroying condition"
116};
117
118
119
120queue_t *create_queue(const char* name)
121/* initialization routine for a queue.
122 * returns: NULL if an error occured, or a queue-object which is actually
123 * a queueheader structure
124 * arguments: none
125 */
126{
127 queue_t *queuehead;
128
129 /* Allocate memory for queue head */
130 if ((queuehead= (queue_t *) malloc(sizeof(queue_t)))!=NULL)
131 {
132 /* Set mutex kind */
133 pthread_mutexattr_init(&queuehead->mutex_attr);
134#ifdef _DEBUG
135 pthread_mutexattr_settype(&queuehead->mutex_attr,PTHREAD_MUTEX_ERRORCHECK);
136#else
137 pthread_mutexattr_settype(&queuehead->mutex_attr,PTHREAD_MUTEX_NORMAL);
138#endif
139 /* use PTHREAD_MUTEX_ERRORCHECK or PTHREAD_MUTEX_ERRORCHECK_NP for testing */
140
141 /* Initialize mutex */
142 if (pthread_mutex_init(&queuehead->mutex, &queuehead->mutex_attr)==0)
143 {
144 /* Create Condition variable for command queue */
145 if (pthread_cond_init(&queuehead->cond, NULL)==0)
146 {
147 queuehead->nr_of_elements= 0UL;
148 queuehead->exp_nr_of_elements= 0UL;
149 queuehead->exp_enabled = 0;
150 queuehead->first_block= (queue_elblock_t *) malloc(sizeof(queue_elblock_t));
151 queuehead->exp_first_block= (queue_elblock_t *) malloc(sizeof(queue_elblock_t));
152 if ((queuehead->first_block == NULL) || (queuehead->exp_first_block == NULL))
153 qerr(QERR_NOMEM);
154 else
155 {
156 queuehead->first_block->read= 0;
157 queuehead->first_block->write= 0;
158 queuehead->first_block->next_block= NULL;
159 queuehead->last_block= queuehead->first_block;
160 queuehead->exp_first_block->read= 0;
161 queuehead->exp_first_block->write= 0;
162 queuehead->exp_first_block->next_block= NULL;
163 queuehead->exp_last_block= queuehead->exp_first_block;
164 if (name)
165 {
166 if (strlen(name) <= MAX_QUEUENAME_LENGTH)
167 strcpy(queuehead->name, name);
168 else
169 {
170 memcpy(queuehead->name, name, MAX_QUEUENAME_LENGTH);
171 queuehead->name[MAX_QUEUENAME_LENGTH + 1] = '\0';
172 }
173 }
174 else
175 queuehead->name[0] = '\0';
176 /* Now it's simple to enqueue elements, esp. the first one */
177 }
178//#ifdef QUEUELEN
179 queuehead->queue_maxlength= 0;
180//#endif
181 }
182 else
183 qerr(QERR_CONDINIT);
184 }
185 else /* error during initialize */
186 qerr(QERR_MUTEXINIT);
187 }
188 else
189 qerr(QERR_NOMEM);
190
191 return queuehead;
192}
193
194int enqueue_element_signal(queue_t *queuehead, void *element)
195{
196 return enqueue_element_expedited_signal(queuehead,element,0);
197}
198
199int enqueue_element_expedited_signal(queue_t *queuehead, void *element, int exp)
200/* add a new element into the queue. Memory for the element must be
201 * allocated anywhere else. This routine signals other waiting threads.
202 * returns: -1 if an error occured, or 0 is action could be performed
203 * arguments: pointer to queue_t object, pointer to an element
204 */
205{
206 queue_elblock_t *newelement, *lastblockp;
207
208 if (queuehead==NULL)
209 {
210 qerr(QERR_QINVALID);
211 return -1;
212 }
213
214 if (pthread_mutex_lock(&queuehead->mutex)!=0)
215 {
216 qerr(QERR_MUTEXLOCK);
217 return -1;
218 }
219 /* begin critical section */
220
221 if (exp && queuehead->exp_enabled) exp = 1; else exp = 0;
222 /* Allocate new element structure when necessary */
223 /* Note: queuehead->last_block must always contain a valid value */
224 lastblockp = (exp ? (queuehead->exp_last_block) : (queuehead->last_block));
225 if (lastblockp->write == ELEMENT_BLOCKSIZE)
226 { /* last block is full, so allocate a new block */
227 if ((newelement= (queue_elblock_t *) malloc(sizeof(queue_elblock_t)))==NULL)
228 {
229 qerr(QERR_NOMEM);
230 return -1;
231 }
232
233 /* initialize new structure */
234 newelement->element[0]= element;
235 newelement->read = 0;
236 newelement->write = 1;
237 newelement->next_block= NULL;
238
239 /* append new element to the end */
240 lastblockp->next_block= newelement;
241 /* new element becomes last element */
242 if (exp) queuehead->exp_last_block = newelement;
243 else queuehead->last_block = newelement;
244 }
245 else /* last block was not full */
246 {
247 lastblockp->element[lastblockp->write]= element;
248 lastblockp->write++;
249 }
250
251 if (exp) queuehead->exp_nr_of_elements++;
252 queuehead->nr_of_elements++;
253//#ifdef QUEUELEN
254 if (queuehead->nr_of_elements > queuehead->queue_maxlength)
255 queuehead->queue_maxlength= queuehead->nr_of_elements;
256//#endif
257 /* Condition should be set while mutex is locked.
258 Recommended by libc manual.
259 */
260 if (pthread_cond_signal(&queuehead->cond)!=0)
261 qerr(QERR_CONDSIGNAL);
262 /* end critical section */
263 if (pthread_mutex_unlock(&queuehead->mutex)!=0)
264 {
265 qerr(QERR_MUTEXUNLOCK);
266 return -1;
267 }
268 // see above
269// if (pthread_cond_signal(&queuehead->cond)!=0)
270// qerr(QERR_CONDSIGNAL);
271
272 return 0;
273}
274
275
276void *dequeue_element_wait(queue_t *queuehead)
277/* wait for the queue to contain an element.
278 * if it contains an element return and remove it.
279 * returns: NULL if an error occured, the pointer to the element otherwise
280 * arguments: pointer to queue_t object
281 */
282{
283 void *element;
284 queue_elblock_t *blockp;
285 int exp = 0;
286 element= NULL;
287 int retcode= 0;
288
289 if (queuehead != NULL)
290 {
291 /* Wait for an element in the queue */
292 /* Before waiting on a condition, the associated mutex must be locked */
293 if (pthread_mutex_lock(&queuehead->mutex)!=0)
294 {
295 qerr(QERR_MUTEXLOCK); return NULL;
296 }
297
298 while(queuehead->nr_of_elements==0) /* while there is no work to do, wait */
299 { /* for a safe state the predicate must be checked in a loop! */
300 /* cond_wait() unlocks the mutex and might return sometimes without
301 getting a signal! */
302 if ((retcode= pthread_cond_wait(&queuehead->cond, &queuehead->mutex)) != 0)
303 {
304 if (retcode!=EINTR && retcode!=ETIMEDOUT)
305 {
306 qerr(QERR_CONDWAIT);
307 }
308 }
309 }
310
311 /* begin critical section */
312 exp = (queuehead->exp_nr_of_elements!=0);
313 blockp = (exp ? (queuehead->exp_first_block) : (queuehead->first_block));
314 if (blockp != NULL)
315 {
316 /* get the first element */
317 element= blockp->element[blockp->read];
318 blockp->read++;
319
320 if (blockp->next_block == NULL) /* this is the last block */
321 {
322 if (blockp->read == blockp->write)
323 { /* block is completely dequeued, so reset values */
324 /* the last block always remains allocated! */
325 blockp->read= 0;
326 blockp->write= 0;
327 }
328 }
329 else /* this is not the last block */
330 {
331 /* if block was completely dequeued, remove it */
332 if (blockp->read == ELEMENT_BLOCKSIZE)
333 {
334 if (exp) queuehead->exp_first_block= blockp->next_block;
335 else queuehead->first_block= blockp->next_block;
336 free(blockp);
337 }
338 }
339 if (exp) queuehead->exp_nr_of_elements--;
340 queuehead->nr_of_elements--;
341 }
342 else
343 qerr(QERR_QEMPTY);
344
345 /* end critical section */
346 if (pthread_mutex_unlock(&queuehead->mutex)!=0)
347 {
348 qerr(QERR_MUTEXUNLOCK);
349 return NULL;
350 }
351 }
352 else
353 qerr(QERR_QINVALID);
354
355 return element;
356}
357
358void *dequeue_element_timedwait(queue_t *queuehead, const struct timespec *tspec)
359/* wait for the queue to contain an element.
360 * if it contains an element return and remove it.
361 * returns: NULL if an error occured, the pointer to the element otherwise
362 * arguments: pointer to queue_t object
363 * tpsec is the time interval to wait (not an absolute time!)
364 */
365{
366 void *element;
367 queue_elblock_t *blockp;
368 int result;
369 struct timespec abs_tspec;
370 int exp = 0;
371 element= NULL;
372
373 if (queuehead != NULL)
374 {
375 /* Wait for an element in the queue */
376 /* Before waiting on a condition, the associated mutex must be locked */
377 if (pthread_mutex_lock(&queuehead->mutex)!=0)
378 {
379 qerr(QERR_MUTEXLOCK); return NULL;
380 }
381
382 while(queuehead->nr_of_elements==0) /* while there is no work to do, wait */
383 { /* for a safe state the predicate must be checked in a loop! */
384 /* cond_wait() unlocks the mutex and might return sometimes without
385 getting a signal! */
386 clock_gettime(CLOCK_REALTIME, &abs_tspec);
387 abs_tspec.tv_nsec+= tspec->tv_nsec;
388 abs_tspec.tv_sec+= tspec->tv_sec;
389 if (abs_tspec.tv_nsec >= NSEC_PER_SEC)
390 {
391 abs_tspec.tv_nsec%= NSEC_PER_SEC;
392 abs_tspec.tv_sec++;
393 };
394
395 if ((result = pthread_cond_timedwait(&queuehead->cond,
396 &queuehead->mutex, &abs_tspec))!=0)
397 {
398 if ( (result != ETIMEDOUT) && (result != EINTR) && (result != EINVAL) )
399 {
400 qerr(QERR_CONDWAIT);
401 }
402 else
403 { /* timeout */
404 if (pthread_mutex_unlock(&queuehead->mutex)!=0)
405 {
406 qerr(QERR_MUTEXUNLOCK);
407 return NULL;
408 }
409 return NULL;
410 }
411 }
412 }
413
414 /* begin critical section */
415 exp = (queuehead->exp_nr_of_elements!=0);
416 blockp = (exp ? (queuehead->exp_first_block) : (queuehead->first_block));
417 if (blockp != NULL)
418 {
419 /* get the first element */
420 element= blockp->element[blockp->read];
421 blockp->read++;
422
423 if (blockp->next_block == NULL) /* this is the last block */
424 {
425 if (blockp->read == blockp->write)
426 { /* block is completely dequeued, so reset values */
427 /* the last block always remains allocated! */
428 blockp->read= 0;
429 blockp->write= 0;
430 }
431 }
432 else /* this is not the last block */
433 {
434 /* if block was completely dequeued, remove it */
435 if (blockp->read == ELEMENT_BLOCKSIZE)
436 {
437 if (exp) queuehead->exp_first_block= blockp->next_block;
438 else queuehead->first_block= blockp->next_block;
439 free(blockp);
440 }
441 }
442 if (exp) queuehead->exp_nr_of_elements--;
443 queuehead->nr_of_elements--;
444 }
445 else
446 qerr(QERR_QEMPTY);
447
448 /* end critical section */
449 if (pthread_mutex_unlock(&queuehead->mutex)!=0)
450 {
451 qerr(QERR_MUTEXUNLOCK);
452 return NULL;
453 }
454 }
455 else
456 qerr(QERR_QINVALID);
457
458 return element;
459}
460
461int destroy_queue(queue_t *queuehead)
462/* destroys the queue and frees all resources, except the elements!
463 * the queue must be empty to destroy it.
464 * returns: -1 if an error occured, 0 otherwise
465 * arguments: pointer to queue_t object
466 */
467{
468 if (queuehead!=NULL)
469 {
470 /* queue not empty? */
471 if (queuehead->nr_of_elements != 0)
472 qerr(QERR_QNOTEMPTY);
473 else
474 {
475 /* destroy condition variable */
476 if (pthread_cond_destroy(&queuehead->cond)!=0) qerr(QERR_CONDDESTROY);
477 /* destroy mutex */
478 if (pthread_mutex_destroy(&queuehead->mutex)!=0) qerr(QERR_MUTEXDESTROY);
479
480 pthread_mutexattr_destroy(&queuehead->mutex_attr);
481
482 /* free memory for queuehead */
483#ifdef QUEUELEN
484 fprintf(stderr,"queue.c: length of queue (%s) growed up to %lu elements\n",
485 queuehead->name, queuehead->queue_maxlength);
486#endif
487 free(queuehead->exp_last_block);
488 free(queuehead->last_block);
489 free(queuehead);
490 }
491
492 return 0;
493 }
494 else
495 qerr(QERR_QINVALID);
496
497 return -1;
498}
499
500void *dequeue_element_nonblocking(queue_t *queuehead)
501/* if queue contains an element return and remove it.
502 * returns: NULL if an error occured or queue was empty, the pointer to
503 * the element otherwise.
504 * arguments: pointer to queue_t object
505 */
506{
507 void *element;
508 queue_elblock_t *blockp;
509 int exp = 0;
510 element= NULL;
511
512 if (queuehead != NULL)
513 {
514 if (pthread_mutex_lock(&queuehead->mutex)!=0)
515 {
516 qerr(QERR_MUTEXLOCK); return NULL;
517 }
518
519 /* begin critical section */
520
521 if (queuehead->nr_of_elements==0)
522 {
523 if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
524 return NULL;
525 }
526
527 exp = (queuehead->exp_nr_of_elements!=0);
528 blockp = (exp ? (queuehead->exp_first_block) : (queuehead->first_block));
529 if (blockp != NULL)
530 {
531 /* get the first element */
532 element= blockp->element[blockp->read];
533 blockp->read++;
534
535 if (blockp->next_block == NULL) /* this is the last block */
536 {
537 if (blockp->read == blockp->write)
538 { /* block is completely dequeued, so reset values */
539 /* the last block always remains allocated! */
540 blockp->read= 0;
541 blockp->write= 0;
542 }
543 }
544 else /* this is not the last block */
545 {
546 /* if block was completely dequeued, remove it */
547 if (blockp->read == ELEMENT_BLOCKSIZE)
548 {
549 if (exp) queuehead->exp_first_block= blockp->next_block;
550 else queuehead->first_block= blockp->next_block;
551 free(blockp);
552 }
553 }
554 if (exp) queuehead->exp_nr_of_elements--;
555 queuehead->nr_of_elements--;
556 }
557 else
558 qerr(QERR_QEMPTY);
559
560 /* end critical section */
561 if (pthread_mutex_unlock(&queuehead->mutex)!=0)
562 {
563 qerr(QERR_MUTEXUNLOCK);
564 return NULL;
565 }
566 }
567 else
568 qerr(QERR_QINVALID);
569
570 return element;
571}
572
573unsigned long queue_nr_of_elements(queue_t *queuehead)
574/** Get number fo elements in queue. */
575{
576 unsigned long result = 0;
577
578 if (queuehead != NULL)
579 {
580 if (pthread_mutex_lock(&queuehead->mutex)!=0)
581 {
582 qerr(QERR_MUTEXLOCK);
583 return 0;
584 }
585 /* begin critical section */
586
587 result = queuehead->nr_of_elements;
588
589 /* end critical section */
590 if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
591 }
592 else
593 qerr(QERR_QINVALID);
594
595 return result;
596}
597
598int queue_is_expedited_enabled(queue_t *queuehead)
599/** Get exp_enabled flag. */
600{
601 int result = 0;
602
603 if (queuehead != NULL)
604 {
605 if (pthread_mutex_lock(&queuehead->mutex)!=0)
606 {
607 qerr(QERR_MUTEXLOCK);
608 return 0;
609 }
610 /* begin critical section */
611
612 result = queuehead->exp_enabled;
613
614 /* end critical section */
615 if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
616 }
617 else
618 qerr(QERR_QINVALID);
619
620 return result;
621}
622
623int queue_enable_expedited(queue_t *queuehead, int exp)
624/** Set exp_enabled flag and return old value. */
625{
626 int result = 0;
627
628 if (queuehead != NULL)
629 {
630 if (pthread_mutex_lock(&queuehead->mutex)!=0)
631 {
632 qerr(QERR_MUTEXLOCK);
633 return 0;
634 }
635 /* begin critical section */
636
637 result = queuehead->exp_enabled;
638 if (exp) queuehead->exp_enabled = 1;
639 else queuehead->exp_enabled = 0;
640
641 /* end critical section */
642 if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
643 }
644 else
645 qerr(QERR_QINVALID);
646
647 return result;
648}
649
650//@}
Note: See TracBrowser for help on using the repository browser.