1 | // [License]
|
---|
2 | // The Ariba-Underlay Copyright
|
---|
3 | //
|
---|
4 | // Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH)
|
---|
5 | //
|
---|
6 | // Institute of Telematics
|
---|
7 | // Universität Karlsruhe (TH)
|
---|
8 | // Zirkel 2, 76128 Karlsruhe
|
---|
9 | // Germany
|
---|
10 | //
|
---|
11 | // Redistribution and use in source and binary forms, with or without
|
---|
12 | // modification, are permitted provided that the following conditions are
|
---|
13 | // met:
|
---|
14 | //
|
---|
15 | // 1. Redistributions of source code must retain the above copyright
|
---|
16 | // notice, this list of conditions and the following disclaimer.
|
---|
17 | // 2. Redistributions in binary form must reproduce the above copyright
|
---|
18 | // notice, this list of conditions and the following disclaimer in the
|
---|
19 | // documentation and/or other materials provided with the distribution.
|
---|
20 | //
|
---|
21 | // THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
|
---|
22 | // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
---|
23 | // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
---|
24 | // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
|
---|
25 | // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
|
---|
26 | // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
|
---|
27 | // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
|
---|
28 | // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
|
---|
29 | // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
|
---|
30 | // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
|
---|
31 | // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
---|
32 | //
|
---|
33 | // The views and conclusions contained in the software and documentation
|
---|
34 | // are those of the authors and should not be interpreted as representing
|
---|
35 | // official policies, either expressed or implied, of the Institute of
|
---|
36 | // Telematics.
|
---|
37 | // [License]
|
---|
38 |
|
---|
39 | #include "TCPTransport.h"
|
---|
40 |
|
---|
41 | #define _NO_LOGGING
|
---|
42 |
|
---|
43 | // std includes
|
---|
44 | #include <unistd.h>
|
---|
45 | #include <iostream>
|
---|
46 | #include <string>
|
---|
47 | #include <sstream>
|
---|
48 |
|
---|
49 | // protlib includes
|
---|
50 | #include "../protlib/network_message.h"
|
---|
51 | #include "../protlib/tp_over_tcp.h"
|
---|
52 | #include "../protlib/logfile.h"
|
---|
53 | #include "../protlib/queuemanager.h"
|
---|
54 | #include "../protlib/threadsafe_db.h"
|
---|
55 | #include "../protlib/setuid.h"
|
---|
56 |
|
---|
57 | // spovnet includes
|
---|
58 | #include "ariba/utility/serialization.h"
|
---|
59 | #include "ariba/utility/system/SystemQueue.h"
|
---|
60 | #include "ariba/utility/system/SystemEvent.h"
|
---|
61 | #include "ariba/utility/system/SystemEventType.h"
|
---|
62 | #include "ariba/communication/modules/network/ip/IPv4Locator.h"
|
---|
63 |
|
---|
64 | // protlib namespaces
|
---|
65 | using namespace protlib;
|
---|
66 | using namespace protlib::log;
|
---|
67 |
|
---|
68 | // spovnet namespaces
|
---|
69 | using ariba::utility::SystemQueue;
|
---|
70 | using ariba::utility::SystemEvent;
|
---|
71 | using ariba::utility::SystemEventType;
|
---|
72 | using ariba::utility::MessageProvider;
|
---|
73 | using ariba::utility::TextMessage;
|
---|
74 | using ariba::utility::MessageReceiver;
|
---|
75 | using ariba::communication::IPv4Locator;
|
---|
76 |
|
---|
77 | using_serialization;
|
---|
78 |
|
---|
// File-scope logfile object. The protlib::log::DefaultLog reference is defined
// right below and bound to this object.
79 | logfile commonlog;
|
---|
80 | protlib::log::logfile& protlib::log::DefaultLog(commonlog);
|
---|
81 |
|
---|
82 | #include "ariba/communication/modules/_namespace.h"
|
---|
83 | NAMESPACE_BEGIN;
|
---|
84 |
|
---|
// System event types used by this module.
// TCPMessageDispatchEvent is scheduled by receiverThread() for every received
// message and handled in handleSystemEvent(); TCPTransportTestEvent selects the
// test branch of handleSystemEvent().
// NOTE(review): the second constructor argument is presumably the parent event
// type -- confirm against SystemEventType.
85 | SystemEventType TCPTransportEvent("TCPTransport");
|
---|
86 | SystemEventType TCPMessageDispatchEvent("MessageDispatchEvent", TCPTransportEvent );
|
---|
87 | SystemEventType TCPTransportTestEvent("Test", TCPTransportEvent );
|
---|
88 |
|
---|
89 | use_logging_cpp(TCPTransport);
|
---|
90 |
|
---|
91 | TCPTransport::TCPTransport( port_t port ) {
|
---|
92 | this->running = false;
|
---|
93 | this->done = false;
|
---|
94 | this->port = port;
|
---|
95 |
|
---|
96 | logging_debug( "creating tcp transport module" );
|
---|
97 | }
|
---|
98 |
|
---|
99 | TCPTransport::~TCPTransport() {
|
---|
100 | logging_debug( "deleting tcp transport module" );
|
---|
101 | }
|
---|
102 |
|
---|
103 | void TCPTransport::start() {
|
---|
104 |
|
---|
105 | logging_info( "starting tcp transport module ..." );
|
---|
106 |
|
---|
107 | // initalize netdb and setuid
|
---|
108 | protlib::tsdb::init();
|
---|
109 | protlib::setuid::init();
|
---|
110 |
|
---|
111 | /* set tcp parameters */
|
---|
112 | port_t port = this->port; // port
|
---|
113 | TPoverTCPParam tppar(4, TCPTransport::getMessageLength, port);
|
---|
114 |
|
---|
115 | /* create receiver thread */
|
---|
116 | FastQueue* tpchecker_fq = new FastQueue("TCPTransport", true);
|
---|
117 | QueueManager::instance()->register_queue(tpchecker_fq,
|
---|
118 | message::qaddr_signaling);
|
---|
119 |
|
---|
120 | /* start thread */
|
---|
121 | pthread_create( &tpreceivethread, NULL, TCPTransport::receiverThread, this );
|
---|
122 | tpthread = new ThreadStarter<TPoverTCP, TPoverTCPParam> ( 1, tppar );
|
---|
123 | tpthread->start_processing();
|
---|
124 |
|
---|
125 | logging_info( "tcp transport module started" );
|
---|
126 | }
|
---|
127 |
|
---|
128 | void TCPTransport::stop() {
|
---|
129 |
|
---|
130 | logging_info( "stopping tcp transport module ..." );
|
---|
131 |
|
---|
132 | // stop receiver thread
|
---|
133 | done = true;
|
---|
134 |
|
---|
135 | // stop TPoverTCP
|
---|
136 | tpthread->stop_processing();
|
---|
137 | tpthread->abort_processing(true);
|
---|
138 | tpthread->wait_until_stopped();
|
---|
139 |
|
---|
140 | // unregister TPoverTCP
|
---|
141 | QueueManager::instance()->unregister_queue( message::qaddr_signaling );
|
---|
142 |
|
---|
143 | // destroy QueueManager
|
---|
144 | QueueManager::clear();
|
---|
145 |
|
---|
146 | // de-initalize netdb and setuid
|
---|
147 | protlib::setuid::end();
|
---|
148 | protlib::tsdb::end();
|
---|
149 |
|
---|
150 | logging_info( "tcp transport module stopped" );
|
---|
151 | }
|
---|
152 |
|
---|
153 | bool TCPTransport::getMessageLength( NetMsg& m, uint32& clen_bytes ) {
|
---|
154 | clen_bytes = m.decode32();
|
---|
155 | m.set_pos_r(-4);
|
---|
156 | return true;
|
---|
157 | }
|
---|
158 |
|
---|
159 | void* TCPTransport::receiverThread( void* ptp ) {
|
---|
160 |
|
---|
161 | logging_info( "running tcp transport receiver thread" );
|
---|
162 |
|
---|
163 | // get reference to transport object
|
---|
164 | TCPTransport& tp = *((TCPTransport*)ptp);
|
---|
165 |
|
---|
166 | // get queue
|
---|
167 | FastQueue* fq =
|
---|
168 | QueueManager::instance()->get_queue(message::qaddr_signaling);
|
---|
169 |
|
---|
170 | // main processing loop
|
---|
171 | tp.running = true;
|
---|
172 | while (!tp.done) {
|
---|
173 |
|
---|
174 | // wait for new message to approach
|
---|
175 | message* msg = fq->dequeue_timedwait(300);
|
---|
176 |
|
---|
177 | // handle message
|
---|
178 | if (msg) {
|
---|
179 |
|
---|
180 | logging_debug( "Received incoming message" );
|
---|
181 |
|
---|
182 | // handle transport message
|
---|
183 | TPMsg* tpmsg = dynamic_cast<TPMsg*> (msg);
|
---|
184 | if (tpmsg) {
|
---|
185 | // evaluate TP message
|
---|
186 | const appladdress* peer = static_cast<const appladdress*>( tpmsg->get_peeraddress() );
|
---|
187 | NetMsg* datamsg = tpmsg->get_message();
|
---|
188 | TPError* err = tpmsg->get_error();
|
---|
189 |
|
---|
190 | // get data
|
---|
191 | if (datamsg) {
|
---|
192 |
|
---|
193 | datamsg->set_pos(0);
|
---|
194 | uint32_t msgLength = datamsg->decode32(true);
|
---|
195 | uint16_t remotePort = datamsg->decode16(true);
|
---|
196 |
|
---|
197 | // convert data
|
---|
198 | Data data(
|
---|
199 | (uint8_t*)(datamsg->get_buffer()+6),
|
---|
200 | (datamsg->get_size()-6)*8
|
---|
201 | );
|
---|
202 |
|
---|
203 | // converting message
|
---|
204 | logging_debug( "converting message" );
|
---|
205 | //std::cout << "XXXXXXXXXXXXreceived data on tcp transport: " << data << std::endl;
|
---|
206 | Message* msg = new Message(data);
|
---|
207 |
|
---|
208 | std::ostringstream o;
|
---|
209 | o << (peer->get_ip_str() + 7) << ":" << remotePort;
|
---|
210 | msg->setSourceAddress( new IPv4Locator(IPv4Locator::fromString(o.str())) );
|
---|
211 | logging_debug( "> source address = " << o.str() );
|
---|
212 | logging_debug( "> message = " << msg->toString() );
|
---|
213 |
|
---|
214 | // dispatching message
|
---|
215 | logging_debug( "Dispatching message" );
|
---|
216 | SystemQueue::instance().scheduleEvent(
|
---|
217 | SystemEvent( &tp, TCPMessageDispatchEvent, msg )
|
---|
218 | );
|
---|
219 | }
|
---|
220 |
|
---|
221 | // check error
|
---|
222 | if (err)
|
---|
223 | logging_error( "TCP transport error " + string(err->getstr()) );
|
---|
224 |
|
---|
225 | logging_debug( "Message processed." );
|
---|
226 |
|
---|
227 | tpmsg = NULL;
|
---|
228 | }
|
---|
229 | delete msg;
|
---|
230 | }
|
---|
231 | }
|
---|
232 |
|
---|
233 | // clean queue & stop
|
---|
234 | fq->cleanup();
|
---|
235 | tp.running = false;
|
---|
236 | return NULL;
|
---|
237 | }
|
---|
238 |
|
---|
239 | seqnum_t TCPTransport::sendMessage(const Message* message ) {
|
---|
240 |
|
---|
241 | Data data = data_serialize( message );
|
---|
242 | //std::cout << "XXXXXXXXXXXXXsending out data using tcp transport: " << data << std::endl;
|
---|
243 |
|
---|
244 | const_cast<Message*>(message)->dropPayload();
|
---|
245 |
|
---|
246 | // prepare netmsg and send it
|
---|
247 | NetMsg* datamsg = new NetMsg(data.getLength()/8+6);
|
---|
248 | datamsg->encode32(data.getLength()/8+2, true);
|
---|
249 | datamsg->encode16(this->port,true);
|
---|
250 | for (int i=0; i<data.getLength()/8; i++)
|
---|
251 | datamsg->encode8(data.getBuffer()[i],true);
|
---|
252 |
|
---|
253 | // send message
|
---|
254 | const IPv4Locator* address = dynamic_cast<const IPv4Locator*>(message->getDestinationAddress());
|
---|
255 | if( address == NULL) return 0;
|
---|
256 |
|
---|
257 | logging_debug( "sending message of size " << data.getLength() <<
|
---|
258 | " to address " + address->toString() <<
|
---|
259 | ": " + message->toString() );
|
---|
260 |
|
---|
261 | string s = address->toString();
|
---|
262 | string::size_type i = s.find(':');
|
---|
263 | string ip = address->toString().substr(0,i).c_str();
|
---|
264 | logging_debug( "ip= " << ip << " port=" << address->getPort() );
|
---|
265 |
|
---|
266 | appladdress peer(ip.c_str(), "tcp", address->getPort() );
|
---|
267 | tpthread->get_thread_object()->send(datamsg, peer, false);
|
---|
268 |
|
---|
269 | // release data
|
---|
270 | data.release();
|
---|
271 | logging_debug( "message sent!" );
|
---|
272 |
|
---|
273 | return 0;
|
---|
274 | }
|
---|
275 |
|
---|
276 | void TCPTransport::terminate(const NetworkLocator* local, const NetworkLocator* remote){
|
---|
277 |
|
---|
278 | const IPv4Locator* address = dynamic_cast<const IPv4Locator*>(remote);
|
---|
279 | if( address == NULL) return;
|
---|
280 |
|
---|
281 | string s = address->toString();
|
---|
282 | string::size_type i = s.find(':');
|
---|
283 | string ip = address->toString().substr(0,i).c_str();
|
---|
284 |
|
---|
285 | appladdress peer( ip.c_str(), "tcp", address->getPort() );
|
---|
286 | peer.convert_to_ipv6();
|
---|
287 |
|
---|
288 | tpthread->get_thread_object()->terminate( peer );
|
---|
289 | }
|
---|
290 |
|
---|
291 | TransportLocator::prot_t TCPTransport::getId() {
|
---|
292 | return 6; // TCP
|
---|
293 | }
|
---|
294 |
|
---|
295 |
|
---|
296 |
|
---|
297 | const vector<TransportLocator*> TCPTransport::getLocators() {
|
---|
298 | return vector<TransportLocator*>();
|
---|
299 | }
|
---|
300 |
|
---|
301 | /* system event handler */
|
---|
302 | void TCPTransport::handleSystemEvent( const SystemEvent& event ) {
|
---|
303 |
|
---|
304 | // dispatch received messages
|
---|
305 | if ( event.getType() == TCPMessageDispatchEvent ){
|
---|
306 | logging_debug( "forwarding message to local receivers" );
|
---|
307 | Message* msg = event.getData<Message>();
|
---|
308 | MessageProvider::sendMessageToReceivers( msg );
|
---|
309 | msg->dropPayload();
|
---|
310 | // delete msg->getSourceAddress();
|
---|
311 | delete msg;
|
---|
312 | }
|
---|
313 |
|
---|
314 | if ( event.getType() == TCPTransportTestEvent ) {
|
---|
315 |
|
---|
316 | // add listener
|
---|
317 | addMessageReceiver( new MessageReceiver() );
|
---|
318 |
|
---|
319 | // send message
|
---|
320 | //cout << "Sending message ..." << endl;
|
---|
321 | sendMessage( new TextMessage( "Hello World!" ) );
|
---|
322 | //cout << "Message sent ..." << endl;
|
---|
323 |
|
---|
324 | }
|
---|
325 | }
|
---|
326 |
|
---|
327 | NAMESPACE_END;
|
---|