00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00038
00039
00040
00041
00042
00043
00044
00045 #define _GNU_SOURCE
00046 #include <stdio.h>
00047 #include <string.h>
00048 #include <pthread.h>
00049 #include <stdlib.h>
00050 #include <errno.h>
00051 #include <sys/time.h>
00052 #include <unistd.h>
00053
00054
00055 #include "fastqueue.h"
00056
00057
00058 #define qerr(errnr) fprintf(stderr,"queue.c: %s\n",queue_errmsg[errnr])
00059
00060 #ifdef __linux__
00061
00062
00063
00064
00065 #define PTHREAD_MUTEX_NORMAL PTHREAD_MUTEX_TIMED_NP
00066 #define PRI_OTHER_MIN PRI_FG_MIN_NP
00067 #define PRI_OTHER_MAX PRI_FG_MAX_NP
00068 #define PRI_FG_MIN_NP 8
00069 #define PRI_FG_MAX_NP 15
00070
00071 #define CLOCK_REALTIME 0
00072 #define NSEC_PER_SEC 1000000000
00073 extern int eclock_gettime(struct timespec *tp);
00074 #define clock_gettime(clock_id, tspec) eclock_gettime(tspec)
00075
00076 #elif _DECTHREADS_VERSION < 314126L
00077
00078 #define pthread_mutexattr_settype pthread_mutexattr_settype_np
00079 #define PTHREAD_MUTEX_NORMAL PTHREAD_MUTEX_NORMAL_NP
00080 #define PTHREAD_MUTEX_ERRORCHECK PTHREAD_MUTEX_ERRORCHECK_NP
00081 #endif
00082
00083
00084 enum {
00085 QERR_NONE,
00086 QERR_NOMEM,
00087 QERR_MUTEXINIT,
00088 QERR_MUTEXLOCK,
00089 QERR_MUTEXUNLOCK,
00090 QERR_MUTEXDESTROY,
00091 QERR_QEMPTY,
00092 QERR_QINVALID,
00093 QERR_QNOTEMPTY,
00094 QERR_CONDINIT,
00095 QERR_CONDWAIT,
00096 QERR_CONDSIGNAL,
00097 QERR_CONDDESTROY
00098 };
00099
00100 const
00101 char *const queue_errmsg[]=
00102 {
00103 "all ok",
00104 "can't get enough memory",
00105 "initializing mutex",
00106 "locking mutex",
00107 "unlocking mutex",
00108 "destroying mutex",
00109 "queue empty",
00110 "invalid queueobject",
00111 "destroying queue - queue not empty",
00112 "initializing queue condition variable",
00113 "waiting on condition",
00114 "signalling condition",
00115 "destroying condition"
00116 };
00117
00118
00119
00120 queue_t *create_queue(const char* name)
00121
00122
00123
00124
00125
00126 {
00127 queue_t *queuehead;
00128
00129
00130 if ((queuehead= (queue_t *) malloc(sizeof(queue_t)))!=NULL)
00131 {
00132
00133 pthread_mutexattr_init(&queuehead->mutex_attr);
00134 #ifdef _DEBUG
00135 pthread_mutexattr_settype(&queuehead->mutex_attr,PTHREAD_MUTEX_ERRORCHECK);
00136 #else
00137 pthread_mutexattr_settype(&queuehead->mutex_attr,PTHREAD_MUTEX_NORMAL);
00138 #endif
00139
00140
00141
00142 if (pthread_mutex_init(&queuehead->mutex, &queuehead->mutex_attr)==0)
00143 {
00144
00145 if (pthread_cond_init(&queuehead->cond, NULL)==0)
00146 {
00147 queuehead->nr_of_elements= 0UL;
00148 queuehead->exp_nr_of_elements= 0UL;
00149 queuehead->exp_enabled = 0;
00150 queuehead->first_block= (queue_elblock_t *) malloc(sizeof(queue_elblock_t));
00151 queuehead->exp_first_block= (queue_elblock_t *) malloc(sizeof(queue_elblock_t));
00152 if ((queuehead->first_block == NULL) || (queuehead->exp_first_block == NULL))
00153 qerr(QERR_NOMEM);
00154 else
00155 {
00156 queuehead->first_block->read= 0;
00157 queuehead->first_block->write= 0;
00158 queuehead->first_block->next_block= NULL;
00159 queuehead->last_block= queuehead->first_block;
00160 queuehead->exp_first_block->read= 0;
00161 queuehead->exp_first_block->write= 0;
00162 queuehead->exp_first_block->next_block= NULL;
00163 queuehead->exp_last_block= queuehead->exp_first_block;
00164 if (name)
00165 {
00166 if (strlen(name) <= MAX_QUEUENAME_LENGTH)
00167 strcpy(queuehead->name, name);
00168 else
00169 {
00170 memcpy(queuehead->name, name, MAX_QUEUENAME_LENGTH);
00171 queuehead->name[MAX_QUEUENAME_LENGTH + 1] = '\0';
00172 }
00173 }
00174 else
00175 queuehead->name[0] = '\0';
00176
00177 }
00178
00179 queuehead->queue_maxlength= 0;
00180
00181 }
00182 else
00183 qerr(QERR_CONDINIT);
00184 }
00185 else
00186 qerr(QERR_MUTEXINIT);
00187 }
00188 else
00189 qerr(QERR_NOMEM);
00190
00191 return queuehead;
00192 }
00193
00194 int enqueue_element_signal(queue_t *queuehead, void *element)
00195 {
00196 return enqueue_element_expedited_signal(queuehead,element,0);
00197 }
00198
00199 int enqueue_element_expedited_signal(queue_t *queuehead, void *element, int exp)
00200
00201
00202
00203
00204
00205 {
00206 queue_elblock_t *newelement, *lastblockp;
00207
00208 if (queuehead==NULL)
00209 {
00210 qerr(QERR_QINVALID);
00211 return -1;
00212 }
00213
00214 if (pthread_mutex_lock(&queuehead->mutex)!=0)
00215 {
00216 qerr(QERR_MUTEXLOCK);
00217 return -1;
00218 }
00219
00220
00221 if (exp && queuehead->exp_enabled) exp = 1; else exp = 0;
00222
00223
00224 lastblockp = (exp ? (queuehead->exp_last_block) : (queuehead->last_block));
00225 if (lastblockp->write == ELEMENT_BLOCKSIZE)
00226 {
00227 if ((newelement= (queue_elblock_t *) malloc(sizeof(queue_elblock_t)))==NULL)
00228 {
00229 qerr(QERR_NOMEM);
00230 return -1;
00231 }
00232
00233
00234 newelement->element[0]= element;
00235 newelement->read = 0;
00236 newelement->write = 1;
00237 newelement->next_block= NULL;
00238
00239
00240 lastblockp->next_block= newelement;
00241
00242 if (exp) queuehead->exp_last_block = newelement;
00243 else queuehead->last_block = newelement;
00244 }
00245 else
00246 {
00247 lastblockp->element[lastblockp->write]= element;
00248 lastblockp->write++;
00249 }
00250
00251 if (exp) queuehead->exp_nr_of_elements++;
00252 queuehead->nr_of_elements++;
00253
00254 if (queuehead->nr_of_elements > queuehead->queue_maxlength)
00255 queuehead->queue_maxlength= queuehead->nr_of_elements;
00256
00257
00258
00259
00260 if (pthread_cond_signal(&queuehead->cond)!=0)
00261 qerr(QERR_CONDSIGNAL);
00262
00263 if (pthread_mutex_unlock(&queuehead->mutex)!=0)
00264 {
00265 qerr(QERR_MUTEXUNLOCK);
00266 return -1;
00267 }
00268
00269
00270
00271
00272 return 0;
00273 }
00274
00275
00276 void *dequeue_element_wait(queue_t *queuehead)
00277
00278
00279
00280
00281
00282 {
00283 void *element;
00284 queue_elblock_t *blockp;
00285 int exp = 0;
00286 element= NULL;
00287 int retcode= 0;
00288
00289 if (queuehead != NULL)
00290 {
00291
00292
00293 if (pthread_mutex_lock(&queuehead->mutex)!=0)
00294 {
00295 qerr(QERR_MUTEXLOCK); return NULL;
00296 }
00297
00298 while(queuehead->nr_of_elements==0)
00299 {
00300
00301
00302 if ((retcode= pthread_cond_wait(&queuehead->cond, &queuehead->mutex)) != 0)
00303 {
00304 if (retcode!=EINTR && retcode!=ETIMEDOUT)
00305 {
00306 qerr(QERR_CONDWAIT);
00307 }
00308 }
00309 }
00310
00311
00312 exp = (queuehead->exp_nr_of_elements!=0);
00313 blockp = (exp ? (queuehead->exp_first_block) : (queuehead->first_block));
00314 if (blockp != NULL)
00315 {
00316
00317 element= blockp->element[blockp->read];
00318 blockp->read++;
00319
00320 if (blockp->next_block == NULL)
00321 {
00322 if (blockp->read == blockp->write)
00323 {
00324
00325 blockp->read= 0;
00326 blockp->write= 0;
00327 }
00328 }
00329 else
00330 {
00331
00332 if (blockp->read == ELEMENT_BLOCKSIZE)
00333 {
00334 if (exp) queuehead->exp_first_block= blockp->next_block;
00335 else queuehead->first_block= blockp->next_block;
00336 free(blockp);
00337 }
00338 }
00339 if (exp) queuehead->exp_nr_of_elements--;
00340 queuehead->nr_of_elements--;
00341 }
00342 else
00343 qerr(QERR_QEMPTY);
00344
00345
00346 if (pthread_mutex_unlock(&queuehead->mutex)!=0)
00347 {
00348 qerr(QERR_MUTEXUNLOCK);
00349 return NULL;
00350 }
00351 }
00352 else
00353 qerr(QERR_QINVALID);
00354
00355 return element;
00356 }
00357
00358 void *dequeue_element_timedwait(queue_t *queuehead, const struct timespec *tspec)
00359
00360
00361
00362
00363
00364
00365 {
00366 void *element;
00367 queue_elblock_t *blockp;
00368 int result;
00369 struct timespec abs_tspec;
00370 int exp = 0;
00371 element= NULL;
00372
00373 if (queuehead != NULL)
00374 {
00375
00376
00377 if (pthread_mutex_lock(&queuehead->mutex)!=0)
00378 {
00379 qerr(QERR_MUTEXLOCK); return NULL;
00380 }
00381
00382 while(queuehead->nr_of_elements==0)
00383 {
00384
00385
00386 clock_gettime(CLOCK_REALTIME, &abs_tspec);
00387 abs_tspec.tv_nsec+= tspec->tv_nsec;
00388 abs_tspec.tv_sec+= tspec->tv_sec;
00389 if (abs_tspec.tv_nsec >= NSEC_PER_SEC)
00390 {
00391 abs_tspec.tv_nsec%= NSEC_PER_SEC;
00392 abs_tspec.tv_sec++;
00393 };
00394
00395 if ((result = pthread_cond_timedwait(&queuehead->cond,
00396 &queuehead->mutex, &abs_tspec))!=0)
00397 {
00398 if ( (result != ETIMEDOUT) && (result != EINTR) && (result != EINVAL) )
00399 {
00400 qerr(QERR_CONDWAIT);
00401 }
00402 else
00403 {
00404 if (pthread_mutex_unlock(&queuehead->mutex)!=0)
00405 {
00406 qerr(QERR_MUTEXUNLOCK);
00407 return NULL;
00408 }
00409 return NULL;
00410 }
00411 }
00412 }
00413
00414
00415 exp = (queuehead->exp_nr_of_elements!=0);
00416 blockp = (exp ? (queuehead->exp_first_block) : (queuehead->first_block));
00417 if (blockp != NULL)
00418 {
00419
00420 element= blockp->element[blockp->read];
00421 blockp->read++;
00422
00423 if (blockp->next_block == NULL)
00424 {
00425 if (blockp->read == blockp->write)
00426 {
00427
00428 blockp->read= 0;
00429 blockp->write= 0;
00430 }
00431 }
00432 else
00433 {
00434
00435 if (blockp->read == ELEMENT_BLOCKSIZE)
00436 {
00437 if (exp) queuehead->exp_first_block= blockp->next_block;
00438 else queuehead->first_block= blockp->next_block;
00439 free(blockp);
00440 }
00441 }
00442 if (exp) queuehead->exp_nr_of_elements--;
00443 queuehead->nr_of_elements--;
00444 }
00445 else
00446 qerr(QERR_QEMPTY);
00447
00448
00449 if (pthread_mutex_unlock(&queuehead->mutex)!=0)
00450 {
00451 qerr(QERR_MUTEXUNLOCK);
00452 return NULL;
00453 }
00454 }
00455 else
00456 qerr(QERR_QINVALID);
00457
00458 return element;
00459 }
00460
00461 int destroy_queue(queue_t *queuehead)
00462
00463
00464
00465
00466
00467 {
00468 if (queuehead!=NULL)
00469 {
00470
00471 if (queuehead->nr_of_elements != 0)
00472 qerr(QERR_QNOTEMPTY);
00473 else
00474 {
00475
00476 if (pthread_cond_destroy(&queuehead->cond)!=0) qerr(QERR_CONDDESTROY);
00477
00478 if (pthread_mutex_destroy(&queuehead->mutex)!=0) qerr(QERR_MUTEXDESTROY);
00479
00480 pthread_mutexattr_destroy(&queuehead->mutex_attr);
00481
00482
00483 #ifdef QUEUELEN
00484 fprintf(stderr,"queue.c: length of queue (%s) growed up to %lu elements\n",
00485 queuehead->name, queuehead->queue_maxlength);
00486 #endif
00487 free(queuehead->exp_last_block);
00488 free(queuehead->last_block);
00489 free(queuehead);
00490 }
00491
00492 return 0;
00493 }
00494 else
00495 qerr(QERR_QINVALID);
00496
00497 return -1;
00498 }
00499
00500 void *dequeue_element_nonblocking(queue_t *queuehead)
00501
00502
00503
00504
00505
00506 {
00507 void *element;
00508 queue_elblock_t *blockp;
00509 int exp = 0;
00510 element= NULL;
00511
00512 if (queuehead != NULL)
00513 {
00514 if (pthread_mutex_lock(&queuehead->mutex)!=0)
00515 {
00516 qerr(QERR_MUTEXLOCK); return NULL;
00517 }
00518
00519
00520
00521 if (queuehead->nr_of_elements==0)
00522 {
00523 if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
00524 return NULL;
00525 }
00526
00527 exp = (queuehead->exp_nr_of_elements!=0);
00528 blockp = (exp ? (queuehead->exp_first_block) : (queuehead->first_block));
00529 if (blockp != NULL)
00530 {
00531
00532 element= blockp->element[blockp->read];
00533 blockp->read++;
00534
00535 if (blockp->next_block == NULL)
00536 {
00537 if (blockp->read == blockp->write)
00538 {
00539
00540 blockp->read= 0;
00541 blockp->write= 0;
00542 }
00543 }
00544 else
00545 {
00546
00547 if (blockp->read == ELEMENT_BLOCKSIZE)
00548 {
00549 if (exp) queuehead->exp_first_block= blockp->next_block;
00550 else queuehead->first_block= blockp->next_block;
00551 free(blockp);
00552 }
00553 }
00554 if (exp) queuehead->exp_nr_of_elements--;
00555 queuehead->nr_of_elements--;
00556 }
00557 else
00558 qerr(QERR_QEMPTY);
00559
00560
00561 if (pthread_mutex_unlock(&queuehead->mutex)!=0)
00562 {
00563 qerr(QERR_MUTEXUNLOCK);
00564 return NULL;
00565 }
00566 }
00567 else
00568 qerr(QERR_QINVALID);
00569
00570 return element;
00571 }
00572
00573 unsigned long queue_nr_of_elements(queue_t *queuehead)
00575 {
00576 unsigned long result = 0;
00577
00578 if (queuehead != NULL)
00579 {
00580 if (pthread_mutex_lock(&queuehead->mutex)!=0)
00581 {
00582 qerr(QERR_MUTEXLOCK);
00583 return 0;
00584 }
00585
00586
00587 result = queuehead->nr_of_elements;
00588
00589
00590 if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
00591 }
00592 else
00593 qerr(QERR_QINVALID);
00594
00595 return result;
00596 }
00597
00598 int queue_is_expedited_enabled(queue_t *queuehead)
00600 {
00601 int result = 0;
00602
00603 if (queuehead != NULL)
00604 {
00605 if (pthread_mutex_lock(&queuehead->mutex)!=0)
00606 {
00607 qerr(QERR_MUTEXLOCK);
00608 return 0;
00609 }
00610
00611
00612 result = queuehead->exp_enabled;
00613
00614
00615 if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
00616 }
00617 else
00618 qerr(QERR_QINVALID);
00619
00620 return result;
00621 }
00622
00623 int queue_enable_expedited(queue_t *queuehead, int exp)
00625 {
00626 int result = 0;
00627
00628 if (queuehead != NULL)
00629 {
00630 if (pthread_mutex_lock(&queuehead->mutex)!=0)
00631 {
00632 qerr(QERR_MUTEXLOCK);
00633 return 0;
00634 }
00635
00636
00637 result = queuehead->exp_enabled;
00638 if (exp) queuehead->exp_enabled = 1;
00639 else queuehead->exp_enabled = 0;
00640
00641
00642 if (pthread_mutex_unlock(&queuehead->mutex)!=0) qerr(QERR_MUTEXUNLOCK);
00643 }
00644 else
00645 qerr(QERR_QINVALID);
00646
00647 return result;
00648 }
00649