source: source/ariba/communication/modules/transport/tcp/TCPTransport.cpp@ 5526

Last change on this file since 5526 was 5464, checked in by Christoph Mayer, 15 years ago

-periodic broadcast fix

File size: 9.1 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "TCPTransport.h"
40
41#define _NO_LOGGING
42
43// std includes
44#include <unistd.h>
45#include <iostream>
46#include <string>
47#include <sstream>
48
49// protlib includes
50#include "../protlib/network_message.h"
51#include "../protlib/tp_over_tcp.h"
52#include "../protlib/logfile.h"
53#include "../protlib/queuemanager.h"
54#include "../protlib/threadsafe_db.h"
55#include "../protlib/setuid.h"
56
57// spovnet includes
58#include "ariba/utility/serialization.h"
59#include "ariba/utility/system/SystemQueue.h"
60#include "ariba/utility/system/SystemEvent.h"
61#include "ariba/utility/system/SystemEventType.h"
62#include "ariba/communication/modules/network/ip/IPv4Locator.h"
63
64// protlib namespaces
65using namespace protlib;
66using namespace protlib::log;
67
68// spovnet namespaces
69using ariba::utility::SystemQueue;
70using ariba::utility::SystemEvent;
71using ariba::utility::SystemEventType;
72using ariba::utility::MessageProvider;
73using ariba::utility::TextMessage;
74using ariba::utility::MessageReceiver;
75using ariba::communication::IPv4Locator;
76
77using_serialization;
78
79logfile commonlog;
80protlib::log::logfile& protlib::log::DefaultLog(commonlog);
81
82#include "ariba/communication/modules/_namespace.h"
83NAMESPACE_BEGIN;
84
85SystemEventType TCPTransportEvent("TCPTransport");
86SystemEventType TCPMessageDispatchEvent("MessageDispatchEvent", TCPTransportEvent );
87SystemEventType TCPTransportTestEvent("Test", TCPTransportEvent );
88
89use_logging_cpp(TCPTransport);
90
91TCPTransport::TCPTransport( port_t port ) {
92 this->running = false;
93 this->done = false;
94 this->port = port;
95
96 logging_debug( "creating tcp transport module" );
97}
98
99TCPTransport::~TCPTransport() {
100 logging_debug( "deleting tcp transport module" );
101}
102
103void TCPTransport::start() {
104
105 logging_info( "starting tcp transport module ..." );
106
107 // initalize netdb and setuid
108 protlib::tsdb::init();
109 protlib::setuid::init();
110
111 /* set tcp parameters */
112 port_t port = this->port; // port
113 TPoverTCPParam tppar(4, TCPTransport::getMessageLength, port);
114
115 /* create receiver thread */
116 FastQueue* tpchecker_fq = new FastQueue("TCPTransport", true);
117 QueueManager::instance()->register_queue(tpchecker_fq,
118 message::qaddr_signaling);
119
120 /* start thread */
121 pthread_create( &tpreceivethread, NULL, TCPTransport::receiverThread, this );
122 tpthread = new ThreadStarter<TPoverTCP, TPoverTCPParam> ( 1, tppar );
123 tpthread->start_processing();
124
125 logging_info( "tcp transport module started" );
126}
127
128void TCPTransport::stop() {
129
130 logging_info( "stopping tcp transport module ..." );
131
132 // stop receiver thread
133 done = true;
134
135 // stop TPoverTCP
136 tpthread->stop_processing();
137 tpthread->abort_processing(true);
138 tpthread->wait_until_stopped();
139
140 // unregister TPoverTCP
141 QueueManager::instance()->unregister_queue( message::qaddr_signaling );
142
143 // destroy QueueManager
144 QueueManager::clear();
145
146 // de-initalize netdb and setuid
147 protlib::setuid::end();
148 protlib::tsdb::end();
149
150 logging_info( "tcp transport module stopped" );
151}
152
153bool TCPTransport::getMessageLength( NetMsg& m, uint32& clen_bytes ) {
154 clen_bytes = m.decode32();
155 m.set_pos_r(-4);
156 return true;
157}
158
159void* TCPTransport::receiverThread( void* ptp ) {
160
161 logging_info( "running tcp transport receiver thread" );
162
163 // get reference to transport object
164 TCPTransport& tp = *((TCPTransport*)ptp);
165
166 // get queue
167 FastQueue* fq =
168 QueueManager::instance()->get_queue(message::qaddr_signaling);
169
170 // main processing loop
171 tp.running = true;
172 while (!tp.done) {
173
174 // wait for new message to approach
175 message* msg = fq->dequeue_timedwait(300);
176
177 // handle message
178 if (msg) {
179
180 logging_debug( "Received incoming message" );
181
182 // handle transport message
183 TPMsg* tpmsg = dynamic_cast<TPMsg*> (msg);
184 if (tpmsg) {
185 // evaluate TP message
186 const appladdress* peer = static_cast<const appladdress*>( tpmsg->get_peeraddress() );
187 NetMsg* datamsg = tpmsg->get_message();
188 TPError* err = tpmsg->get_error();
189
190 // get data
191 if (datamsg) {
192
193 datamsg->set_pos(0);
194 uint32_t msgLength = datamsg->decode32(true);
195 uint16_t remotePort = datamsg->decode16(true);
196
197 // convert data
198 Data data(
199 (uint8_t*)(datamsg->get_buffer()+6),
200 (datamsg->get_size()-6)*8
201 );
202
203 // converting message
204 logging_debug( "converting message" );
205 Message* msg = new Message(data);
206
207 std::ostringstream o;
208 o << (peer->get_ip_str() + 7) << ":" << remotePort;
209 msg->setSourceAddress( new IPv4Locator(IPv4Locator::fromString(o.str())) );
210 logging_debug( "> source address = " << o.str() );
211 logging_debug( "> message = " << msg->toString() );
212
213 // dispatching message
214 logging_debug( "Dispatching message" );
215 SystemQueue::instance().scheduleEvent(
216 SystemEvent( &tp, TCPMessageDispatchEvent, msg )
217 );
218 }
219
220 // check error
221 if (err)
222 logging_error( "TCP transport error " + string(err->getstr()) );
223
224 logging_debug( "Message processed." );
225
226 tpmsg = NULL;
227 }
228 delete msg;
229 }
230 }
231
232 // clean queue & stop
233 fq->cleanup();
234 tp.running = false;
235 return NULL;
236}
237
238seqnum_t TCPTransport::sendMessage(const Message* message ) {
239
240 Data data = data_serialize( message );
241 //std::cout << "XXXXXXXXXXXXXsending out data using tcp transport: " << data << std::endl;
242
243
244 // prepare netmsg and send it
245 NetMsg* datamsg = new NetMsg(data.getLength()/8+6);
246 datamsg->encode32(data.getLength()/8+2, true);
247 datamsg->encode16(this->port,true);
248 for (int i=0; i<data.getLength()/8; i++)
249 datamsg->encode8(data.getBuffer()[i],true);
250
251 // send message
252 const IPv4Locator* address = dynamic_cast<const IPv4Locator*>(message->getDestinationAddress());
253 if( address == NULL) return 0;
254
255 logging_debug( "sending message of size " << data.getLength() <<
256 " to address " + address->toString() <<
257 ": " + message->toString() );
258 const_cast<Message*>(message)->dropPayload();
259
260 string s = address->toString();
261 string::size_type i = s.find(':');
262 string ip = address->toString().substr(0,i).c_str();
263 logging_debug( "ip= " << ip << " port=" << address->getPort() );
264
265 appladdress peer(ip.c_str(), "tcp", address->getPort() );
266 tpthread->get_thread_object()->send(datamsg, peer, false);
267
268 // release data
269 data.release();
270 logging_debug( "message sent!" );
271
272 return 0;
273}
274
275void TCPTransport::terminate(const NetworkLocator* local, const NetworkLocator* remote){
276
277 const IPv4Locator* address = dynamic_cast<const IPv4Locator*>(remote);
278 if( address == NULL) return;
279
280 string s = address->toString();
281 string::size_type i = s.find(':');
282 string ip = address->toString().substr(0,i).c_str();
283
284 appladdress peer( ip.c_str(), "tcp", address->getPort() );
285 peer.convert_to_ipv6();
286
287 tpthread->get_thread_object()->terminate( peer );
288}
289
290TransportLocator::prot_t TCPTransport::getId() {
291 return 6; // TCP
292}
293
294
295
296const vector<TransportLocator*> TCPTransport::getLocators() {
297 return vector<TransportLocator*>();
298}
299
300/* system event handler */
301void TCPTransport::handleSystemEvent( const SystemEvent& event ) {
302
303 // dispatch received messages
304 if ( event.getType() == TCPMessageDispatchEvent ){
305 logging_debug( "forwarding message to local receivers" );
306 Message* msg = event.getData<Message>();
307 MessageProvider::sendMessageToReceivers( msg );
308 msg->dropPayload();
309// delete msg->getSourceAddress();
310 delete msg;
311 }
312
313 if ( event.getType() == TCPTransportTestEvent ) {
314
315 // add listener
316 addMessageReceiver( new MessageReceiver() );
317
318 // send message
319 //cout << "Sending message ..." << endl;
320 sendMessage( new TextMessage( "Hello World!" ) );
321 //cout << "Message sent ..." << endl;
322
323 }
324}
325
326NAMESPACE_END;
Note: See TracBrowser for help on using the repository browser.