An Overlay-based
Virtual Network Substrate
SpoVNet

source: trash/old-modules/transport/protlib/fastqueue.c @ 5641

Last change on this file since 5641 was 5641, checked in by Christoph Mayer, 14 years ago
File size: 18.0 KB
Line 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file fastqueue.c
3/// a simple FIFO queue with mutexes for use with pthreads
4/// ----------------------------------------------------------
5/// $Id: fastqueue.c 2549 2007-04-02 22:17:37Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/fastqueue/fastqueue.c $
7// ===========================================================
8//                     
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//                     
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29
30/** @addtogroup fastqueue Fast Queue
31 * @{
32 */
33
34/** @file
35 * Fast and thread-safe queue.
36 */
37
38/******************************************************************************
39 * fastqueue.c -- a simple FIFO queue with mutexes for use with pthreads      *
40 * -------------------------------------------------------------------------- *
41 * written by Roland Bless 1995                                               *
42 * all operations enqueue,dequeue are done in O(1) which means constant time  *
43 ******************************************************************************/
44
45#define _GNU_SOURCE
46#include <stdio.h>
47#include <string.h>
48#include <pthread.h>
49#include <stdlib.h>
50#include <errno.h>
51#include <sys/time.h>
52#include <unistd.h>
53
54                       /**** module interface ****/
55#include "fastqueue.h"
56
57                  /*************** defines *****************/
/* Print the message belonging to error number `errnr` (an index into
 * queue_errmsg[]) to stderr, prefixed with "queue.c: ".
 * Expands to the fprintf() call itself, so it yields fprintf's return value. */
#define qerr(errnr) \
  fprintf(stderr, "queue.c: %s\n", queue_errmsg[(errnr)])
59
/* Platform compatibility layer.
 *
 * Linux: map the generic names used in this file onto the Linux-specific
 * ones and route clock_gettime() to the project's eclock_gettime().
 *
 * Old DECthreads (version < 314126): map the standard mutex-type names onto
 * their _np (non-portable) spellings.
 *
 * The DECthreads branch is guarded with defined(): in an #if expression an
 * undefined identifier evaluates to 0, so without the guard every platform
 * that is neither Linux nor DECthreads would also take this branch.
 */
#ifdef __linux__

// not needed for Linux
//#define pthread_mutexattr_settype pthread_mutexattr_setkind_np

#define PTHREAD_MUTEX_NORMAL PTHREAD_MUTEX_TIMED_NP
#define PRI_OTHER_MIN  PRI_FG_MIN_NP
#define PRI_OTHER_MAX  PRI_FG_MAX_NP
#define PRI_FG_MIN_NP  8
#define PRI_FG_MAX_NP  15

#define CLOCK_REALTIME 0
#define NSEC_PER_SEC   1000000000
extern int eclock_gettime(struct timespec *tp);
#define clock_gettime(clock_id, tspec) eclock_gettime(tspec)

#elif defined(_DECTHREADS_VERSION) && _DECTHREADS_VERSION < 314126L

#define pthread_mutexattr_settype pthread_mutexattr_settype_np
#define PTHREAD_MUTEX_NORMAL PTHREAD_MUTEX_NORMAL_NP
#define PTHREAD_MUTEX_ERRORCHECK PTHREAD_MUTEX_ERRORCHECK_NP
#endif
82                 /*************** typedefs *****************/
83
/* Error numbers of the queue module.
 * Each value is the index of the matching text in queue_errmsg[], so the
 * two lists must be kept in the same order. */
