00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00035 #include <sstream>
00036
00037 #include "timer_module.h"
00038 #include "queuemanager.h"
00039 #include "logfile.h"
00040 #include "cleanuphandler.h"
00041
00042 namespace protlib {
00043
00048 using namespace log;
00049
00050
00051
00052 TimerMsg::TimerMsg(qaddr_t s, bool s_err)
00053 : message(type_timer,s),
00054 time_sec(0),
00055 time_msec(0),
00056 action(ac_ignore),
00057 param1(NULL),
00058 param2(NULL),
00059 ok(true),
00060 send_error(s_err),
00061 relative(true)
00062 {
00063
00064 }
00065
00067 TimerMsg::~TimerMsg() {}
00068
00069
00073 bool TimerMsg::set_ok(bool r) {
00074 register bool old = ok;
00075 ok = r;
00076 return old;
00077 }
00078
00080 bool TimerMsg::start_absolute(int32 sec, int32 msec, param_t p1, param_t p2) {
00081 return start(false,sec,msec,p1,p2);
00082 }
00083
00085 bool TimerMsg::start_relative(int32 sec, int32 msec, param_t p1, param_t p2) {
00086 return start(true,sec,msec,p1,p2);
00087 }
00088
00089 bool TimerMsg::start(bool rel, int32 sec, int32 msec, param_t p1, param_t p2) {
00090 time_sec = (sec<=0)?0:sec;
00091 time_msec = (msec<=0)?0:msec;
00092 param1 = p1;
00093 param2 = p2;
00094 action = ac_start;
00095 relative = rel;
00096 ok = true;
00097 return true;
00098 }
00099
00101 bool TimerMsg::restart_absolute(id_t id, int32 sec, int32 msec) {
00102 return restart(false,id,sec,msec);
00103 }
00104
00106 bool TimerMsg::restart_relative(id_t id, int32 sec, int32 msec) {
00107 return restart(true,id,sec,msec);
00108 }
00109
00113 bool TimerMsg::restart(bool rel, id_t id, int32 sec, int32 msec) {
00114 relative = rel;
00115 if (id && set_id(id)) {
00116 time_sec = (sec<=0)?0:sec;
00117 time_msec = (msec<=0)?0:msec;
00118 action = ac_restart;
00119 ok = true;
00120 } else {
00121 time_sec = 0;
00122 time_msec = 0;
00123 action = ac_ignore;
00124 ok = false;
00125 }
00126 return ok;
00127 }
00128
00130 bool TimerMsg::stop(id_t id) {
00131 relative = false;
00132 time_sec = 0;
00133 time_msec = 0;
00134 param1 = param2 = NULL;
00135 if (id && set_id(id)) {
00136 action = ac_stop;
00137 ok = true;
00138 } else {
00139 action = ac_ignore;
00140 ok = false;
00141 }
00142 return ok;
00143 }
00144
00146 bool TimerMsg::stop_all() {
00147 relative = false,
00148 time_sec = 0;
00149 time_msec = 0;
00150 param1 = param2 = NULL;
00151 action = ac_stop_all;
00152 ok = true;
00153 return true;
00154 }
00155
00159 bool TimerMsg::set_elapsed() {
00160 send_error = false;
00161 action = ac_elapsed;
00162 ok = true;
00163 return ok;
00164 }
00165
00166 bool TimerMsg::get_send_error() const {
00167 return send_error;
00168 }
00169
00171 bool TimerMsg::set_send_error(bool f) {
00172 register bool o = send_error;
00173 send_error = f;
00174 return o;
00175 }
00176
00177 TimerMsg::param_t TimerMsg::get_param1() const { return param1; }
00178
00179 TimerMsg::param_t TimerMsg::get_param2() const { return param2; }
00180
00181 void TimerMsg::get_params(param_t& p1, param_t& p2) const {
00182 p1 = param1;
00183 p2 = param2;
00184 }
00185
00186 bool TimerMsg::is_absolute() const { return (!relative); }
00187
00188 bool TimerMsg::is_relative() const { return relative; }
00189
00190
00191
00197 TimerModuleParam::TimerModuleParam(uint32 sleep_time, bool sua, bool see, bool sre)
00198 : ThreadParam(sleep_time, "TimerModule", 2), send_until_abort(sua),
00199 source(message::qaddr_timer),
00200 send_error_expedited(see), send_reply_expedited(sre) {
00201
00202 }
00203
00204
00205
00207 TimerModule::TimerModule(const TimerModuleParam& p)
00208 : Thread(p), timerparam(p) {
00209 tmap.clear();
00210
00211 QueueManager::instance()->register_queue(get_fqueue(),p.source);
00212 DLog(timerparam.name, "Creating TimerModule object");
00213 }
00214
00216 TimerModule::~TimerModule() {
00217 stop_all_timers();
00218 DLog(timerparam.name, "Destroying TimerModule object");
00219 QueueManager::instance()->unregister_queue(timerparam.source);
00220
00221 }
00222
00226 void TimerModule::main_loop(uint32 nr) {
00227 Log(INFO_LOG,LOG_NORMAL, timerparam.name, "Starting " << timerparam.name << " thread #" << nr << ", " << ((nr%2) ? "processing input queue" : "processing timer callbacks"));
00228
00229 if (nr%2) process_queue();
00230 else process_elapsed_timers();
00231
00232 Log(INFO_LOG,LOG_NORMAL, timerparam.name,"Thread #" << nr << " stopped");
00233 }
00234
00241 void TimerModule::process_queue() {
00242 uint32 wait = timerparam.sleep_time*1000;
00243 message* msg = NULL;
00244 TimerMsg* tmsg = NULL;
00245 FastQueue* fq = QueueManager::instance()->get_queue(message::qaddr_timer);
00246 bool opresult = false;
00247 if (!fq) {
00248 Log(ERROR_LOG,LOG_ALERT, timerparam.name," cannot find input queue");
00249 return;
00250 }
00251
00252 while (get_state()==STATE_RUN) {
00253 msg = fq->dequeue_timedwait(wait);
00254 if (msg) {
00255 if (msg->get_type()==message::type_timer) {
00256 tmsg = dynamic_cast<TimerMsg*>(msg);
00257 if (tmsg) {
00258
00259 lock();
00260 if (tmsg->is_ok()) {
00261
00262 switch (tmsg->get_action()) {
00263 case TimerMsg::ac_ignore:
00264
00265 Log(DEBUG_LOG,LOG_UNIMP, timerparam.name,"received message with action set to ignore");
00266 opresult = true;
00267 break;
00268 case TimerMsg::ac_start:
00269 opresult = start_timer(tmsg);
00270 break;
00271 case TimerMsg::ac_restart:
00272 opresult = restart_timer(tmsg);
00273 break;
00274 case TimerMsg::ac_stop:
00275 opresult = stop_timer(tmsg);
00276 break;
00277 case TimerMsg::ac_stop_all:
00278 opresult = stop_all_timers();
00279 break;
00280 default:
00281 ERRLog(timerparam.name, "Wrong action " << tmsg->get_action() << " in message from " << tmsg->get_qaddr_name() << " to " << message::get_qaddr_name(timerparam.source) );
00282
00283 opresult = false;
00284 }
00285 } else {
00286 Log(DEBUG_LOG,LOG_UNIMP, timerparam.name,"received message in invalid state, mid " << tmsg->get_id() );
00287 opresult = false;
00288 }
00289
00290 send_error_or_dispose(tmsg,opresult);
00291
00292 unlock();
00293 } else {
00294 Log(ERROR_LOG,LOG_ALERT, timerparam.name, "Cannot cast message from " << msg->get_qaddr_name() << " of type " << msg->get_type_name() << " to TimerMsg");
00295 delete msg;
00296 }
00297 } else {
00298 ERRLog(timerparam.name,"received message that is not of type_timer from " << msg->get_qaddr_name() << ", type was " << msg->get_type_name());
00299 delete msg;
00300 }
00301 }
00302 }
00303 }
00304
00310 void TimerModule::process_elapsed_timers() {
00311 state_t end_state;
00312 uint32 num = 0;
00313 uint32 sleeptime = timerparam.sleep_time*1000;
00314 if (timerparam.send_until_abort) end_state = STATE_ABORT;
00315 else end_state = STATE_STOP;
00316 while(get_state()!=end_state) {
00317 num = tm.check_timers_wait(sleeptime);
00318 if (num) {
00319 Log(DEBUG_LOG,LOG_UNIMP, timerparam.name,"got " << num << " elapsed timer(s)");
00320 }
00321 }
00322 }
00323
00325 bool TimerModule::start_timer(TimerMsg* m) {
00326 timer_id_t tid = 0;
00327 bool res = true;
00328 message::id_t mid = m->get_id();
00329 bool relative = m->is_relative();
00330 int32 sec = 0;
00331 int32 msec = 0;
00332 if (mid) {
00333
00334 if ((tid=tmap.lookup_tid(mid))) {
00335 ERRLog(timerparam.name, m->get_qaddr_name() << " tried to start a timer with mid " << mid << ", but there is already a timer " << tid);
00336 res = false;
00337 } else {
00338
00339 m->get_time(sec,msec);
00340 if (relative) tid = tm.start_relative(this,sec,msec,NULL);
00341 else tid = tm.start_absolute(this,sec,msec,NULL);
00342 if (tid) {
00343
00344 tmap.insert(tid,m);
00345
00346 Log(EVENT_LOG,LOG_UNIMP, timerparam.name, "Timer " << tid << " (" << sec << "s " << msec << "ms) started for "
00347 << m->get_qaddr_name() << " with mid " << mid);
00348 res = true;
00349 } else {
00350
00351 ERRLog(timerparam.name, "TimerManager in " << timerparam.name << " is unable to start a timer for " << m->get_qaddr_name());
00352 res = false;
00353 }
00354 }
00355 } else {
00356 ERRLog(timerparam.name, m->get_qaddr_name() << " tried to start a timer with message ID 0");
00357 res = false;
00358 }
00359 return res;
00360 }
00361
00363 bool TimerModule::restart_timer(TimerMsg* m) {
00364 timer_id_t tid = 0;
00365 bool res = true;
00366 message::id_t mid = m->get_id();
00367 bool relative = m->is_relative();
00368 TimerMsg* repmsg = NULL;
00369 int32 sec = 0;
00370 int32 msec = 0;
00371 if (mid) {
00372
00373 tid = tmap.lookup_tid(mid);
00374 repmsg = tmap.lookup_msg(tid);
00375 if (tid && repmsg) {
00376
00377 m->get_time(sec,msec);
00378 if (relative) res = tm.restart_relative(tid,sec,msec);
00379 else res = tm.restart_absolute(tid,sec,msec);
00380 if (res) {
00381
00382 repmsg->restart(relative,mid,sec,msec);
00383
00384 DLog(timerparam.name, "Timer " << tid << ", mid " << mid << " restarted for " << m->get_qaddr_name());
00385 } else {
00386
00387 ERRLog(timerparam.name, "TimerManager in " << timerparam.name << " is unable to restart a timer for " << m->get_qaddr_name() << ", mid " << mid);
00388 }
00389 } else {
00390 if (tid) ERRLog(timerparam.name, m->get_qaddr_name() << " tried to restart a timer with mid " << mid << ": or no reply message for timer found");
00391 if (repmsg) ERRLog(timerparam.name, m->get_qaddr_name() << " tried to restart timer with mid " << mid << ": timer not found");
00392 if ((!repmsg) & (!tid)) ERRLog(timerparam.name, m->get_qaddr_name() << " tried to restart timer with mid " << mid << ": neither timer nor reply message found");
00393 res = false;
00394 }
00395 } else {
00396 ERRLog(timerparam.name, m->get_qaddr_name() << " tried to restart a timer with an invalid message ID");
00397 res = false;
00398 }
00399 return res;
00400 }
00401
00403 bool TimerModule::stop_timer(TimerMsg* m) {
00404 timer_id_t tid = 0;
00405 bool res = true;
00406 message::id_t mid = m->get_id();
00407 if (mid) {
00408
00409 tid = tmap.lookup_tid(mid);
00410 if (tid) {
00411
00412 res = tm.stop(tid);
00413 if (res) {
00414
00415 tmap.erase(tid,mid,true);
00416
00417 DLog(timerparam.name, "Stopped timer " << tid << ", mid " << mid << " for " << m->get_qaddr_name());
00418 } else {
00419
00420 ERRLog(timerparam.name, "TimerManager in " << timerparam.name << " is unable to stop timer " << tid << ", mid " << mid << " for " << m->get_qaddr_name());
00421 }
00422 } else {
00423 ERRLog(timerparam.name, m->get_qaddr_name() << " tried to stop a non-existing timer with mid " << mid);
00424 res = false;
00425 }
00426 } else {
00427 ERRLog(timerparam.name, m->get_qaddr_name() << " tried to stop a timer with an invalid message ID");
00428 res = false;
00429 }
00430 return res;
00431 }
00432
00434 bool TimerModule::stop_all_timers() {
00435 uint32 num = 0;
00436
00437 tmap.clear(true);
00438
00439 num = tm.stop_all();
00440 Log(DEBUG_LOG,LOG_UNIMP, timerparam.name,"stopped all timers, num " << num);
00441 return true;
00442 }
00443
00450 void TimerModule::send_error_or_dispose(TimerMsg* m, bool ok) {
00451 message::qaddr_t dest;
00452 if (!m) return;
00453
00454 #ifndef _NO_LOGGING
00455 message::id_t mid = m->get_id();
00456 #endif
00457
00458
00459 if ((!ok) && m->get_send_error()) ok = false;
00460 else ok = true;
00461 if (ok) {
00462
00463 if (m->get_action()!=TimerMsg::ac_start) {
00464 dest = m->get_source();
00465 delete m;
00466
00467 }
00468 } else {
00469
00470 dest = m->get_source();
00471 m->set_ok(false);
00472 if (m->send(timerparam.source,dest,timerparam.send_error_expedited)) {
00473 Log(DEBUG_LOG,LOG_UNIMP, timerparam.name,"sent error message w/ mid " << mid << " to " << message::get_qaddr_name(dest));
00474 } else {
00475 ERRLog(timerparam.name,"cannot send error message w/ mid " << mid << " to " << message::get_qaddr_name(dest) << ", disposing it now");
00476 delete m;
00477 }
00478 }
00479 }
00480
00488 void TimerModule::timer_expired(timer_id_t timer, timer_callback_param_t callback_param) {
00489 TimerMsg* msg = NULL;
00490 message::qaddr_t dest;
00491 message::id_t mid = 0;
00492
00493 lock();
00494
00495 msg = tmap.lookup_msg(timer);
00496 if (msg) {
00497
00498 mid = msg->get_id();
00499
00500 dest = msg->get_source();
00501 if (msg->set_elapsed() && msg->send_back(timerparam.source,timerparam.send_reply_expedited)) {
00502 Log(DEBUG_LOG,LOG_UNIMP, timerparam.name,"sent reply mid " << mid << " for elapsed timer " << timer << " to " << message::get_qaddr_name(dest));
00503 } else {
00504 ERRLog(timerparam.name,"cannot send reply mid " << mid << " for elapsed timer " << timer <<" to " << message::get_qaddr_name(dest));
00505
00506 delete msg;
00507 }
00508 } else {
00509
00510 DLog(timerparam.name, "TimerModule::timer_expired cannot find reply message for a timer " << timer << ". Maybe the timer has been stopped");
00511 }
00512
00513 tmap.erase(timer,mid,false);
00514
00515 unlock();
00516 }
00517
00518
00519
00520 bool TimerModule::TimerMap::insert(timer_id_t tid, TimerMsg* m) {
00521 if (tid && m) {
00522 message::id_t mid = m->get_id();
00523 tid2mid[tid] = mid;
00524 mid2tid[mid] = tid;
00525 tid2msg[tid] = m;
00526 return true;
00527 } else return false;
00528 }
00529
00533 message::id_t TimerModule::TimerMap::lookup_mid(timer_id_t tid) const {
00534 const_tid2mid_it_t hit;
00535 hit = tid2mid.find(tid);
00536 if (hit!=tid2mid.end()) return hit->second;
00537 else return 0;
00538 }
00539
00543 timer_id_t TimerModule::TimerMap::lookup_tid(message::id_t mid) const {
00544 const_mid2tid_it_t hit;
00545 if ((hit=mid2tid.find(mid))!=mid2tid.end()) return hit->second;
00546 else return 0;
00547 }
00548
00552 TimerMsg* TimerModule::TimerMap::lookup_msg(timer_id_t tid) const {
00553 const_tid2msg_it_t hit;
00554 hit = tid2msg.find(tid);
00555 if (hit!=tid2msg.end()) return hit->second;
00556 else return NULL;
00557 }
00558
00562 void TimerModule::TimerMap::erase(timer_id_t tid, message::id_t mid, bool dispose) {
00563 TimerMsg* m = NULL;
00564 if (tid) {
00565 if (dispose) {
00566 m = lookup_msg(tid);
00567 if (m) delete m;
00568 }
00569 tid2mid.erase(tid);
00570 tid2msg.erase(tid);
00571 }
00572 if (mid) mid2tid.erase(mid);
00573 }
00574
00575 void TimerModule::TimerMap::clear(bool dispose) {
00576 const_tid2msg_it_t hit;
00577 if (dispose) {
00578 for (hit=tid2msg.begin();hit!=tid2msg.end();hit++) {
00579 if (hit->second) delete hit->second;
00580 }
00581 }
00582 tid2mid.clear();
00583 tid2msg.clear();
00584 mid2tid.clear();
00585 }
00586
00588
00589 }