An Overlay-based
Virtual Network Substrate
SpoVNet

source: source/ariba/utility/transport/tcpip/protlib/fastqueue.c @ 9992

Last change on this file since 9992 was 9992, checked in by Christoph Mayer, 12 years ago

-fixes on protlib for android

File size: 17.7 KB
Line 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file fastqueue.c
3/// a simple FIFO queue with mutexes for use with pthreads
4/// ----------------------------------------------------------
5/// $Id: fastqueue.c 2549 2007-04-02 22:17:37Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/fastqueue/fastqueue.c $
7// ===========================================================
8//                     
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//                     
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29
30/** @addtogroup protlib
31 * @{
32 */
33
34/** @file
35 * Fast and thread-safe queue.
36 */
37
38/******************************************************************************
39 * fastqueue.c -- a simple FIFO queue with mutexes for use with pthreads      *
40 * -------------------------------------------------------------------------- *
41 * written by Roland Bless 1995                                               *
42 * all operations enqueue,dequeue are done in O(1) which means constant time  *
43 ******************************************************************************/
44
45#define _GNU_SOURCE
46#include <stdio.h>
47#include <string.h>
48#include <pthread.h>
49#include <stdlib.h>
50#include <errno.h>
51#include <sys/time.h>
52#include <unistd.h>
53
54                       /**** module interface ****/
55#include "fastqueue.h"
56
57                  /*************** defines *****************/
58#define qerr(errnr)      fprintf(stderr,"queue.c: %s\n",queue_errmsg[errnr])
59
60#ifndef ANDROID
61        #define PTHREAD_MUTEX_NORMAL PTHREAD_MUTEX_TIMED_NP
62#endif
63#define PRI_OTHER_MIN  PRI_FG_MIN_NP
64#define PRI_OTHER_MAX  PRI_FG_MAX_NP
65#define PRI_FG_MIN_NP  8
66#define PRI_FG_MAX_NP  15
67
68#define CLOCK_REALTIME 0
69#define NSEC_PER_SEC   1000000000
70extern int eclock_gettime(struct timespec *tp);
71#define clock_gettime(clock_id, tspec) eclock_gettime(tspec)
72
73                 /*************** typedefs *****************/
74
75enum {
76       QERR_NONE,
77       QERR_NOMEM,
78       QERR_MUTEXINIT,
79       QERR_MUTEXLOCK,
80       QERR_MUTEXUNLOCK,
81       QERR_MUTEXDESTROY,
82       QERR_QEMPTY,
83       QERR_QINVALID,
84       QERR_QNOTEMPTY,
85       QERR_CONDINIT,
86       QERR_CONDWAIT,
87       QERR_CONDSIGNAL,
88       QERR_CONDDESTROY
89};
90
91const 
92char *const queue_errmsg[]=
93{
94  "all ok",
95  "can't get enough memory",
96  "initializing mutex",
97  "locking mutex",
98  "unlocking mutex",
99  "destroying mutex",
100  "queue empty",
101  "invalid queueobject",
102  "destroying queue - queue not empty",
103  "initializing queue condition variable",
104  "waiting on condition",
105  "signalling condition",
106  "destroying condition"
107};
108
109
110
111queue_t *create_queue(const char* name)
112/* initialization routine for a queue.
113 * returns: NULL if an error occured, or a queue-object which is actually
114 *          a queueheader structure
115 * arguments: none
116 */
117{
118  queue_t *queuehead;
119
120  /* Allocate memory for queue head */
121  if ((queuehead= (queue_t *) malloc(sizeof(queue_t)))!=NULL)
122  {
123    /* Set mutex kind */
124    pthread_mutexattr_init(&queuehead->mutex_attr);
125#ifdef _DEBUG
126    pthread_mutexattr_settype(&queuehead->mutex_attr,PTHREAD_MUTEX_ERRORCHECK);
127#else
128    pthread_mutexattr_settype(&queuehead->mutex_attr,PTHREAD_MUTEX_NORMAL);
129#endif
130    /* use PTHREAD_MUTEX_ERRORCHECK or PTHREAD_MUTEX_ERRORCHECK_NP for testing */
131
132    /* Initialize mutex */
133    if (pthread_mutex_init(&queuehead->mutex, &queuehead->mutex_attr)==0)
134    {
135      /* Create Condition variable for command queue */
136      if (pthread_cond_init(&queuehead->cond, NULL)==0)
137      {
138        queuehead->nr_of_elements= 0UL;
139        queuehead->exp_nr_of_elements= 0UL;
140        queuehead->exp_enabled = 0;
141        queuehead->first_block= (queue_elblock_t *) malloc(sizeof(queue_elblock_t));
142        queuehead->exp_first_block= (queue_elblock_t *) malloc(sizeof(queue_elblock_t));
143        if ((queuehead->first_block == NULL) || (queuehead->exp_first_block == NULL))
144           qerr(QERR_NOMEM);
145        else
146        {
147          queuehead->first_block->read=  0;
148          queuehead->first_block->write= 0;
149          queuehead->first_block->next_block= NULL;         
150          queuehead->last_block= queuehead->first_block;
151          queuehead->exp_first_block->read=  0;
152          queuehead->exp_first_block->write= 0;
153          queuehead->exp_first_block->next_block= NULL;         
154          queuehead->exp_last_block= queuehead->exp_first_block;
155          if (name)
156          {
157            if (strlen(name) <= MAX_QUEUENAME_LENGTH)
158              strcpy(queuehead->name, name);
159            else
160            {
161              memcpy(queuehead->name, name, MAX_QUEUENAME_LENGTH);
162              queuehead->name[MAX_QUEUENAME_LENGTH + 1] = '\0';
163            }
164          }
165          else
166            queuehead->name[0] = '\0';
167          /* Now it's simple to enqueue elements, esp. the first one */
168        }
169//#ifdef QUEUELEN
170        queuehead->queue_maxlength= 0;
171//#endif
172      }
173      else
174        qerr(QERR_CONDINIT);
175    }
176    else /* error during initialize */
177      qerr(QERR_MUTEXINIT);
178  }
179  else
180    qerr(QERR_NOMEM);
181
182  return queuehead;
183}
184
185int enqueue_element_signal(queue_t *queuehead, void *element)
186{
187  return enqueue_element_expedited_signal(queuehead,element,0);
188}
189
190int enqueue_element_expedited_signal(queue_t *queuehead, void *element, int exp)
191/* add a new element into the queue. Memory for the element must be
192 * allocated anywhere else. This routine signals other waiting threads.
193 * returns: -1 if an error occured, or 0 is action could be performed
194 * arguments: pointer to queue_t object, pointer to an element
195 */
196{
197  queue_elblock_t *newelement, *lastblockp;
198
199  if (queuehead==NULL)
200  {
201    qerr(QERR_QINVALID);
202    return -1;
203  }
204
205  if (pthread_mutex_lock(&queuehead->mutex)!=0)
206  {
207    qerr(QERR_MUTEXLOCK);
208    return -1;
209  }
210  /* begin critical section */
211
212  if (exp && queuehead->exp_enabled) exp = 1; else exp = 0;
213  /* Allocate new element structure when necessary */
214  /* Note: queuehead->last_block must always contain a valid value */
215  lastblockp = (exp ? (queuehead->exp_last_block) : (queuehead->last_block));
216  if (lastblockp->write == ELEMENT_BLOCKSIZE)
217  { /* last block is full, so allocate a new block */
218    if ((newelement= (queue_elblock_t *) malloc(sizeof(queue_elblock_t)))==NULL)
219    {
220      qerr(QERR_NOMEM);
221      return -1;
222    }
223
224    /* initialize new structure */
225    newelement->element[0]= element;
226    newelement->read      = 0;
227    newelement->write     = 1;
228    newelement->next_block= NULL;
229   
230    /* append new element to the end */
231    lastblockp->next_block= newelement;
232    /* new element becomes last element */
233    if (exp) queuehead->exp_last_block = newelement; 
234    else queuehead->last_block = newelement; 
235  }
236  else /* last block was not full */
237  { 
238    lastblockp->element[lastblockp->write]= element;
239    lastblockp->write++;
240  }
241 
242  if (exp) queuehead->exp_nr_of_elements++;
243  queuehead->nr_of_elements++;
244//#ifdef QUEUELEN
245    if (queuehead->nr_of_elements > queuehead->queue_maxlength)
246       queuehead->queue_maxlength= queuehead->nr_of_elements;
247//#endif
248  /* Condition should be set while mutex is locked.
249     Recommended by libc manual.
250  */
251  if (pthread_cond_signal(&queuehead->cond)!=0)
252     qerr(QERR_CONDSIGNAL);
253  /* end critical section */
254  if (pthread_mutex_unlock(&queuehead->mutex)!=0)
255  {
256    qerr(QERR_MUTEXUNLOCK);
257    return -1;
258  }
259  // see above
260//  if (pthread_cond_signal(&queuehead->cond)!=0)
261//     qerr(QERR_CONDSIGNAL);
262
263  return 0;
264}
265
266
267void *dequeue_element_wait(queue_t *queuehead)
268/* wait for the queue to contain an element.
269 * if it contains an element return and remove it.
270 * returns: NULL if an error occured, the pointer to the element otherwise
271 * arguments: pointer to queue_t object
272 */
273{
274  void       *element;
275  queue_elblock_t *blockp;
276  int exp = 0;
277  element= NULL;
278  int retcode= 0;
279
280  if (queuehead != NULL)
281  {
282    /* Wait for an element in the queue */
283    /* Before waiting on a condition, the associated mutex must be locked */
284    if (pthread_mutex_lock(&queuehead->mutex)!=0)
285    {
286      qerr(QERR_MUTEXLOCK); return NULL;
287    }
288
289    while(queuehead->nr_of_elements==0) /* while there is no work to do, wait */
290    { /* for a safe state the predicate must be checked in a loop! */
291      /* cond_wait() unlocks the mutex and might return sometimes without
292         getting a signal! */
293      if ((retcode= pthread_cond_wait(&queuehead->cond, &queuehead->mutex)) != 0)
294      {
295         if (retcode!=EINTR && retcode!=ETIMEDOUT)
296         {
297           qerr(QERR_CONDWAIT);
298         }
299      }
300    }
301
302    /* begin critical section */
303    exp = (queuehead->exp_nr_of_elements!=0);
304    blockp = (exp ? (queuehead->exp_first_block) : (queuehead->first_block));
305    if (blockp != NULL)
306    {
307      /* get the first element */
308      element= blockp->element[blockp->read];
309      blockp->read++;
310
311      if (blockp->next_block == NULL) /* this is the last block */
312      {
313        if (blockp->read == blockp->write) 
314        { /* block is completely dequeued, so reset values */
315          /* the last block always remains allocated! */
316          blockp->read=  0;
317          blockp->write= 0;
318        }
319      }
320      else /* this is not the last block */
321      {
322        /* if block was completely dequeued, remove it */
323        if (blockp->read == ELEMENT_BLOCKSIZE)
324        {
325          if (exp) queuehead->exp_first_block= blockp->next_block;
326          else queuehead->first_block= blockp->next_block;
327          free(blockp);
328        }
329      }
330      if (exp) queuehead->exp_nr_of_elements--;
331      queuehead->nr_of_elements--;
332    }
333    else
334      qerr(QERR_QEMPTY);
335
336    /* end critical section */
337    if (pthread_mutex_unlock(&queuehead->mutex)!=0)
338    {
339      qerr(QERR_MUTEXUNLOCK);
340      return NULL;
341    }
342  }
343  else
344    qerr(QERR_QINVALID);
345
346  return element;
347}
348
349void *dequeue_element_timedwait(queue_t *queuehead, const struct timespec *tspec)
350/* wait for the queue to contain an element.
351 * if it contains an element return and remove it.
352 * returns: NULL if an error occured, the pointer to the element otherwise
353 * arguments: pointer to queue_t object
354 *            tpsec is the time interval to wait (not an absolute time!)
355 */
356{
357  void       *element;
358  queue_elblock_t *blockp;
359  int result;
360  struct timespec abs_tspec;
361  int exp = 0;
362  element= NULL;
363
364  if (queuehead != NULL)
365  {
366    /* Wait for an element in the queue */
367    /* Before waiting on a condition, the associated mutex must be locked */
368    if (pthread_mutex_lock(&queuehead->mutex)!=0)
369    {
370      qerr(QERR_MUTEXLOCK); return NULL;
371    }
372
373    while(queuehead->nr_of_elements==0) /* while there is no work to do, wait */
374    { /* for a safe state the predicate must be checked in a loop! */
375      /* cond_wait() unlocks the mutex and might return sometimes without
376         getting a signal! */
377      clock_gettime(CLOCK_REALTIME, &abs_tspec);
378      abs_tspec.tv_nsec+= tspec->tv_nsec;
379      abs_tspec.tv_sec+= tspec->tv_sec;
380      if (abs_tspec.tv_nsec >= NSEC_PER_SEC)
381      {
382        abs_tspec.tv_nsec%= NSEC_PER_SEC;
383        abs_tspec.tv_sec++;
384      };
385
386      if ((result = pthread_cond_timedwait(&queuehead->cond,
387                                           &queuehead->mutex, &abs_tspec))!=0)
388      {
389        if ( (result != ETIMEDOUT) && (result != EINTR) && (result != EINVAL) ) 
390        {
391          qerr(QERR_CONDWAIT);
392        } 
393        else 
394        { /* timeout */
395          if (pthread_mutex_unlock(&queuehead->mutex)!=0) 
396          {
397            qerr(QERR_MUTEXUNLOCK);
398            return NULL;
399          } 
400          return NULL;
401        }
402      }
403    }
404
405    /* begin critical section */
406    exp = (queuehead->exp_nr_of_elements!=0);
407    blockp = (exp ? (queuehead->exp_first_block) : (queuehead->first_block));
408    if (blockp != NULL)
409    {
410      /* get the first element */
411      element= blockp->element[blockp->read];
412      blockp->read++;
413
414      if (blockp->next_block == NULL) /* this is the last block */
415      {
416        if (blockp->read == blockp->write) 
417        { /* block is completely dequeued, so reset values */
418          /* the last block always remains allocated! */
419          blockp->read=  0;
420          blockp->write= 0;
421        }
422      }
423      else /* this is not the last block */
424      {
425        /* if block was completely dequeued, remove it */
426        if (blockp->read == ELEMENT_BLOCKSIZE)
427        {
428          if (exp) queuehead->exp_first_block= blockp->next_block;
429          else queuehead->first_block= blockp->next_block;
430          free(blockp);
431        }
432      }
433      if (exp) queuehead->exp_nr_of_elements--;
434      queuehead->nr_of_elements--;
435    }
436    else
437      qerr(QERR_QEMPTY);
438
439    /* end critical section */
440    if (pthread_mutex_unlock(&queuehead->mutex)!=0)
441    {
442      qerr(QERR_MUTEXUNLOCK);
443      return NULL;
444    }
445  }
446  else
447    qerr(QERR_QINVALID);
448
449  return element;
450}
451
452int destroy_queue(queue_t *queuehead)
453/* destroys the queue and frees all resources, except the elements!
454 * the queue must be empty to destroy it.
455 * returns: -1 if an error occured, 0 otherwise
456 * arguments: pointer to queue_t object
457 */
458{
459  if (queuehead!=NULL)
460  {
461    /* queue not empty? */
462    if (queuehead->nr_of_elements != 0)
463      qerr(QERR_QNOTEMPTY);
464    else
465    {
466      /* destroy condition variable */
467      if (pthread_cond_destroy(&queuehead->cond)!=0) qerr(QERR_CONDDESTROY);
468      /* destroy mutex */
469      if (pthread_mutex_destroy(&queuehead->mutex)!=0) qerr(QERR_MUTEXDESTROY);
470     
471      pthread_mutexattr_destroy(&queuehead->mutex_attr);
472
473      /* free memory for queuehead */
474#ifdef QUEUELEN
475      fprintf(stderr,"queue.c: length of queue (%s) growed up to %lu elements\n",
476              queuehead->name, queuehead->queue_maxlength);
477#endif     
478      free(queuehead->exp_last_block);
479      free(queuehead->last_block);
480      free(queuehead);
481    }
482
483    return 0;
484  }
485  else
486    qerr(QERR_QINVALID);
487
488  return -1;
489}
490
491void *dequeue_element_nonblocking(queue_t *queuehead)
492/* if queue contains an element return and remove it.
493 * returns: NULL if an error occured or queue was empty, the pointer to
494 * the element otherwise.
495 * arguments: pointer to queue_t object
496 */
497{
498  void       *element;
499  queue_elblock_t *blockp;
500  int exp = 0;
501  element= NULL;
502
503  if (queuehead != NULL)
504  {
505    if (pthread_mutex_lock(&queuehead->mutex)!=0)
506    {
507      qerr(QERR_MUTEXLOCK); return NULL;
508    }
509
510    /* begin critical section */
511
512    if (queuehead->nr_of_elements==0) 
513    { 
514      if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
515      return NULL;
516    }
517
518    exp = (queuehead->exp_nr_of_elements!=0);
519    blockp = (exp ? (queuehead->exp_first_block) : (queuehead->first_block));
520    if (blockp != NULL)
521    {
522      /* get the first element */
523      element= blockp->element[blockp->read];
524      blockp->read++;
525
526      if (blockp->next_block == NULL) /* this is the last block */
527      {
528        if (blockp->read == blockp->write) 
529        { /* block is completely dequeued, so reset values */
530          /* the last block always remains allocated! */
531          blockp->read=  0;
532          blockp->write= 0;
533        }
534      }
535      else /* this is not the last block */
536      {
537        /* if block was completely dequeued, remove it */
538        if (blockp->read == ELEMENT_BLOCKSIZE)
539        {
540          if (exp) queuehead->exp_first_block= blockp->next_block;
541          else queuehead->first_block= blockp->next_block;
542          free(blockp);
543        }
544      }
545      if (exp) queuehead->exp_nr_of_elements--;
546      queuehead->nr_of_elements--;
547    }
548    else
549      qerr(QERR_QEMPTY);
550
551    /* end critical section */
552    if (pthread_mutex_unlock(&queuehead->mutex)!=0)
553    {
554      qerr(QERR_MUTEXUNLOCK);
555      return NULL;
556    }
557  }
558  else
559    qerr(QERR_QINVALID);
560
561  return element;
562}
563
564unsigned long queue_nr_of_elements(queue_t *queuehead)
565/** Get number fo elements in queue. */
566{
567  unsigned long result = 0;
568
569  if (queuehead != NULL)
570  {
571    if (pthread_mutex_lock(&queuehead->mutex)!=0)
572    {
573      qerr(QERR_MUTEXLOCK);
574      return 0;
575    }
576    /* begin critical section */
577
578    result = queuehead->nr_of_elements; 
579
580    /* end critical section */
581    if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
582  }
583  else
584    qerr(QERR_QINVALID);
585
586  return result;
587}
588
589int queue_is_expedited_enabled(queue_t *queuehead)
590/** Get exp_enabled flag. */
591{
592  int result = 0;
593
594  if (queuehead != NULL)
595  {
596    if (pthread_mutex_lock(&queuehead->mutex)!=0)
597    {
598      qerr(QERR_MUTEXLOCK);
599      return 0;
600    }
601    /* begin critical section */
602
603    result = queuehead->exp_enabled; 
604
605    /* end critical section */
606    if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
607  }
608  else
609    qerr(QERR_QINVALID);
610
611  return result;
612}
613
614int queue_enable_expedited(queue_t *queuehead, int exp)
615/** Set exp_enabled flag and return old value. */
616{
617  int result = 0;
618
619  if (queuehead != NULL)
620  {
621    if (pthread_mutex_lock(&queuehead->mutex)!=0)
622    {
623      qerr(QERR_MUTEXLOCK);
624      return 0;
625    }
626    /* begin critical section */
627
628    result = queuehead->exp_enabled; 
629    if (exp) queuehead->exp_enabled = 1;
630    else queuehead->exp_enabled = 0;
631
632    /* end critical section */
633    if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
634  }
635  else
636    qerr(QERR_QINVALID);
637
638  return result;
639}
640
641//@}
Note: See TracBrowser for help on using the repository browser.