enum {
  QERR_NONE         = 0,  /* no error */
  QERR_NOMEM        = 1,  /* memory allocation failed */
  QERR_MUTEXINIT    = 2,  /* mutex could not be initialized */
  QERR_MUTEXLOCK    = 3,  /* mutex could not be locked */
  QERR_MUTEXUNLOCK  = 4,  /* mutex could not be unlocked */
  QERR_MUTEXDESTROY = 5,  /* mutex could not be destroyed */
  QERR_QEMPTY       = 6,  /* queue is empty */
  QERR_QINVALID     = 7,  /* invalid (NULL) queue object */
  QERR_QNOTEMPTY    = 8,  /* queue still holds elements on destroy */
  QERR_CONDINIT     = 9,  /* condition variable could not be initialized */
  QERR_CONDWAIT     = 10, /* waiting on the condition variable failed */
  QERR_CONDSIGNAL   = 11, /* signalling the condition variable failed */
  QERR_CONDDESTROY  = 12  /* condition variable could not be destroyed */
};
99
/* Human-readable texts for the QERR_* error numbers; the array is indexed
 * by the error number, so the order here must match the enum. */
const char *const queue_errmsg[] =
{
  /*  0 QERR_NONE         */ "all ok",
  /*  1 QERR_NOMEM        */ "can't get enough memory",
  /*  2 QERR_MUTEXINIT    */ "initializing mutex",
  /*  3 QERR_MUTEXLOCK    */ "locking mutex",
  /*  4 QERR_MUTEXUNLOCK  */ "unlocking mutex",
  /*  5 QERR_MUTEXDESTROY */ "destroying mutex",
  /*  6 QERR_QEMPTY       */ "queue empty",
  /*  7 QERR_QINVALID     */ "invalid queueobject",
  /*  8 QERR_QNOTEMPTY    */ "destroying queue - queue not empty",
  /*  9 QERR_CONDINIT     */ "initializing queue condition variable",
  /* 10 QERR_CONDWAIT     */ "waiting on condition",
  /* 11 QERR_CONDSIGNAL   */ "signalling condition",
  /* 12 QERR_CONDDESTROY  */ "destroying condition"
};
117
118
119
120queue_t *create_queue(const char* name)
121/* initialization routine for a queue.
122 * returns: NULL if an error occured, or a queue-object which is actually
123 *          a queueheader structure
124 * arguments: none
125 */
126{
127  queue_t *queuehead;
128
129  /* Allocate memory for queue head */
130  if ((queuehead= (queue_t *) malloc(sizeof(queue_t)))!=NULL)
131  {
132    /* Set mutex kind */
133    pthread_mutexattr_init(&queuehead->mutex_attr);
134#ifdef _DEBUG
135    pthread_mutexattr_settype(&queuehead->mutex_attr,PTHREAD_MUTEX_ERRORCHECK);
136#else
137    pthread_mutexattr_settype(&queuehead->mutex_attr,PTHREAD_MUTEX_NORMAL);
138#endif
139    /* use PTHREAD_MUTEX_ERRORCHECK or PTHREAD_MUTEX_ERRORCHECK_NP for testing */
140
141    /* Initialize mutex */
142    if (pthread_mutex_init(&queuehead->mutex, &queuehead->mutex_attr)==0)
143    {
144      /* Create Condition variable for command queue */
145      if (pthread_cond_init(&queuehead->cond, NULL)==0)
146      {
147        queuehead->nr_of_elements= 0UL;
148        queuehead->exp_nr_of_elements= 0UL;
149        queuehead->exp_enabled = 0;
150        queuehead->first_block= (queue_elblock_t *) malloc(sizeof(queue_elblock_t));
151        queuehead->exp_first_block= (queue_elblock_t *) malloc(sizeof(queue_elblock_t));
152        if ((queuehead->first_block == NULL) || (queuehead->exp_first_block == NULL))
153           qerr(QERR_NOMEM);
154        else
155        {
156          queuehead->first_block->read=  0;
157          queuehead->first_block->write= 0;
158          queuehead->first_block->next_block= NULL;         
159          queuehead->last_block= queuehead->first_block;
160          queuehead->exp_first_block->read=  0;
161          queuehead->exp_first_block->write= 0;
162          queuehead->exp_first_block->next_block= NULL;         
163          queuehead->exp_last_block= queuehead->exp_first_block;
164          if (name)
165          {
166            if (strlen(name) <= MAX_QUEUENAME_LENGTH)
167              strcpy(queuehead->name, name);
168            else
169            {
170              memcpy(queuehead->name, name, MAX_QUEUENAME_LENGTH);
171              queuehead->name[MAX_QUEUENAME_LENGTH + 1] = '\0';
172            }
173          }
174          else
175            queuehead->name[0] = '\0';
176          /* Now it's simple to enqueue elements, esp. the first one */
177        }
178//#ifdef QUEUELEN
179        queuehead->queue_maxlength= 0;
180//#endif
181      }
182      else
183        qerr(QERR_CONDINIT);
184    }
185    else /* error during initialize */
186      qerr(QERR_MUTEXINIT);
187  }
188  else
189    qerr(QERR_NOMEM);
190
191  return queuehead;
192}
193
194int enqueue_element_signal(queue_t *queuehead, void *element)
195{
196  return enqueue_element_expedited_signal(queuehead,element,0);
197}
198
199int enqueue_element_expedited_signal(queue_t *queuehead, void *element, int exp)
200/* add a new element into the queue. Memory for the element must be
201 * allocated anywhere else. This routine signals other waiting threads.
202 * returns: -1 if an error occured, or 0 is action could be performed
203 * arguments: pointer to queue_t object, pointer to an element
204 */
205{
206  queue_elblock_t *newelement, *lastblockp;
207
208  if (queuehead==NULL)
209  {
210    qerr(QERR_QINVALID);
211    return -1;
212  }
213
214  if (pthread_mutex_lock(&queuehead->mutex)!=0)
215  {
216    qerr(QERR_MUTEXLOCK);
217    return -1;
218  }
219  /* begin critical section */
220
221  if (exp && queuehead->exp_enabled) exp = 1; else exp = 0;
222  /* Allocate new element structure when necessary */
223  /* Note: queuehead->last_block must always contain a valid value */
224  lastblockp = (exp ? (queuehead->exp_last_block) : (queuehead->last_block));
225  if (lastblockp->write == ELEMENT_BLOCKSIZE)
226  { /* last block is full, so allocate a new block */
227    if ((newelement= (queue_elblock_t *) malloc(sizeof(queue_elblock_t)))==NULL)
228    {
229      qerr(QERR_NOMEM);
230      return -1;
231    }
232
233    /* initialize new structure */
234    newelement->element[0]= element;
235    newelement->read      = 0;
236    newelement->write     = 1;
237    newelement->next_block= NULL;
238   
239    /* append new element to the end */
240    lastblockp->next_block= newelement;
241    /* new element becomes last element */
242    if (exp) queuehead->exp_last_block = newelement; 
243    else queuehead->last_block = newelement; 
244  }
245  else /* last block was not full */
246  { 
247    lastblockp->element[lastblockp->write]= element;
248    lastblockp->write++;
249  }
250 
251  if (exp) queuehead->exp_nr_of_elements++;
252  queuehead->nr_of_elements++;
253//#ifdef QUEUELEN
254    if (queuehead->nr_of_elements > queuehead->queue_maxlength)
255       queuehead->queue_maxlength= queuehead->nr_of_elements;
256//#endif
257  /* Condition should be set while mutex is locked.
258     Recommended by libc manual.
259  */
260  if (pthread_cond_signal(&queuehead->cond)!=0)
261     qerr(QERR_CONDSIGNAL);
262  /* end critical section */
263  if (pthread_mutex_unlock(&queuehead->mutex)!=0)
264  {
265    qerr(QERR_MUTEXUNLOCK);
266    return -1;
267  }
268  // see above
269//  if (pthread_cond_signal(&queuehead->cond)!=0)
270//     qerr(QERR_CONDSIGNAL);
271
272  return 0;
273}
274
275
276void *dequeue_element_wait(queue_t *queuehead)
277/* wait for the queue to contain an element.
278 * if it contains an element return and remove it.
279 * returns: NULL if an error occured, the pointer to the element otherwise
280 * arguments: pointer to queue_t object
281 */
282{
283  void       *element;
284  queue_elblock_t *blockp;
285  int exp = 0;
286  element= NULL;
287  int retcode= 0;
288
289  if (queuehead != NULL)
290  {
291    /* Wait for an element in the queue */
292    /* Before waiting on a condition, the associated mutex must be locked */
293    if (pthread_mutex_lock(&queuehead->mutex)!=0)
294    {
295      qerr(QERR_MUTEXLOCK); return NULL;
296    }
297
298    while(queuehead->nr_of_elements==0) /* while there is no work to do, wait */
299    { /* for a safe state the predicate must be checked in a loop! */
300      /* cond_wait() unlocks the mutex and might return sometimes without
301         getting a signal! */
302      if ((retcode= pthread_cond_wait(&queuehead->cond, &queuehead->mutex)) != 0)
303      {
304         if (retcode!=EINTR && retcode!=ETIMEDOUT)
305         {
306           qerr(QERR_CONDWAIT);
307         }
308      }
309    }
310
311    /* begin critical section */
312    exp = (queuehead->exp_nr_of_elements!=0);
313    blockp = (exp ? (queuehead->exp_first_block) : (queuehead->first_block));
314    if (blockp != NULL)
315    {
316      /* get the first element */
317      element= blockp->element[blockp->read];
318      blockp->read++;
319
320      if (blockp->next_block == NULL) /* this is the last block */
321      {
322        if (blockp->read == blockp->write) 
323        { /* block is completely dequeued, so reset values */
324          /* the last block always remains allocated! */
325          blockp->read=  0;
326          blockp->write= 0;
327        }
328      }
329      else /* this is not the last block */
330      {
331        /* if block was completely dequeued, remove it */
332        if (blockp->read == ELEMENT_BLOCKSIZE)
333        {
334          if (exp) queuehead->exp_first_block= blockp->next_block;
335          else queuehead->first_block= blockp->next_block;
336          free(blockp);
337        }
338      }
339      if (exp) queuehead->exp_nr_of_elements--;
340      queuehead->nr_of_elements--;
341    }
342    else
343      qerr(QERR_QEMPTY);
344
345    /* end critical section */
346    if (pthread_mutex_unlock(&queuehead->mutex)!=0)
347    {
348      qerr(QERR_MUTEXUNLOCK);
349      return NULL;
350    }
351  }
352  else
353    qerr(QERR_QINVALID);
354
355  return element;
356}
357
358void *dequeue_element_timedwait(queue_t *queuehead, const struct timespec *tspec)
359/* wait for the queue to contain an element.
360 * if it contains an element return and remove it.
361 * returns: NULL if an error occured, the pointer to the element otherwise
362 * arguments: pointer to queue_t object
363 *            tpsec is the time interval to wait (not an absolute time!)
364 */
365{
366  void       *element;
367  queue_elblock_t *blockp;
368  int result;
369  struct timespec abs_tspec;
370  int exp = 0;
371  element= NULL;
372
373  if (queuehead != NULL)
374  {
375    /* Wait for an element in the queue */
376    /* Before waiting on a condition, the associated mutex must be locked */
377    if (pthread_mutex_lock(&queuehead->mutex)!=0)
378    {
379      qerr(QERR_MUTEXLOCK); return NULL;
380    }
381
382    while(queuehead->nr_of_elements==0) /* while there is no work to do, wait */
383    { /* for a safe state the predicate must be checked in a loop! */
384      /* cond_wait() unlocks the mutex and might return sometimes without
385         getting a signal! */
386      clock_gettime(CLOCK_REALTIME, &abs_tspec);
387      abs_tspec.tv_nsec+= tspec->tv_nsec;
388      abs_tspec.tv_sec+= tspec->tv_sec;
389      if (abs_tspec.tv_nsec >= NSEC_PER_SEC)
390      {
391        abs_tspec.tv_nsec%= NSEC_PER_SEC;
392        abs_tspec.tv_sec++;
393      };
394
395      if ((result = pthread_cond_timedwait(&queuehead->cond,
396                                           &queuehead->mutex, &abs_tspec))!=0)
397      {
398        if ( (result != ETIMEDOUT) && (result != EINTR) && (result != EINVAL) ) 
399        {
400          qerr(QERR_CONDWAIT);
401        } 
402        else 
403        { /* timeout */
404          if (pthread_mutex_unlock(&queuehead->mutex)!=0) 
405          {
406            qerr(QERR_MUTEXUNLOCK);
407            return NULL;
408          } 
409          return NULL;
410        }
411      }
412    }
413
414    /* begin critical section */
415    exp = (queuehead->exp_nr_of_elements!=0);
416    blockp = (exp ? (queuehead->exp_first_block) : (queuehead->first_block));
417    if (blockp != NULL)
418    {
419      /* get the first element */
420      element= blockp->element[blockp->read];
421      blockp->read++;
422
423      if (blockp->next_block == NULL) /* this is the last block */
424      {
425        if (blockp->read == blockp->write) 
426        { /* block is completely dequeued, so reset values */
427          /* the last block always remains allocated! */
428          blockp->read=  0;
429          blockp->write= 0;
430        }
431      }
432      else /* this is not the last block */
433      {
434        /* if block was completely dequeued, remove it */
435        if (blockp->read == ELEMENT_BLOCKSIZE)
436        {
437          if (exp) queuehead->exp_first_block= blockp->next_block;
438          else queuehead->first_block= blockp->next_block;
439          free(blockp);
440        }
441      }
442      if (exp) queuehead->exp_nr_of_elements--;
443      queuehead->nr_of_elements--;
444    }
445    else
446      qerr(QERR_QEMPTY);
447
448    /* end critical section */
449    if (pthread_mutex_unlock(&queuehead->mutex)!=0)
450    {
451      qerr(QERR_MUTEXUNLOCK);
452      return NULL;
453    }
454  }
455  else
456    qerr(QERR_QINVALID);
457
458  return element;
459}
460
461int destroy_queue(queue_t *queuehead)
462/* destroys the queue and frees all resources, except the elements!
463 * the queue must be empty to destroy it.
464 * returns: -1 if an error occured, 0 otherwise
465 * arguments: pointer to queue_t object
466 */
467{
468  if (queuehead!=NULL)
469  {
470    /* queue not empty? */
471    if (queuehead->nr_of_elements != 0)
472      qerr(QERR_QNOTEMPTY);
473    else
474    {
475      /* destroy condition variable */
476      if (pthread_cond_destroy(&queuehead->cond)!=0) qerr(QERR_CONDDESTROY);
477      /* destroy mutex */
478      if (pthread_mutex_destroy(&queuehead->mutex)!=0) qerr(QERR_MUTEXDESTROY);
479     
480      pthread_mutexattr_destroy(&queuehead->mutex_attr);
481
482      /* free memory for queuehead */
483#ifdef QUEUELEN
484      fprintf(stderr,"queue.c: length of queue (%s) growed up to %lu elements\n",
485              queuehead->name, queuehead->queue_maxlength);
486#endif     
487      free(queuehead->exp_last_block);
488      free(queuehead->last_block);
489      free(queuehead);
490    }
491
492    return 0;
493  }
494  else
495    qerr(QERR_QINVALID);
496
497  return -1;
498}
499
500void *dequeue_element_nonblocking(queue_t *queuehead)
501/* if queue contains an element return and remove it.
502 * returns: NULL if an error occured or queue was empty, the pointer to
503 * the element otherwise.
504 * arguments: pointer to queue_t object
505 */
506{
507  void       *element;
508  queue_elblock_t *blockp;
509  int exp = 0;
510  element= NULL;
511
512  if (queuehead != NULL)
513  {
514    if (pthread_mutex_lock(&queuehead->mutex)!=0)
515    {
516      qerr(QERR_MUTEXLOCK); return NULL;
517    }
518
519    /* begin critical section */
520
521    if (queuehead->nr_of_elements==0) 
522    { 
523      if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
524      return NULL;
525    }
526
527    exp = (queuehead->exp_nr_of_elements!=0);
528    blockp = (exp ? (queuehead->exp_first_block) : (queuehead->first_block));
529    if (blockp != NULL)
530    {
531      /* get the first element */
532      element= blockp->element[blockp->read];
533      blockp->read++;
534
535      if (blockp->next_block == NULL) /* this is the last block */
536      {
537        if (blockp->read == blockp->write) 
538        { /* block is completely dequeued, so reset values */
539          /* the last block always remains allocated! */
540          blockp->read=  0;
541          blockp->write= 0;
542        }
543      }
544      else /* this is not the last block */
545      {
546        /* if block was completely dequeued, remove it */
547        if (blockp->read == ELEMENT_BLOCKSIZE)
548        {
549          if (exp) queuehead->exp_first_block= blockp->next_block;
550          else queuehead->first_block= blockp->next_block;
551          free(blockp);
552        }
553      }
554      if (exp) queuehead->exp_nr_of_elements--;
555      queuehead->nr_of_elements--;
556    }
557    else
558      qerr(QERR_QEMPTY);
559
560    /* end critical section */
561    if (pthread_mutex_unlock(&queuehead->mutex)!=0)
562    {
563      qerr(QERR_MUTEXUNLOCK);
564      return NULL;
565    }
566  }
567  else
568    qerr(QERR_QINVALID);
569
570  return element;
571}
572
573unsigned long queue_nr_of_elements(queue_t *queuehead)
574/** Get number fo elements in queue. */
575{
576  unsigned long result = 0;
577
578  if (queuehead != NULL)
579  {
580    if (pthread_mutex_lock(&queuehead->mutex)!=0)
581    {
582      qerr(QERR_MUTEXLOCK);
583      return 0;
584    }
585    /* begin critical section */
586
587    result = queuehead->nr_of_elements; 
588
589    /* end critical section */
590    if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
591  }
592  else
593    qerr(QERR_QINVALID);
594
595  return result;
596}
597
598int queue_is_expedited_enabled(queue_t *queuehead)
599/** Get exp_enabled flag. */
600{
601  int result = 0;
602
603  if (queuehead != NULL)
604  {
605    if (pthread_mutex_lock(&queuehead->mutex)!=0)
606    {
607      qerr(QERR_MUTEXLOCK);
608      return 0;
609    }
610    /* begin critical section */
611
612    result = queuehead->exp_enabled; 
613
614    /* end critical section */
615    if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
616  }
617  else
618    qerr(QERR_QINVALID);
619
620  return result;
621}
622
623int queue_enable_expedited(queue_t *queuehead, int exp)
624/** Set exp_enabled flag and return old value. */
625{
626  int result = 0;
627
628  if (queuehead != NULL)
629  {
630    if (pthread_mutex_lock(&queuehead->mutex)!=0)
631    {
632      qerr(QERR_MUTEXLOCK);
633      return 0;
634    }
635    /* begin critical section */
636
637    result = queuehead->exp_enabled; 
638    if (exp) queuehead->exp_enabled = 1;
639    else queuehead->exp_enabled = 0;
640
641    /* end critical section */
642    if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
643  }
644  else
645    qerr(QERR_QINVALID);
646
647  return result;
648}
649
650//@}
Note: See TracBrowser for help on using the repository browser.