00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include "fqueue.h"
00031 #include "stdio.h"
00032 #include "logfile.h"
00033
00034 #include <string>
00035
00036 namespace protlib {
00037
00042 using namespace log;
00043
00053 FastQueue::FastQueue(const char *qname, bool exp)
00054 : queue_name((qname == 0) ? "" : (char*)qname), shutdownflag(false)
00055 {
00056 if ((queue = create_queue(qname)) == NULL)
00057 {
00058 Log(ERROR_LOG, LOG_ALERT, "FastQueue", "Could not create queue " << queue_name);
00059 throw FQError();
00060 } else queue_enable_expedited(queue,exp);
00061 }
00062
00063
00084 bool FastQueue::enqueue(message *element, bool exp)
00085 {
00086 if (shutdownflag) return false;
00087 if (enqueue_element_expedited_signal(queue, (void*)element, exp) < 0)
00088 {
00089 Log(ERROR_LOG, LOG_ALERT, "FastQueue", "Could not enqueue element in queue " << queue_name);
00090 return false;
00091 }
00092 return true;
00093 }
00094
00095
00105 message *FastQueue::dequeue_timedwait(const long int msec)
00106 {
00107 struct timespec tspec = {0,0};
00108 tspec.tv_sec = msec/1000;
00109 tspec.tv_nsec = (msec%1000)*1000000;
00110 return (message*)dequeue_element_timedwait(queue, &tspec);
00111 }
00112
00113
00120 FastQueue::~FastQueue()
00121 {
00122 if (queue)
00123 {
00124 cleanup();
00125 if ((destroy_queue(queue)) < 0)
00126 {
00127 Log(ERROR_LOG, LOG_ALERT, "FastQueue", "Could not destroy queue " << queue_name);
00128 }
00129 }
00130 DLog("FastQueue", "~FastQueue() - done for queue " << queue_name);
00131 }
00132
00138 bool FastQueue::is_empty() const
00139 {
00140 if (queue_nr_of_elements(queue)==0)
00141 return true;
00142 else
00143 return false;
00144 }
00145
00146
00152 unsigned long FastQueue::size() const
00153 {
00154 return queue_nr_of_elements(queue);
00155 }
00156
00157
00163 bool FastQueue::is_expedited_enabled() const
00164 {
00165 if (queue_is_expedited_enabled(queue))
00166 return true;
00167 else
00168 return false;
00169 }
00170
00178 bool FastQueue::enable_expedited(bool exp)
00179 {
00180 if (queue_enable_expedited(queue,exp))
00181 return true;
00182 else
00183 return false;
00184 }
00185
00186
00192 void FastQueue::shutdown() { shutdownflag = true; }
00193
00194
00200 unsigned long FastQueue::cleanup()
00201 {
00202 unsigned long count = 0;
00203 message* m = NULL;
00204 shutdownflag = true;
00205 while (!is_empty())
00206 if ((m = dequeue(false))) {
00207 delete m;
00208 m = NULL;
00209 count++;
00210 }
00211 return count;
00212 }
00213
00215
00216 }