close Warning: Can't use blame annotator:
No changeset 2259 in the repository

source: source/ariba/communication/modules/transport/tcp/TCPTransport.cpp@ 4591

Last change on this file since 4591 was 3690, checked in by mies, 16 years ago

Merged 20090512-mies-connectors changes r3472:r3689 into trunk.

File size: 9.0 KB
RevLine 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
5//
6// Institute of Telematics
7// Universität Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "TCPTransport.h"
40
41#define _NO_LOGGING
42
43// std includes
44#include <unistd.h>
45#include <iostream>
46#include <string>
47#include <sstream>
48
49// protlib includes
50#include "../protlib/network_message.h"
51#include "../protlib/tp_over_tcp.h"
52#include "../protlib/logfile.h"
53#include "../protlib/queuemanager.h"
54#include "../protlib/threadsafe_db.h"
55#include "../protlib/setuid.h"
56
57// spovnet includes
58#include "ariba/utility/serialization.h"
59#include "ariba/utility/system/SystemQueue.h"
60#include "ariba/utility/system/SystemEvent.h"
61#include "ariba/utility/system/SystemEventType.h"
62#include "ariba/communication/modules/network/ip/IPv4Locator.h"
63
64// protlib namespaces
65using namespace protlib;
66using namespace protlib::log;
67
68// spovnet namespaces
69using ariba::utility::SystemQueue;
70using ariba::utility::SystemEvent;
71using ariba::utility::SystemEventType;
72using ariba::utility::MessageProvider;
73using ariba::utility::TextMessage;
74using ariba::utility::MessageReceiver;
75using ariba::communication::IPv4Locator;
76
77using_serialization;
78
// Log file object defined at file scope; DefaultLog below is bound to it.
79logfile commonlog;
// Definition of protlib::log::DefaultLog as a reference to commonlog.
80protlib::log::logfile& protlib::log::DefaultLog(commonlog);
81
82#include "ariba/communication/modules/_namespace.h"
83NAMESPACE_BEGIN;
84
// Event type objects used by this file when scheduling and handling SystemEvents.
// TCPTransportEvent is passed as the second constructor argument of the two
// types below (presumably registering it as their parent type -- confirm in
// SystemEventType).
85SystemEventType TCPTransportEvent("TCPTransport");
// Scheduled by receiverThread() for every received message and consumed in
// handleSystemEvent().
86SystemEventType TCPMessageDispatchEvent("MessageDispatchEvent", TCPTransportEvent );
// Handled in handleSystemEvent() by adding a receiver and sending a test message.
87SystemEventType TCPTransportTestEvent("Test", TCPTransportEvent );
88
// NOTE(review): presumably sets up the logger behind the logging_* macros used
// by TCPTransport in this file -- confirm against the logging header.
89use_logging_cpp(TCPTransport);
90
91TCPTransport::TCPTransport( port_t port ) {
92 this->running = false;
93 this->done = false;
94 this->port = port;
95
96 logging_debug( "creating tcp transport module" );
97}
98
99TCPTransport::~TCPTransport() {
100 logging_debug( "deleting tcp transport module" );
101}
102
103void TCPTransport::start() {
104
105 logging_info( "starting tcp transport module ..." );
106
107 // initalize netdb and setuid
108 protlib::tsdb::init();
109 protlib::setuid::init();
110
111 /* set tcp parameters */
112 port_t port = this->port; // port
113 TPoverTCPParam tppar(4, TCPTransport::getMessageLength, port);
114
115 /* create receiver thread */
116 FastQueue* tpchecker_fq = new FastQueue("TCPTransport", true);
117 QueueManager::instance()->register_queue(tpchecker_fq,
118 message::qaddr_signaling);
119
120 /* start thread */
121 pthread_create( &tpreceivethread, NULL, TCPTransport::receiverThread, this );
122 tpthread = new ThreadStarter<TPoverTCP, TPoverTCPParam> ( 1, tppar );
123 tpthread->start_processing();
124
125 logging_info( "tcp transport module started" );
126}
127
128void TCPTransport::stop() {
129
130 logging_info( "stopping tcp transport module ..." );
131
132 // stop receiver thread
133 done = true;
134
135 // stop TPoverTCP
136 tpthread->stop_processing();
137 tpthread->abort_processing(true);
138 tpthread->wait_until_stopped();
139
140 // unregister TPoverTCP
141 QueueManager::instance()->unregister_queue( message::qaddr_signaling );
142
143 // destroy QueueManager
144 QueueManager::clear();
145
146 // de-initalize netdb and setuid
147 protlib::setuid::end();
148 protlib::tsdb::end();
149
150 logging_info( "tcp transport module stopped" );
151}
152
153bool TCPTransport::getMessageLength( NetMsg& m, uint32& clen_bytes ) {
154 clen_bytes = m.decode32();
155 m.set_pos_r(-4);
156 return true;
157}
158
159void* TCPTransport::receiverThread( void* ptp ) {
160
161 logging_info( "running tcp transport receiver thread" );
162
163 // get reference to transport object
164 TCPTransport& tp = *((TCPTransport*)ptp);
165
166 // get queue
167 FastQueue* fq =
168 QueueManager::instance()->get_queue(message::qaddr_signaling);
169
170 // main processing loop
171 tp.running = true;
172 while (!tp.done) {
173
174 // wait for new message to approach
175 message* msg = fq->dequeue_timedwait(300);
176
177 // handle message
178 if (msg) {
179
180 logging_debug( "Received incoming message" );
181
182 // handle transport message
183 TPMsg* tpmsg = dynamic_cast<TPMsg*> (msg);
184 if (tpmsg) {
185 // evaluate TP message
186 const appladdress* peer = static_cast<const appladdress*>( tpmsg->get_peeraddress() );
187 NetMsg* datamsg = tpmsg->get_message();
188 TPError* err = tpmsg->get_error();
189
190 // get data
191 if (datamsg) {
192
193 datamsg->set_pos(0);
194 uint32_t msgLength = datamsg->decode32(true);
195 uint16_t remotePort = datamsg->decode16(true);
196
197 // convert data
198 Data data(
199 (uint8_t*)(datamsg->get_buffer()+6),
200 (datamsg->get_size()-6)*8
201 );
202
203 // converting message
204 logging_debug( "Converting message" );
205 Message* msg = new Message(data);
206 std::ostringstream o;
207 o << (peer->get_ip_str() + 7) << ":" << remotePort;
208 msg->setSourceAddress( new IPv4Locator(IPv4Locator::fromString(o.str())) );
209 logging_debug( "> source address = " << o.str() );
210 logging_debug( "> message = " << msg->toString() );
211
212 // dispatching message
213 logging_debug( "Dispatching message" );
214 SystemQueue::instance().scheduleEvent(
215 SystemEvent( &tp, TCPMessageDispatchEvent, msg )
216 );
217 }
218
219 // check error
220 if (err)
221 logging_error( "TCP transport error " + string(err->getstr()) );
222
223 logging_debug( "Message processed." );
224
225 tpmsg = NULL;
226 }
227 delete msg;
228 }
229 }
230
231 // clean queue & stop
232 fq->cleanup();
233 tp.running = false;
234 return NULL;
235}
236
237seqnum_t TCPTransport::sendMessage(const Message* message ) {
238
239 Data data = data_serialize( message );
240 const_cast<Message*>(message)->dropPayload();
241
242 // prepare netmsg and send it
243 NetMsg* datamsg = new NetMsg(data.getLength()/8+6);
244 datamsg->encode32(data.getLength()/8+2, true);
245 datamsg->encode16(this->port,true);
246 for (int i=0; i<data.getLength()/8; i++)
247 datamsg->encode8(data.getBuffer()[i],true);
248
249 // send message
250 const IPv4Locator* address = dynamic_cast<const IPv4Locator*>(message->getDestinationAddress());
251 if( address == NULL) return 0;
252
253 logging_debug( "sending message of size " << data.getLength() <<
254 " to address " + address->toString() <<
255 ": " + message->toString() );
256
257 string s = address->toString();
258 string::size_type i = s.find(':');
259 string ip = address->toString().substr(0,i).c_str();
260 logging_debug( "ip= " << ip << " port=" << address->getPort() );
261
262 appladdress peer(ip.c_str(), "tcp", address->getPort() );
263 tpthread->get_thread_object()->send(datamsg, peer, false);
264
265 // release data
266 data.release();
267 logging_debug( "message sent!" );
268
269 return 0;
270}
271
272void TCPTransport::terminate(const NetworkLocator* local, const NetworkLocator* remote){
273
274 const IPv4Locator* address = dynamic_cast<const IPv4Locator*>(remote);
275 if( address == NULL) return;
276
277 string s = address->toString();
278 string::size_type i = s.find(':');
279 string ip = address->toString().substr(0,i).c_str();
280
281 appladdress peer( ip.c_str(), "tcp", address->getPort() );
282 peer.convert_to_ipv6();
283
284 tpthread->get_thread_object()->terminate( peer );
285}
286
287TransportLocator::prot_t TCPTransport::getId() {
288 return 6; // TCP
289}
290
291
292
293const vector<TransportLocator*> TCPTransport::getLocators() {
294 return vector<TransportLocator*>();
295}
296
297/* system event handler */
298void TCPTransport::handleSystemEvent( const SystemEvent& event ) {
299
300 // dispatch received messages
301 if ( event.getType() == TCPMessageDispatchEvent ){
302 logging_debug( "forwarding message to local receivers" );
303 Message* msg = event.getData<Message>();
304 MessageProvider::sendMessageToReceivers( msg );
305 msg->dropPayload();
306// delete msg->getSourceAddress();
307 delete msg;
308 }
309
310 if ( event.getType() == TCPTransportTestEvent ) {
311
312 // add listener
313 addMessageReceiver( new MessageReceiver() );
314
315 // send message
316 //cout << "Sending message ..." << endl;
317 sendMessage( new TextMessage( "Hello World!" ) );
318 //cout << "Message sent ..." << endl;
319
320 }
321}
322
323NAMESPACE_END;
Note: See TracBrowser for help on using the repository browser.