00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00032 #include <stdexcept>
00033
00034 #include "queuemanager.h"
00035 #include "logfile.h"
00036
00037 namespace protlib {
00038
00044 using namespace log;
00045
00046
00047
00048 QueueManagerError::QueueManagerError(error_t e) : err(e) {}
00049
00050 const char* QueueManagerError::getstr() const { return errstr[err]; }
00051
00052 const char* const QueueManagerError::errstr[] = {
00053 "Unable to create QueueManager.",
00054 "Cannot register FastQueue. No memory or registered queue more than once."
00055 };
00056
00057
00058
00060 QueueManager* QueueManager::instance() {
00061 if (!inst) {
00062
00063 inst = new(nothrow) QueueManager();
00064 if (!inst) {
00065 Log(INFO_LOG,LOG_NORMAL, "QueueManager" ,"Cannot created QueueManager singleton.");
00066 throw QueueManagerError(QueueManagerError::ERROR_NO_QUEUE_MANAGER);
00067 } else {
00068 Log(DEBUG_LOG,LOG_NORMAL, "QueueManager", "Just created QueueManager singleton.");
00069 }
00070 }
00071 return inst;
00072 }
00073
00080 void QueueManager::clear() {
00081
00082 if (inst) {
00083 QueueManager *tmp = inst;
00084 inst = 0;
00085 DLog("QueueManager", "Destroying QueueManager singleton ...");
00086 delete tmp;
00087 }
00088
00089 DLog("QueueManager", "The QueueManager singleton has been destroyed");
00090 }
00091
00092
00106 void QueueManager::register_queue(FastQueue* fq, message::qaddr_t s) {
00107 pthread_mutex_lock(&mutex);
00108
00109 if (((uint32)s)>=queue_arr.capacity()) {
00110 Log(DEBUG_LOG,LOG_NORMAL, "QueueManager", "expanding queue array from " << s << " to " << s+5);
00111
00112 queue_arr.reserve(s+5);
00113 while (queue_arr.size()<queue_arr.capacity()) queue_arr.push_back(NULL);
00114 }
00115
00116 if (queue_arr[s])
00117 {
00118
00119 Log(ERROR_LOG,LOG_CRIT, "QueueManager", "A queue for " << s << " is already registered");
00120 throw QueueManagerError(QueueManagerError::ERROR_REGISTER);
00121 }
00122 else
00123 {
00124
00125 if (fq)
00126 {
00127 queue_arr[s] = fq;
00128 }
00129 else
00130 {
00131 Log(ERROR_LOG,LOG_CRIT, "QueueManager", "Cannot register queue for " << s);
00132 throw QueueManagerError(QueueManagerError::ERROR_REGISTER);
00133 }
00134 }
00135 pthread_mutex_unlock(&mutex);
00136 }
00137
00138
00139 void
00140 QueueManager::unregister_queue(message::qaddr_t s)
00141 {
00142 pthread_mutex_lock(&mutex);
00143 try {
00144 queue_arr.at(s) = 0;
00145 }
00146 catch ( std::out_of_range ) {
00147
00148
00149
00150
00151 }
00152 pthread_mutex_unlock(&mutex);
00153 }
00154
00155 FastQueue* QueueManager::get_queue(message::qaddr_t s) const {
00156 FastQueue* fq = NULL;
00157 pthread_mutex_lock(&mutex);
00158 if (((uint32)s)<queue_arr.size()) {
00159 fq = queue_arr[s];
00160 } else {
00161 fq = NULL;
00162 }
00163 pthread_mutex_unlock(&mutex);
00164 return fq;
00165 }
00166
00167 QueueManager* QueueManager::inst = NULL;
00168
00172 QueueManager::QueueManager() : queue_arr(QueueManager::INITIAL_ARRAY_SIZE) {
00173 pthread_mutexattr_t mutex_attr;
00174
00175 pthread_mutexattr_init(&mutex_attr);
00176
00177 #ifdef _DEBUG
00178 pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_ERRORCHECK);
00179 #else
00180 pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_NORMAL);
00181 #endif
00182
00183 pthread_mutex_init(&mutex, &mutex_attr);
00184
00185 pthread_mutexattr_destroy(&mutex_attr);
00186 }
00187
00188
00194 QueueManager::~QueueManager() {
00195
00196 pthread_mutex_lock(&mutex);
00197
00198
00199 for ( qm_array_it_t i = queue_arr.begin(); i != queue_arr.end(); i++)
00200 if ( *i != 0 )
00201 WLog("QueueManager",
00202 "~QueueManager(): queue " << (*i)->get_name()
00203 << " has not been unregistered");
00204
00205 pthread_mutex_unlock(&mutex);
00206
00207 pthread_mutex_destroy(&mutex);
00208 }
00209
00211
00212 }