close Warning: Can't use blame annotator:
No changeset 2259 in the repository

source: source/ariba/communication/modules/transport/tcp/TCPTransport.cpp@ 4606

Last change on this file since 4606 was 4598, checked in by Christoph Mayer, 15 years ago

tcp transport log output

File size: 9.2 KB
RevLine 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "TCPTransport.h"
40
41#define _NO_LOGGING
42
43// std includes
44#include <unistd.h>
45#include <iostream>
46#include <string>
47#include <sstream>
48
49// protlib includes
50#include "../protlib/network_message.h"
51#include "../protlib/tp_over_tcp.h"
52#include "../protlib/logfile.h"
53#include "../protlib/queuemanager.h"
54#include "../protlib/threadsafe_db.h"
55#include "../protlib/setuid.h"
56
57// spovnet includes
58#include "ariba/utility/serialization.h"
59#include "ariba/utility/system/SystemQueue.h"
60#include "ariba/utility/system/SystemEvent.h"
61#include "ariba/utility/system/SystemEventType.h"
62#include "ariba/communication/modules/network/ip/IPv4Locator.h"
63
64// protlib namespaces
65using namespace protlib;
66using namespace protlib::log;
67
68// spovnet namespaces
69using ariba::utility::SystemQueue;
70using ariba::utility::SystemEvent;
71using ariba::utility::SystemEventType;
72using ariba::utility::MessageProvider;
73using ariba::utility::TextMessage;
74using ariba::utility::MessageReceiver;
75using ariba::communication::IPv4Locator;
76
77using_serialization;
78
79logfile commonlog;
80protlib::log::logfile& protlib::log::DefaultLog(commonlog);
81
82#include "ariba/communication/modules/_namespace.h"
83NAMESPACE_BEGIN;
84
85SystemEventType TCPTransportEvent("TCPTransport");
86SystemEventType TCPMessageDispatchEvent("MessageDispatchEvent", TCPTransportEvent );
87SystemEventType TCPTransportTestEvent("Test", TCPTransportEvent );
88
89use_logging_cpp(TCPTransport);
90
91TCPTransport::TCPTransport( port_t port ) {
92 this->running = false;
93 this->done = false;
94 this->port = port;
95
96 logging_debug( "creating tcp transport module" );
97}
98
99TCPTransport::~TCPTransport() {
100 logging_debug( "deleting tcp transport module" );
101}
102
103void TCPTransport::start() {
104
105 logging_info( "starting tcp transport module ..." );
106
107 // initalize netdb and setuid
108 protlib::tsdb::init();
109 protlib::setuid::init();
110
111 /* set tcp parameters */
112 port_t port = this->port; // port
113 TPoverTCPParam tppar(4, TCPTransport::getMessageLength, port);
114
115 /* create receiver thread */
116 FastQueue* tpchecker_fq = new FastQueue("TCPTransport", true);
117 QueueManager::instance()->register_queue(tpchecker_fq,
118 message::qaddr_signaling);
119
120 /* start thread */
121 pthread_create( &tpreceivethread, NULL, TCPTransport::receiverThread, this );
122 tpthread = new ThreadStarter<TPoverTCP, TPoverTCPParam> ( 1, tppar );
123 tpthread->start_processing();
124
125 logging_info( "tcp transport module started" );
126}
127
128void TCPTransport::stop() {
129
130 logging_info( "stopping tcp transport module ..." );
131
132 // stop receiver thread
133 done = true;
134
135 // stop TPoverTCP
136 tpthread->stop_processing();
137 tpthread->abort_processing(true);
138 tpthread->wait_until_stopped();
139
140 // unregister TPoverTCP
141 QueueManager::instance()->unregister_queue( message::qaddr_signaling );
142
143 // destroy QueueManager
144 QueueManager::clear();
145
146 // de-initalize netdb and setuid
147 protlib::setuid::end();
148 protlib::tsdb::end();
149
150 logging_info( "tcp transport module stopped" );
151}
152
153bool TCPTransport::getMessageLength( NetMsg& m, uint32& clen_bytes ) {
154 clen_bytes = m.decode32();
155 m.set_pos_r(-4);
156 return true;
157}
158
159void* TCPTransport::receiverThread( void* ptp ) {
160
161 logging_info( "running tcp transport receiver thread" );
162
163 // get reference to transport object
164 TCPTransport& tp = *((TCPTransport*)ptp);
165
166 // get queue
167 FastQueue* fq =
168 QueueManager::instance()->get_queue(message::qaddr_signaling);
169
170 // main processing loop
171 tp.running = true;
172 while (!tp.done) {
173
174 // wait for new message to approach
175 message* msg = fq->dequeue_timedwait(300);
176
177 // handle message
178 if (msg) {
179
180 logging_debug( "Received incoming message" );
181
182 // handle transport message
183 TPMsg* tpmsg = dynamic_cast<TPMsg*> (msg);
184 if (tpmsg) {
185 // evaluate TP message
186 const appladdress* peer = static_cast<const appladdress*>( tpmsg->get_peeraddress() );
187 NetMsg* datamsg = tpmsg->get_message();
188 TPError* err = tpmsg->get_error();
189
190 // get data
191 if (datamsg) {
192
193 datamsg->set_pos(0);
194 uint32_t msgLength = datamsg->decode32(true);
195 uint16_t remotePort = datamsg->decode16(true);
196
197 // convert data
198 Data data(
199 (uint8_t*)(datamsg->get_buffer()+6),
200 (datamsg->get_size()-6)*8
201 );
202
203 // converting message
204 logging_debug( "converting message" );
205
206
207 std::cout << "XXXXXXXXXXXXreceived data on tcp transport: " << data << std::endl;
208
209
210 Message* msg = new Message(data);
211 std::ostringstream o;
212 o << (peer->get_ip_str() + 7) << ":" << remotePort;
213 msg->setSourceAddress( new IPv4Locator(IPv4Locator::fromString(o.str())) );
214 logging_debug( "> source address = " << o.str() );
215 logging_debug( "> message = " << msg->toString() );
216
217 // dispatching message
218 logging_debug( "Dispatching message" );
219 SystemQueue::instance().scheduleEvent(
220 SystemEvent( &tp, TCPMessageDispatchEvent, msg )
221 );
222 }
223
224 // check error
225 if (err)
226 logging_error( "TCP transport error " + string(err->getstr()) );
227
228 logging_debug( "Message processed." );
229
230 tpmsg = NULL;
231 }
232 delete msg;
233 }
234 }
235
236 // clean queue & stop
237 fq->cleanup();
238 tp.running = false;
239 return NULL;
240}
241
242seqnum_t TCPTransport::sendMessage(const Message* message ) {
243
244 Data data = data_serialize( message );
245
246
247 std::cout << "XXXXXXXXXXXXXsending out data using tcp transport: " << data << std::endl;
248
249
250
251
252 const_cast<Message*>(message)->dropPayload();
253
254 // prepare netmsg and send it
255 NetMsg* datamsg = new NetMsg(data.getLength()/8+6);
256 datamsg->encode32(data.getLength()/8+2, true);
257 datamsg->encode16(this->port,true);
258 for (int i=0; i<data.getLength()/8; i++)
259 datamsg->encode8(data.getBuffer()[i],true);
260
261 // send message
262 const IPv4Locator* address = dynamic_cast<const IPv4Locator*>(message->getDestinationAddress());
263 if( address == NULL) return 0;
264
265 logging_debug( "sending message of size " << data.getLength() <<
266 " to address " + address->toString() <<
267 ": " + message->toString() );
268
269 string s = address->toString();
270 string::size_type i = s.find(':');
271 string ip = address->toString().substr(0,i).c_str();
272 logging_debug( "ip= " << ip << " port=" << address->getPort() );
273
274 appladdress peer(ip.c_str(), "tcp", address->getPort() );
275 tpthread->get_thread_object()->send(datamsg, peer, false);
276
277 // release data
278 data.release();
279 logging_debug( "message sent!" );
280
281 return 0;
282}
283
284void TCPTransport::terminate(const NetworkLocator* local, const NetworkLocator* remote){
285
286 const IPv4Locator* address = dynamic_cast<const IPv4Locator*>(remote);
287 if( address == NULL) return;
288
289 string s = address->toString();
290 string::size_type i = s.find(':');
291 string ip = address->toString().substr(0,i).c_str();
292
293 appladdress peer( ip.c_str(), "tcp", address->getPort() );
294 peer.convert_to_ipv6();
295
296 tpthread->get_thread_object()->terminate( peer );
297}
298
299TransportLocator::prot_t TCPTransport::getId() {
300 return 6; // TCP
301}
302
303
304
305const vector<TransportLocator*> TCPTransport::getLocators() {
306 return vector<TransportLocator*>();
307}
308
309/* system event handler */
310void TCPTransport::handleSystemEvent( const SystemEvent& event ) {
311
312 // dispatch received messages
313 if ( event.getType() == TCPMessageDispatchEvent ){
314 logging_debug( "forwarding message to local receivers" );
315 Message* msg = event.getData<Message>();
316 MessageProvider::sendMessageToReceivers( msg );
317 msg->dropPayload();
318// delete msg->getSourceAddress();
319 delete msg;
320 }
321
322 if ( event.getType() == TCPTransportTestEvent ) {
323
324 // add listener
325 addMessageReceiver( new MessageReceiver() );
326
327 // send message
328 //cout << "Sending message ..." << endl;
329 sendMessage( new TextMessage( "Hello World!" ) );
330 //cout << "Message sent ..." << endl;
331
332 }
333}
334
335NAMESPACE_END;
Note: See TracBrowser for help on using the repository browser.