source: source/ariba/utility/system/SystemQueue.cpp@ 12060

Last change on this file since 12060 was 12060, checked in by hock@…, 11 years ago

Reintegrate branch: 20130111-hock-message_classes

improvements:

  • new message classes (reboost, zero-copy)
  • "fast path" for direct links (skip overlay layer)
  • link-properties accessible from the application
  • SystemQueue can call boost::bind functions
  • protlib compatibility removed (32bit overhead saved in every message)
  • addressing2
  • AddressDiscovery discovers only addresses on which we're actually listening
  • ariba serialization usage reduced (still used in OverlayMsg)
  • Node::connect, easier and cleaner interface to start-up ariba from the application
  • ariba configs via JSON, XML, etc (boost::property_tree)
  • keep-alive overhead greatly reduced
  • (relayed) overlay links can actually be closed now
  • lost messages are detected in most cases
  • notification to the application when link is transformed into direct-link
  • overlay routing: send message to second best hop if it would be dropped otherwise
  • SequenceNumbers (only mechanisms, so far: upward compatibility)
  • various small fixes


regressions:

  • bluetooth is not yet working again
  • bootstrap modules deactivated
  • liblog4xx is not working (use cout-logging)

This patch brings great performance and stability improvements at cost of backward compatibility.
Also bluetooth and the bootstrap modules have not been ported to the new interfaces, yet.

