close Warning: Can't use blame annotator:
No changeset 2259 in the repository

source: source/ariba/communication/modules/transport/tcp/TCPTransport.cpp@ 5403

Last change on this file since 5403 was 5151, checked in by Christoph Mayer, 15 years ago

begin merge back from relay branch

File size: 9.2 KB
RevLine 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
5//
6// Institute of Telematics
7// Universität Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "TCPTransport.h"
40
41#define _NO_LOGGING
42
43// std includes
44#include <unistd.h>
45#include <iostream>
46#include <string>
47#include <sstream>
48
49// protlib includes
50#include "../protlib/network_message.h"
51#include "../protlib/tp_over_tcp.h"
52#include "../protlib/logfile.h"
53#include "../protlib/queuemanager.h"
54#include "../protlib/threadsafe_db.h"
55#include "../protlib/setuid.h"
56
57// spovnet includes
58#include "ariba/utility/serialization.h"
59#include "ariba/utility/system/SystemQueue.h"
60#include "ariba/utility/system/SystemEvent.h"
61#include "ariba/utility/system/SystemEventType.h"
62#include "ariba/communication/modules/network/ip/IPv4Locator.h"
63
64// protlib namespaces
65using namespace protlib;
66using namespace protlib::log;
67
68// spovnet namespaces
69using ariba::utility::SystemQueue;
70using ariba::utility::SystemEvent;
71using ariba::utility::SystemEventType;
72using ariba::utility::MessageProvider;
73using ariba::utility::TextMessage;
74using ariba::utility::MessageReceiver;
75using ariba::communication::IPv4Locator;
76
77using_serialization;
78
// Log file object defined by this translation unit. The protlib DefaultLog
// reference defined right below is bound to it.
79logfile commonlog;
80protlib::log::logfile& protlib::log::DefaultLog(commonlog);
81
82#include "ariba/communication/modules/_namespace.h"
83NAMESPACE_BEGIN;
84
// System event types used by this module. TCPTransportEvent is passed as the
// second constructor argument of the two types below (presumably making it
// their parent type -- confirm against SystemEventType).
// TCPMessageDispatchEvent is scheduled by receiverThread() and consumed in
// handleSystemEvent(); TCPTransportTestEvent is only consumed there.
85SystemEventType TCPTransportEvent("TCPTransport");
86SystemEventType TCPMessageDispatchEvent("MessageDispatchEvent", TCPTransportEvent );
87SystemEventType TCPTransportTestEvent("Test", TCPTransportEvent );
88
// Logging setup macro for the TCPTransport class (defined outside this file).
89use_logging_cpp(TCPTransport);
90
91TCPTransport::TCPTransport( port_t port ) {
92 this->running = false;
93 this->done = false;
94 this->port = port;
95
96 logging_debug( "creating tcp transport module" );
97}
98
99TCPTransport::~TCPTransport() {
100 logging_debug( "deleting tcp transport module" );
101}
102
103void TCPTransport::start() {
104
105 logging_info( "starting tcp transport module ..." );
106
107 // initalize netdb and setuid
108 protlib::tsdb::init();
109 protlib::setuid::init();
110
111 /* set tcp parameters */
112 port_t port = this->port; // port
113 TPoverTCPParam tppar(4, TCPTransport::getMessageLength, port);
114
115 /* create receiver thread */
116 FastQueue* tpchecker_fq = new FastQueue("TCPTransport", true);
117 QueueManager::instance()->register_queue(tpchecker_fq,
118 message::qaddr_signaling);
119
120 /* start thread */
121 pthread_create( &tpreceivethread, NULL, TCPTransport::receiverThread, this );
122 tpthread = new ThreadStarter<TPoverTCP, TPoverTCPParam> ( 1, tppar );
123 tpthread->start_processing();
124
125 logging_info( "tcp transport module started" );
126}
127
128void TCPTransport::stop() {
129
130 logging_info( "stopping tcp transport module ..." );
131
132 // stop receiver thread
133 done = true;
134
135 // stop TPoverTCP
136 tpthread->stop_processing();
137 tpthread->abort_processing(true);
138 tpthread->wait_until_stopped();
139
140 // unregister TPoverTCP
141 QueueManager::instance()->unregister_queue( message::qaddr_signaling );
142
143 // destroy QueueManager
144 QueueManager::clear();
145
146 // de-initalize netdb and setuid
147 protlib::setuid::end();
148 protlib::tsdb::end();
149
150 logging_info( "tcp transport module stopped" );
151}
152
153bool TCPTransport::getMessageLength( NetMsg& m, uint32& clen_bytes ) {
154 clen_bytes = m.decode32();
155 m.set_pos_r(-4);
156 return true;
157}
158
159void* TCPTransport::receiverThread( void* ptp ) {
160
161 logging_info( "running tcp transport receiver thread" );
162
163 // get reference to transport object
164 TCPTransport& tp = *((TCPTransport*)ptp);
165
166 // get queue
167 FastQueue* fq =
168 QueueManager::instance()->get_queue(message::qaddr_signaling);
169
170 // main processing loop
171 tp.running = true;
172 while (!tp.done) {
173
174 // wait for new message to approach
175 message* msg = fq->dequeue_timedwait(300);
176
177 // handle message
178 if (msg) {
179
180 logging_debug( "Received incoming message" );
181
182 // handle transport message
183 TPMsg* tpmsg = dynamic_cast<TPMsg*> (msg);
184 if (tpmsg) {
185 // evaluate TP message
186 const appladdress* peer = static_cast<const appladdress*>( tpmsg->get_peeraddress() );
187 NetMsg* datamsg = tpmsg->get_message();
188 TPError* err = tpmsg->get_error();
189
190 // get data
191 if (datamsg) {
192
193 datamsg->set_pos(0);
194 uint32_t msgLength = datamsg->decode32(true);
195 uint16_t remotePort = datamsg->decode16(true);
196
197 // convert data
198 Data data(
199 (uint8_t*)(datamsg->get_buffer()+6),
200 (datamsg->get_size()-6)*8
201 );
202
203 // converting message
204 logging_debug( "converting message" );
205 //std::cout << "XXXXXXXXXXXXreceived data on tcp transport: " << data << std::endl;
206 Message* msg = new Message(data);
207
208 std::ostringstream o;
209 o << (peer->get_ip_str() + 7) << ":" << remotePort;
210 msg->setSourceAddress( new IPv4Locator(IPv4Locator::fromString(o.str())) );
211 logging_debug( "> source address = " << o.str() );
212 logging_debug( "> message = " << msg->toString() );
213
214 // dispatching message
215 logging_debug( "Dispatching message" );
216 SystemQueue::instance().scheduleEvent(
217 SystemEvent( &tp, TCPMessageDispatchEvent, msg )
218 );
219 }
220
221 // check error
222 if (err)
223 logging_error( "TCP transport error " + string(err->getstr()) );
224
225 logging_debug( "Message processed." );
226
227 tpmsg = NULL;
228 }
229 delete msg;
230 }
231 }
232
233 // clean queue & stop
234 fq->cleanup();
235 tp.running = false;
236 return NULL;
237}
238
239seqnum_t TCPTransport::sendMessage(const Message* message ) {
240
241 Data data = data_serialize( message );
242 //std::cout << "XXXXXXXXXXXXXsending out data using tcp transport: " << data << std::endl;
243
244
245 // prepare netmsg and send it
246 NetMsg* datamsg = new NetMsg(data.getLength()/8+6);
247 datamsg->encode32(data.getLength()/8+2, true);
248 datamsg->encode16(this->port,true);
249 for (int i=0; i<data.getLength()/8; i++)
250 datamsg->encode8(data.getBuffer()[i],true);
251
252 // send message
253 const IPv4Locator* address = dynamic_cast<const IPv4Locator*>(message->getDestinationAddress());
254 if( address == NULL) return 0;
255
256 logging_debug( "sending message of size " << data.getLength() <<
257 " to address " + address->toString() <<
258 ": " + message->toString() );
259 const_cast<Message*>(message)->dropPayload();
260
261 string s = address->toString();
262 string::size_type i = s.find(':');
263 string ip = address->toString().substr(0,i).c_str();
264 logging_debug( "ip= " << ip << " port=" << address->getPort() );
265
266 appladdress peer(ip.c_str(), "tcp", address->getPort() );
267 tpthread->get_thread_object()->send(datamsg, peer, false);
268
269 // release data
270 data.release();
271 logging_debug( "message sent!" );
272
273 return 0;
274}
275
276void TCPTransport::terminate(const NetworkLocator* local, const NetworkLocator* remote){
277
278 const IPv4Locator* address = dynamic_cast<const IPv4Locator*>(remote);
279 if( address == NULL) return;
280
281 string s = address->toString();
282 string::size_type i = s.find(':');
283 string ip = address->toString().substr(0,i).c_str();
284
285 appladdress peer( ip.c_str(), "tcp", address->getPort() );
286 peer.convert_to_ipv6();
287
288 tpthread->get_thread_object()->terminate( peer );
289}
290
291TransportLocator::prot_t TCPTransport::getId() {
292 return 6; // TCP
293}
294
295
296
297const vector<TransportLocator*> TCPTransport::getLocators() {
298 return vector<TransportLocator*>();
299}
300
301/* system event handler */
302void TCPTransport::handleSystemEvent( const SystemEvent& event ) {
303
304 // dispatch received messages
305 if ( event.getType() == TCPMessageDispatchEvent ){
306 logging_debug( "forwarding message to local receivers" );
307 Message* msg = event.getData<Message>();
308 MessageProvider::sendMessageToReceivers( msg );
309 msg->dropPayload();
310// delete msg->getSourceAddress();
311 delete msg;
312 }
313
314 if ( event.getType() == TCPTransportTestEvent ) {
315
316 // add listener
317 addMessageReceiver( new MessageReceiver() );
318
319 // send message
320 //cout << "Sending message ..." << endl;
321 sendMessage( new TextMessage( "Hello World!" ) );
322 //cout << "Message sent ..." << endl;
323
324 }
325}
326
327NAMESPACE_END;
Note: See TracBrowser for help on using the repository browser.