An Overlay-based
Virtual Network Substrate
SpoVNet

source: source/ariba/utility/transport/tcpip/protlib/fastqueue.c @ 9991

Last change on this file since 9991 was 9991, checked in by Christoph Mayer, 12 years ago

-fixes on protlib for android

File size: 17.6 KB
RevLine 
[5284]1/// ----------------------------------------*- mode: C++; -*--
2/// @file fastqueue.c
3/// a simple FIFO queue with mutexes for use with pthreads
4/// ----------------------------------------------------------
5/// $Id: fastqueue.c 2549 2007-04-02 22:17:37Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/fastqueue/fastqueue.c $
7// ===========================================================
8//                     
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//                     
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29
[9686]30/** @addtogroup protlib
[5284]31 * @{
32 */
33
34/** @file
35 * Fast and thread-safe queue.
36 */
37
38/******************************************************************************
39 * fastqueue.c -- a simple FIFO queue with mutexes for use with pthreads      *
40 * -------------------------------------------------------------------------- *
41 * written by Roland Bless 1995                                               *
42 * all operations enqueue,dequeue are done in O(1) which means constant time  *
43 ******************************************************************************/
44
45#define _GNU_SOURCE
46#include <stdio.h>
47#include <string.h>
48#include <pthread.h>
49#include <stdlib.h>
50#include <errno.h>
51#include <sys/time.h>
52#include <unistd.h>
53
54                       /**** module interface ****/
55#include "fastqueue.h"
56
57                  /*************** defines *****************/
/* print the message that belongs to error number errnr on stderr */
#define qerr(errnr)      fprintf(stderr,"queue.c: %s\n",queue_errmsg[errnr])

/* NOTE(review): the remaining definitions look like compatibility shims for
 * a platform that lacks these names (the last check-in on this file mentions
 * android) -- TODO confirm before relying on them elsewhere.
 */

/* priority limits; the PRI_OTHER_* names are aliases of the PRI_FG_* ones */
#define PRI_FG_MIN_NP  8
#define PRI_FG_MAX_NP  15
#define PRI_OTHER_MIN  PRI_FG_MIN_NP
#define PRI_OTHER_MAX  PRI_FG_MAX_NP

/* mutex kind selected by create_queue() when _DEBUG is not defined */
#define PTHREAD_MUTEX_NORMAL PTHREAD_MUTEX_TIMED_NP

/* time handling: every clock_gettime() call in this file is redirected to
 * eclock_gettime(), which is defined elsewhere; the clock id is dropped
 */
#define NSEC_PER_SEC   1000000000
#define CLOCK_REALTIME 0
extern int eclock_gettime(struct timespec *tp);
#define clock_gettime(clock_id, tspec) eclock_gettime(tspec)
71
72                 /*************** typedefs *****************/
73
/* Error numbers of this module. They are used as indices into
 * queue_errmsg[] (see qerr()), so the values must stay in step with the
 * order of the messages in that table.
 */
enum {
       QERR_NONE         = 0,
       QERR_NOMEM        = 1,
       QERR_MUTEXINIT    = 2,
       QERR_MUTEXLOCK    = 3,
       QERR_MUTEXUNLOCK  = 4,
       QERR_MUTEXDESTROY = 5,
       QERR_QEMPTY       = 6,
       QERR_QINVALID     = 7,
       QERR_QNOTEMPTY    = 8,
       QERR_CONDINIT     = 9,
       QERR_CONDWAIT     = 10,
       QERR_CONDSIGNAL   = 11,
       QERR_CONDDESTROY  = 12
};
89
/* Message texts printed by qerr(); the position of each text is the error
 * number it belongs to.
 */