File size: 10.5 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
5//
6// Institute of Telematics
7// Universität Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37
38#include "SystemQueue.h"
39
40namespace ariba {
41namespace utility {
42
43use_logging_cpp(SystemQueue);
44
45SystemQueue::SystemQueue()
46#ifndef UNDERLAY_OMNET
47 : delayScheduler( &directScheduler ), systemQueueRunning( false )
48#endif
49{
50}
51
52SystemQueue::~SystemQueue() {
53}
54
55void SystemQueue::scheduleEvent( const SystemEvent& event, uint32_t delay ) {
56#ifndef UNDERLAY_OMNET
57 if( delay == 0 ) directScheduler.insert( event, delay );
58 else delayScheduler.insert ( event, delay );
59#else
60 Enter_Method_Silent();
61 cMessage* msg = new cMessage();
62 msg->setContextPointer( new SystemEvent(event) );
63
64 if( delay == 0 )
65 cSimpleModule::scheduleAt( cSimpleModule::simTime(), msg );
66 else
67 cSimpleModule::scheduleAt( cSimpleModule::simTime()+((double)delay/1000.0), msg );
68#endif
69}
70
71// maps to function call internally to the Event-system
72void SystemQueue::scheduleCall( const boost::function0<void>& function, uint32_t delay)
73{
74 // copy function object
75 boost::function0<void>* function_ptr = new boost::function0<void>();
76 (*function_ptr) = function;
77
78 // schedule special call-event
79 scheduleEvent( SystemEvent(&internal_function_caller, SystemEventType::DEFAULT, function_ptr), delay );
80
81}
82
#ifdef UNDERLAY_OMNET
// OMNeT message handler: unpacks the SystemEvent that scheduleEvent()
// attached to the message, delivers it to its listener and then frees both
// the event copy and the message.
void SystemQueue::handleMessage(cMessage* msg){
	SystemEvent* pending = static_cast<SystemEvent*>( msg->contextPointer() );

	pending->listener->handleSystemEvent( *pending );

	delete pending;
	delete msg;
}
#endif
92
93void SystemQueue::run() {
94#ifndef UNDERLAY_OMNET
95 systemQueueRunning = true;
96 directScheduler.run();
97 delayScheduler.run();
98#endif
99}
100
101void SystemQueue::cancel() {
102#ifndef UNDERLAY_OMNET
103 systemQueueRunning = false;
104 directScheduler.cancel();
105 delayScheduler.cancel();
106#endif
107}
108
109void SystemQueue::dropAll( const SystemEventListener* mlistener){
110#ifndef UNDERLAY_OMNET
111 directScheduler.dropAll(mlistener);
112 delayScheduler.dropAll(mlistener);
113#endif
114}
115
116bool SystemQueue::isEmpty() {
117#ifndef UNDERLAY_OMNET
118 return directScheduler.isEmpty() || delayScheduler.isEmpty();
119#else
120 return false;
121#endif
122}
123
124bool SystemQueue::isRunning() {
125#ifndef UNDERLAY_OMNET
126 return systemQueueRunning;
127#else
128 return true;
129#endif
130}
131
132void SystemQueue::enterMethod(){
133 // TODO: omnet case and delay scheduler
134 directScheduler.enter();
135}
136
137void SystemQueue::leaveMethod(){
138 // TODO: omnet case and delay scheduler
139 directScheduler.leave();
140}
141
142//***************************************************************
143#ifndef UNDERLAY_OMNET
144
145SystemQueue::QueueThread::QueueThread(QueueThread* _transferQueue)
146 : transferQueue( _transferQueue ), running( false ) {
147}
148
149SystemQueue::QueueThread::~QueueThread(){
150}
151
152void SystemQueue::QueueThread::run(){
153 running = true;
154
155 queueThread = new boost::thread(
156 boost::bind(&QueueThread::threadFunc, this) );
157}
158
159void SystemQueue::QueueThread::cancel(){
160
161 logging_debug("cancelling system queue");
162
163 // cause the thread to exit
164 {
165 // get the lock, when we got the lock the
166 // queue thread must be in itemsAvailable.wait()
167 boost::mutex::scoped_lock lock(queueMutex);
168
169 // set the running indicator and signal to run on
170 // this will run the thread and quit it
171 running = false;
172 itemsAvailable.notify_all();
173 }
174
175 // wait until the thread has exited
176 logging_debug("joining system queue thread");
177 queueThread->join();
178
179 // delete pending events
180 logging_debug("deleting pending system queue events");
181 while( eventsQueue.size() > 0 ){
182 eventsQueue.erase( eventsQueue.begin() );
183 }
184
185 // delete the thread, so that a subsuquent run() can be called
186 delete queueThread;
187 queueThread = NULL;
188}
189
190bool SystemQueue::QueueThread::isEmpty(){
191 boost::mutex::scoped_lock lock( queueMutex );
192 return eventsQueue.empty();
193}
194
195void SystemQueue::QueueThread::insert( const SystemEvent& event, uint32_t delay ){
196
197 // if this is called from a module that is currently handling
198 // a thread (called from SystemQueue::onNextQueueItem), the
199 // thread is the same anyway and the mutex will be already
200 // aquired, otherwise we aquire it now
201
202 boost::mutex::scoped_lock lock( queueMutex );
203
204 eventsQueue.push_back( event );
205 eventsQueue.back().scheduledTime = boost::posix_time::microsec_clock::local_time();
206 eventsQueue.back().delayTime = delay;
207 eventsQueue.back().remainingDelay = delay;
208
209 onItemInserted( event );
210 itemsAvailable.notify_all();
211}
212
213void SystemQueue::QueueThread::dropAll( const SystemEventListener* mlistener) {
214 boost::mutex::scoped_lock lock( queueMutex );
215
216 bool deleted;
217 do{
218 deleted = false;
219 EventQueue::iterator i = eventsQueue.begin();
220 EventQueue::iterator iend = eventsQueue.end();
221
222 for( ; i != iend; i++){
223 if((*i).getListener() == mlistener){
224 eventsQueue.erase(i);
225 deleted = true;
226 break;
227 }
228 }
229 }while(deleted);
230}
231
232void SystemQueue::QueueThread::threadFunc( QueueThread* obj ) {
233
234 boost::mutex::scoped_lock lock( obj->queueMutex );
235
236 while( obj->running ) {
237
238 // wait until an item is in the queue or we are notified
239 // to quit the thread. in case the thread is about to
240 // quit, the queueThreadRunning variable will indicate
241 // this and cause the thread to exit
242
243 while ( obj->running && obj->eventsQueue.empty() ){
244
245// const boost::system_time duration =
246// boost::get_system_time() +
247// boost::posix_time::milliseconds(100);
248// obj->itemsAvailable.timed_wait( lock, duration );
249
250 obj->itemsAvailable.wait( lock );
251 }
252
253 //
254 // work all the items that are currently in the queue
255 //
256
257 while( obj->running && (!obj->eventsQueue.empty()) ) {
258
259 // fetch the first item in the queue
260 // and deliver it to the queue handler
261 SystemEvent ev = obj->eventsQueue.front();
262 obj->eventsQueue.erase( obj->eventsQueue.begin() );
263
264 // call the queue and this will
265 // call the actual event handler
266 obj->queueMutex.unlock();
267 obj->onNextQueueItem( ev );
268 obj->queueMutex.lock();
269
270 } // !obj->eventsQueue.empty() )
271 } // while (obj->running)
272
273 logging_debug("system queue exited");
274}
275
276void SystemQueue::QueueThread::enter(){
277 queueMutex.lock();
278}
279
280void SystemQueue::QueueThread::leave(){
281 queueMutex.unlock();
282}
283
284
285//***************************************************************
286
287SystemQueue::QueueThreadDirect::QueueThreadDirect(){
288}
289
290SystemQueue::QueueThreadDirect::~QueueThreadDirect(){
291}
292
293void SystemQueue::QueueThreadDirect::onItemInserted( const SystemEvent& event ){
294 // do nothing here
295}
296
297void SystemQueue::QueueThreadDirect::onNextQueueItem( const SystemEvent& event ){
298 // directly deliver the item to the
299 event.getListener()->handleSystemEvent( event );
300}
301
302//***************************************************************
303
304SystemQueue::QueueThreadDelay::QueueThreadDelay(QueueThread* _transferQueue)
305 : QueueThread( _transferQueue ), isSleeping( false ) {
306
307 assert( _transferQueue != NULL );
308}
309
310SystemQueue::QueueThreadDelay::~QueueThreadDelay(){
311}
312
313void SystemQueue::QueueThreadDelay::onItemInserted( const SystemEvent& event ){
314
315 if( !isSleeping) return;
316
317 // break an existing sleep and
318 // remember the time that was actually slept for
319 // and change it for every event in the queue
320
321 assert( !eventsQueue.empty());
322 sleepCond.notify_all();
323
324 ptime sleepEnd = boost::posix_time::microsec_clock::local_time();
325 boost::posix_time::time_duration duration = sleepEnd - sleepStart;
326 uint32_t sleptTime = duration.total_milliseconds();
327
328 EventQueue::iterator i = eventsQueue.begin();
329 EventQueue::iterator iend = eventsQueue.end();
330
331 for( ; i != iend; i++ ) {
332
333 if( sleptTime >= i->remainingDelay)
334 i->remainingDelay = 0;
335 else
336 i->remainingDelay -= sleptTime;
337
338 } // for( ; i != iend; i++ )
339
340 // now we have to reorder the events
341 // in the queue with respect to their remaining delay
342 // the SystemQueue::operator< takes care of the
343 // ordering with respect to the remaining delay
344
345 std::sort( eventsQueue.begin(), eventsQueue.end() );
346
347}
348
349void SystemQueue::QueueThreadDelay::onNextQueueItem( const SystemEvent& event ){
350
351 // sleeps will be cancelled in the
352 // onItemInserted function when a new
353 // event arrives during sleeping
354
355 assert( !isSleeping );
356
357 // the given item is the one with the least
358 // amount of sleep time left. because all
359 // items are reordered in onItemInserted
360
361 if( event.remainingDelay > 0 ) {
362
363 const boost::system_time duration =
364 boost::get_system_time() +
365 boost::posix_time::milliseconds(event.remainingDelay);
366
367 {
368 boost::unique_lock<boost::mutex> lock( sleepMutex );
369
370 sleepStart = boost::posix_time::microsec_clock::local_time();
371 isSleeping = true;
372
373 sleepCond.timed_wait( lock, duration );
374
375 isSleeping = false;
376 }
377
378 } // if( event.remainingDelay > 0 )
379
380 // if the sleep succeeded and was not
381 // interrupted by a new incoming item
382 // we can now deliver this event
383
384 ptime sleepEnd = boost::posix_time::microsec_clock::local_time();
385 boost::posix_time::time_duration duration = sleepEnd - sleepStart;
386 uint32_t sleptTime = duration.total_milliseconds();
387
388 if (event.remainingDelay <= sleptTime)
389 transferQueue->insert( event, 0 );
390}
391
392#endif // #ifndef UNDERLAY_OMNET
393
394//***************************************************************
395
396}} // namespace ariba, utility
Note: See TracBrowser for help on using the repository browser.