const char *const queue_errmsg[] = {
  /*  0 */ "all ok",
  /*  1 */ "can't get enough memory",
  /*  2 */ "initializing mutex",
  /*  3 */ "locking mutex",
  /*  4 */ "unlocking mutex",
  /*  5 */ "destroying mutex",
  /*  6 */ "queue empty",
  /*  7 */ "invalid queueobject",
  /*  8 */ "destroying queue - queue not empty",
  /*  9 */ "initializing queue condition variable",
  /* 10 */ "waiting on condition",
  /* 11 */ "signalling condition",
  /* 12 */ "destroying condition"
};
107
108
109
110queue_t *create_queue(const char* name)
111/* initialization routine for a queue.
112 * returns: NULL if an error occured, or a queue-object which is actually
113 *          a queueheader structure
114 * arguments: none
115 */
116{
117  queue_t *queuehead;
118
119  /* Allocate memory for queue head */
120  if ((queuehead= (queue_t *) malloc(sizeof(queue_t)))!=NULL)
121  {
122    /* Set mutex kind */
123    pthread_mutexattr_init(&queuehead->mutex_attr);
124#ifdef _DEBUG
125    pthread_mutexattr_settype(&queuehead->mutex_attr,PTHREAD_MUTEX_ERRORCHECK);
126#else
127    pthread_mutexattr_settype(&queuehead->mutex_attr,PTHREAD_MUTEX_NORMAL);
128#endif
129    /* use PTHREAD_MUTEX_ERRORCHECK or PTHREAD_MUTEX_ERRORCHECK_NP for testing */
130
131    /* Initialize mutex */
132    if (pthread_mutex_init(&queuehead->mutex, &queuehead->mutex_attr)==0)
133    {
134      /* Create Condition variable for command queue */
135      if (pthread_cond_init(&queuehead->cond, NULL)==0)
136      {
137        queuehead->nr_of_elements= 0UL;
138        queuehead->exp_nr_of_elements= 0UL;
139        queuehead->exp_enabled = 0;
140        queuehead->first_block= (queue_elblock_t *) malloc(sizeof(queue_elblock_t));
141        queuehead->exp_first_block= (queue_elblock_t *) malloc(sizeof(queue_elblock_t));
142        if ((queuehead->first_block == NULL) || (queuehead->exp_first_block == NULL))
143           qerr(QERR_NOMEM);
144        else
145        {
146          queuehead->first_block->read=  0;
147          queuehead->first_block->write= 0;
148          queuehead->first_block->next_block= NULL;         
149          queuehead->last_block= queuehead->first_block;
150          queuehead->exp_first_block->read=  0;
151          queuehead->exp_first_block->write= 0;
152          queuehead->exp_first_block->next_block= NULL;         
153          queuehead->exp_last_block= queuehead->exp_first_block;
154          if (name)
155          {
156            if (strlen(name) <= MAX_QUEUENAME_LENGTH)
157              strcpy(queuehead->name, name);
158            else
159            {
160              memcpy(queuehead->name, name, MAX_QUEUENAME_LENGTH);
161              queuehead->name[MAX_QUEUENAME_LENGTH + 1] = '\0';
162            }
163          }
164          else
165            queuehead->name[0] = '\0';
166          /* Now it's simple to enqueue elements, esp. the first one */
167        }
168//#ifdef QUEUELEN
169        queuehead->queue_maxlength= 0;
170//#endif
171      }
172      else
173        qerr(QERR_CONDINIT);
174    }
175    else /* error during initialize */
176      qerr(QERR_MUTEXINIT);
177  }
178  else
179    qerr(QERR_NOMEM);
180
181  return queuehead;
182}
183
184int enqueue_element_signal(queue_t *queuehead, void *element)
185{
186  return enqueue_element_expedited_signal(queuehead,element,0);
187}
188
189int enqueue_element_expedited_signal(queue_t *queuehead, void *element, int exp)
190/* add a new element into the queue. Memory for the element must be
191 * allocated anywhere else. This routine signals other waiting threads.
192 * returns: -1 if an error occured, or 0 is action could be performed
193 * arguments: pointer to queue_t object, pointer to an element
194 */
195{
196  queue_elblock_t *newelement, *lastblockp;
197
198  if (queuehead==NULL)
199  {
200    qerr(QERR_QINVALID);
201    return -1;
202  }
203
204  if (pthread_mutex_lock(&queuehead->mutex)!=0)
205  {
206    qerr(QERR_MUTEXLOCK);
207    return -1;
208  }
209  /* begin critical section */
210
211  if (exp && queuehead->exp_enabled) exp = 1; else exp = 0;
212  /* Allocate new element structure when necessary */
213  /* Note: queuehead->last_block must always contain a valid value */
214  lastblockp = (exp ? (queuehead->exp_last_block) : (queuehead->last_block));
215  if (lastblockp->write == ELEMENT_BLOCKSIZE)
216  { /* last block is full, so allocate a new block */
217    if ((newelement= (queue_elblock_t *) malloc(sizeof(queue_elblock_t)))==NULL)
218    {
219      qerr(QERR_NOMEM);
220      return -1;
221    }
222
223    /* initialize new structure */
224    newelement->element[0]= element;
225    newelement->read      = 0;
226    newelement->write     = 1;
227    newelement->next_block= NULL;
228   
229    /* append new element to the end */
230    lastblockp->next_block= newelement;
231    /* new element becomes last element */
232    if (exp) queuehead->exp_last_block = newelement; 
233    else queuehead->last_block = newelement; 
234  }
235  else /* last block was not full */
236  { 
237    lastblockp->element[lastblockp->write]= element;
238    lastblockp->write++;
239  }
240 
241  if (exp) queuehead->exp_nr_of_elements++;
242  queuehead->nr_of_elements++;
243//#ifdef QUEUELEN
244    if (queuehead->nr_of_elements > queuehead->queue_maxlength)
245       queuehead->queue_maxlength= queuehead->nr_of_elements;
246//#endif
247  /* Condition should be set while mutex is locked.
248     Recommended by libc manual.
249  */
250  if (pthread_cond_signal(&queuehead->cond)!=0)
251     qerr(QERR_CONDSIGNAL);
252  /* end critical section */
253  if (pthread_mutex_unlock(&queuehead->mutex)!=0)
254  {
255    qerr(QERR_MUTEXUNLOCK);
256    return -1;
257  }
258  // see above
259//  if (pthread_cond_signal(&queuehead->cond)!=0)
260//     qerr(QERR_CONDSIGNAL);
261
262  return 0;
263}
264
265
266void *dequeue_element_wait(queue_t *queuehead)
267/* wait for the queue to contain an element.
268 * if it contains an element return and remove it.
269 * returns: NULL if an error occured, the pointer to the element otherwise
270 * arguments: pointer to queue_t object
271 */
272{
273  void       *element;
274  queue_elblock_t *blockp;
275  int exp = 0;
276  element= NULL;
277  int retcode= 0;
278
279  if (queuehead != NULL)
280  {
281    /* Wait for an element in the queue */
282    /* Before waiting on a condition, the associated mutex must be locked */
283    if (pthread_mutex_lock(&queuehead->mutex)!=0)
284    {
285      qerr(QERR_MUTEXLOCK); return NULL;
286    }
287
288    while(queuehead->nr_of_elements==0) /* while there is no work to do, wait */
289    { /* for a safe state the predicate must be checked in a loop! */
290      /* cond_wait() unlocks the mutex and might return sometimes without
291         getting a signal! */
292      if ((retcode= pthread_cond_wait(&queuehead->cond, &queuehead->mutex)) != 0)
293      {
294         if (retcode!=EINTR && retcode!=ETIMEDOUT)
295         {
296           qerr(QERR_CONDWAIT);
297         }
298      }
299    }
300
301    /* begin critical section */
302    exp = (queuehead->exp_nr_of_elements!=0);
303    blockp = (exp ? (queuehead->exp_first_block) : (queuehead->first_block));
304    if (blockp != NULL)
305    {
306      /* get the first element */
307      element= blockp->element[blockp->read];
308      blockp->read++;
309
310      if (blockp->next_block == NULL) /* this is the last block */
311      {
312        if (blockp->read == blockp->write) 
313        { /* block is completely dequeued, so reset values */
314          /* the last block always remains allocated! */
315          blockp->read=  0;
316          blockp->write= 0;
317        }
318      }
319      else /* this is not the last block */
320      {
321        /* if block was completely dequeued, remove it */
322        if (blockp->read == ELEMENT_BLOCKSIZE)
323        {
324          if (exp) queuehead->exp_first_block= blockp->next_block;
325          else queuehead->first_block= blockp->next_block;
326          free(blockp);
327        }
328      }
329      if (exp) queuehead->exp_nr_of_elements--;
330      queuehead->nr_of_elements--;
331    }
332    else
333      qerr(QERR_QEMPTY);
334
335    /* end critical section */
336    if (pthread_mutex_unlock(&queuehead->mutex)!=0)
337    {
338      qerr(QERR_MUTEXUNLOCK);
339      return NULL;
340    }
341  }
342  else
343    qerr(QERR_QINVALID);
344
345  return element;
346}
347
348void *dequeue_element_timedwait(queue_t *queuehead, const struct timespec *tspec)
349/* wait for the queue to contain an element.
350 * if it contains an element return and remove it.
351 * returns: NULL if an error occured, the pointer to the element otherwise
352 * arguments: pointer to queue_t object
353 *            tpsec is the time interval to wait (not an absolute time!)
354 */
355{
356  void       *element;
357  queue_elblock_t *blockp;
358  int result;
359  struct timespec abs_tspec;
360  int exp = 0;
361  element= NULL;
362
363  if (queuehead != NULL)
364  {
365    /* Wait for an element in the queue */
366    /* Before waiting on a condition, the associated mutex must be locked */
367    if (pthread_mutex_lock(&queuehead->mutex)!=0)
368    {
369      qerr(QERR_MUTEXLOCK); return NULL;
370    }
371
372    while(queuehead->nr_of_elements==0) /* while there is no work to do, wait */
373    { /* for a safe state the predicate must be checked in a loop! */
374      /* cond_wait() unlocks the mutex and might return sometimes without
375         getting a signal! */
376      clock_gettime(CLOCK_REALTIME, &abs_tspec);
377      abs_tspec.tv_nsec+= tspec->tv_nsec;
378      abs_tspec.tv_sec+= tspec->tv_sec;
379      if (abs_tspec.tv_nsec >= NSEC_PER_SEC)
380      {
381        abs_tspec.tv_nsec%= NSEC_PER_SEC;
382        abs_tspec.tv_sec++;
383      };
384
385      if ((result = pthread_cond_timedwait(&queuehead->cond,
386                                           &queuehead->mutex, &abs_tspec))!=0)
387      {
388        if ( (result != ETIMEDOUT) && (result != EINTR) && (result != EINVAL) ) 
389        {
390          qerr(QERR_CONDWAIT);
391        } 
392        else 
393        { /* timeout */
394          if (pthread_mutex_unlock(&queuehead->mutex)!=0) 
395          {
396            qerr(QERR_MUTEXUNLOCK);
397            return NULL;
398          } 
399          return NULL;
400        }
401      }
402    }
403
404    /* begin critical section */
405    exp = (queuehead->exp_nr_of_elements!=0);
406    blockp = (exp ? (queuehead->exp_first_block) : (queuehead->first_block));
407    if (blockp != NULL)
408    {
409      /* get the first element */
410      element= blockp->element[blockp->read];
411      blockp->read++;
412
413      if (blockp->next_block == NULL) /* this is the last block */
414      {
415        if (blockp->read == blockp->write) 
416        { /* block is completely dequeued, so reset values */
417          /* the last block always remains allocated! */
418          blockp->read=  0;
419          blockp->write= 0;
420        }
421      }
422      else /* this is not the last block */
423      {
424        /* if block was completely dequeued, remove it */
425        if (blockp->read == ELEMENT_BLOCKSIZE)
426        {
427          if (exp) queuehead->exp_first_block= blockp->next_block;
428          else queuehead->first_block= blockp->next_block;
429          free(blockp);
430        }
431      }
432      if (exp) queuehead->exp_nr_of_elements--;
433      queuehead->nr_of_elements--;
434    }
435    else
436      qerr(QERR_QEMPTY);
437
438    /* end critical section */
439    if (pthread_mutex_unlock(&queuehead->mutex)!=0)
440    {
441      qerr(QERR_MUTEXUNLOCK);
442      return NULL;
443    }
444  }
445  else
446    qerr(QERR_QINVALID);
447
448  return element;
449}
450
451int destroy_queue(queue_t *queuehead)
452/* destroys the queue and frees all resources, except the elements!
453 * the queue must be empty to destroy it.
454 * returns: -1 if an error occured, 0 otherwise
455 * arguments: pointer to queue_t object
456 */
457{
458  if (queuehead!=NULL)
459  {
460    /* queue not empty? */
461    if (queuehead->nr_of_elements != 0)
462      qerr(QERR_QNOTEMPTY);
463    else
464    {
465      /* destroy condition variable */
466      if (pthread_cond_destroy(&queuehead->cond)!=0) qerr(QERR_CONDDESTROY);
467      /* destroy mutex */
468      if (pthread_mutex_destroy(&queuehead->mutex)!=0) qerr(QERR_MUTEXDESTROY);
469     
470      pthread_mutexattr_destroy(&queuehead->mutex_attr);
471
472      /* free memory for queuehead */
473#ifdef QUEUELEN
474      fprintf(stderr,"queue.c: length of queue (%s) growed up to %lu elements\n",
475              queuehead->name, queuehead->queue_maxlength);
476#endif     
477      free(queuehead->exp_last_block);
478      free(queuehead->last_block);
479      free(queuehead);
480    }
481
482    return 0;
483  }
484  else
485    qerr(QERR_QINVALID);
486
487  return -1;
488}
489
490void *dequeue_element_nonblocking(queue_t *queuehead)
491/* if queue contains an element return and remove it.
492 * returns: NULL if an error occured or queue was empty, the pointer to
493 * the element otherwise.
494 * arguments: pointer to queue_t object
495 */
496{
497  void       *element;
498  queue_elblock_t *blockp;
499  int exp = 0;
500  element= NULL;
501
502  if (queuehead != NULL)
503  {
504    if (pthread_mutex_lock(&queuehead->mutex)!=0)
505    {
506      qerr(QERR_MUTEXLOCK); return NULL;
507    }
508
509    /* begin critical section */
510
511    if (queuehead->nr_of_elements==0) 
512    { 
513      if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
514      return NULL;
515    }
516
517    exp = (queuehead->exp_nr_of_elements!=0);
518    blockp = (exp ? (queuehead->exp_first_block) : (queuehead->first_block));
519    if (blockp != NULL)
520    {
521      /* get the first element */
522      element= blockp->element[blockp->read];
523      blockp->read++;
524
525      if (blockp->next_block == NULL) /* this is the last block */
526      {
527        if (blockp->read == blockp->write) 
528        { /* block is completely dequeued, so reset values */
529          /* the last block always remains allocated! */
530          blockp->read=  0;
531          blockp->write= 0;
532        }
533      }
534      else /* this is not the last block */
535      {
536        /* if block was completely dequeued, remove it */
537        if (blockp->read == ELEMENT_BLOCKSIZE)
538        {
539          if (exp) queuehead->exp_first_block= blockp->next_block;
540          else queuehead->first_block= blockp->next_block;
541          free(blockp);
542        }
543      }
544      if (exp) queuehead->exp_nr_of_elements--;
545      queuehead->nr_of_elements--;
546    }
547    else
548      qerr(QERR_QEMPTY);
549
550    /* end critical section */
551    if (pthread_mutex_unlock(&queuehead->mutex)!=0)
552    {
553      qerr(QERR_MUTEXUNLOCK);
554      return NULL;
555    }
556  }
557  else
558    qerr(QERR_QINVALID);
559
560  return element;
561}
562
563unsigned long queue_nr_of_elements(queue_t *queuehead)
564/** Get number fo elements in queue. */
565{
566  unsigned long result = 0;
567
568  if (queuehead != NULL)
569  {
570    if (pthread_mutex_lock(&queuehead->mutex)!=0)
571    {
572      qerr(QERR_MUTEXLOCK);
573      return 0;
574    }
575    /* begin critical section */
576
577    result = queuehead->nr_of_elements; 
578
579    /* end critical section */
580    if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
581  }
582  else
583    qerr(QERR_QINVALID);
584
585  return result;
586}
587
588int queue_is_expedited_enabled(queue_t *queuehead)
589/** Get exp_enabled flag. */
590{
591  int result = 0;
592
593  if (queuehead != NULL)
594  {
595    if (pthread_mutex_lock(&queuehead->mutex)!=0)
596    {
597      qerr(QERR_MUTEXLOCK);
598      return 0;
599    }
600    /* begin critical section */
601
602    result = queuehead->exp_enabled; 
603
604    /* end critical section */
605    if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
606  }
607  else
608    qerr(QERR_QINVALID);
609
610  return result;
611}
612
613int queue_enable_expedited(queue_t *queuehead, int exp)
614/** Set exp_enabled flag and return old value. */
615{
616  int result = 0;
617
618  if (queuehead != NULL)
619  {
620    if (pthread_mutex_lock(&queuehead->mutex)!=0)
621    {
622      qerr(QERR_MUTEXLOCK);
623      return 0;
624    }
625    /* begin critical section */
626
627    result = queuehead->exp_enabled; 
628    if (exp) queuehead->exp_enabled = 1;
629    else queuehead->exp_enabled = 0;
630
631    /* end critical section */
632    if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
633  }
634  else
635    qerr(QERR_QINVALID);
636
637  return result;
638}
639
640//@}
Note: See TracBrowser for help on using the repository browser.