An Overlay-based
Virtual Network Substrate
SpoVNet

source: source/ariba/communication/modules/transport/tcp/TCPTransport.cpp @ 3690

Last change on this file since 3690 was 3690, checked in by mies, 14 years ago

Merged 20090512-mies-connectors changes r3472:r3689 into trunk.

File size: 9.0 KB
Line 
1// [License]
2// The Ariba-Underlay Copyright
3//
4// Copyright (c) 2008-2009, Institute of Telematics, UniversitÀt Karlsruhe (TH)
5//
6// Institute of Telematics
7// UniversitÀt Karlsruhe (TH)
8// Zirkel 2, 76128 Karlsruhe
9// Germany
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR
25// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32//
33// The views and conclusions contained in the software and documentation
34// are those of the authors and should not be interpreted as representing
35// official policies, either expressed or implied, of the Institute of
36// Telematics.
37// [License]
38
39#include "TCPTransport.h"
40
41#define _NO_LOGGING
42
43// std includes
44#include <unistd.h>
45#include <iostream>
46#include <string>
47#include <sstream>
48
49// protlib includes
50#include "../protlib/network_message.h"
51#include "../protlib/tp_over_tcp.h"
52#include "../protlib/logfile.h"
53#include "../protlib/queuemanager.h"
54#include "../protlib/threadsafe_db.h"
55#include "../protlib/setuid.h"
56
57// spovnet includes
58#include "ariba/utility/serialization.h"
59#include "ariba/utility/system/SystemQueue.h"
60#include "ariba/utility/system/SystemEvent.h"
61#include "ariba/utility/system/SystemEventType.h"
62#include "ariba/communication/modules/network/ip/IPv4Locator.h"
63
64// protlib namespaces
65using namespace protlib;
66using namespace protlib::log;
67
68// spovnet namespaces
69using ariba::utility::SystemQueue;
70using ariba::utility::SystemEvent;
71using ariba::utility::SystemEventType;
72using ariba::utility::MessageProvider;
73using ariba::utility::TextMessage;
74using ariba::utility::MessageReceiver;
75using ariba::communication::IPv4Locator;
76
77using_serialization;
78
79logfile commonlog;
80protlib::log::logfile& protlib::log::DefaultLog(commonlog);
81
82#include "ariba/communication/modules/_namespace.h"
83NAMESPACE_BEGIN;
84
85SystemEventType TCPTransportEvent("TCPTransport");
86SystemEventType TCPMessageDispatchEvent("MessageDispatchEvent", TCPTransportEvent );
87SystemEventType TCPTransportTestEvent("Test", TCPTransportEvent );
88
89use_logging_cpp(TCPTransport);
90
91TCPTransport::TCPTransport( port_t port ) {
92        this->running = false;
93        this->done = false;
94        this->port = port;
95
96        logging_debug( "creating tcp transport module" );
97}
98
99TCPTransport::~TCPTransport() {
100        logging_debug( "deleting tcp transport module" );
101}
102
103void TCPTransport::start() {
104
105        logging_info( "starting tcp transport module ..." );
106
107        // initalize netdb and setuid
108        protlib::tsdb::init();
109        protlib::setuid::init();
110
111        /* set tcp parameters */
112        port_t port = this->port; // port
113        TPoverTCPParam tppar(4, TCPTransport::getMessageLength, port);
114
115        /* create receiver thread */
116        FastQueue* tpchecker_fq = new FastQueue("TCPTransport", true);
117        QueueManager::instance()->register_queue(tpchecker_fq,
118                        message::qaddr_signaling);
119
120        /* start thread */
121        pthread_create( &tpreceivethread, NULL, TCPTransport::receiverThread, this );
122        tpthread = new ThreadStarter<TPoverTCP, TPoverTCPParam> ( 1, tppar );
123        tpthread->start_processing();
124
125        logging_info( "tcp transport module started" );
126}
127
128void TCPTransport::stop() {
129
130        logging_info( "stopping tcp transport module ..." );
131
132        // stop receiver thread
133        done = true;
134
135        // stop TPoverTCP
136        tpthread->stop_processing();
137        tpthread->abort_processing(true);
138        tpthread->wait_until_stopped();
139
140        // unregister TPoverTCP
141        QueueManager::instance()->unregister_queue( message::qaddr_signaling );
142
143        // destroy QueueManager
144        QueueManager::clear();
145
146        // de-initalize netdb and setuid
147        protlib::setuid::end();
148        protlib::tsdb::end();
149
150        logging_info( "tcp transport module stopped" );
151}
152
153bool TCPTransport::getMessageLength( NetMsg& m, uint32& clen_bytes ) {
154        clen_bytes = m.decode32();
155        m.set_pos_r(-4);
156        return true;
157}
158
159void* TCPTransport::receiverThread( void* ptp ) {
160
161        logging_info( "running tcp transport receiver thread" );
162
163        // get reference to transport object
164        TCPTransport& tp = *((TCPTransport*)ptp);
165
166        // get queue
167        FastQueue* fq =
168                QueueManager::instance()->get_queue(message::qaddr_signaling);
169
170        // main processing loop
171        tp.running = true;
172        while (!tp.done) {
173
174                // wait for new message to approach
175                message* msg = fq->dequeue_timedwait(300);
176
177                // handle message
178                if (msg) {
179
180                        logging_debug( "Received incoming message" );
181
182                        // handle transport message
183                        TPMsg* tpmsg = dynamic_cast<TPMsg*> (msg);
184                        if (tpmsg) {
185                                // evaluate TP message
186                                const appladdress* peer = static_cast<const appladdress*>( tpmsg->get_peeraddress() );
187                                NetMsg* datamsg = tpmsg->get_message();
188                                TPError* err = tpmsg->get_error();
189
190                                // get data
191                                if (datamsg) {
192
193                                        datamsg->set_pos(0);
194                                        uint32_t msgLength = datamsg->decode32(true);
195                                        uint16_t remotePort = datamsg->decode16(true);
196
197                                        // convert data
198                                        Data data(
199                                                (uint8_t*)(datamsg->get_buffer()+6),
200                                                (datamsg->get_size()-6)*8
201                                        );
202
203                                        // converting message
204                                        logging_debug( "Converting message" );
205                                        Message* msg = new Message(data);
206                                        std::ostringstream o;
207                                        o << (peer->get_ip_str() + 7) << ":" << remotePort;
208                                        msg->setSourceAddress( new IPv4Locator(IPv4Locator::fromString(o.str())) );
209                                        logging_debug( "> source address = " << o.str() );
210                                        logging_debug( "> message = " << msg->toString() );
211
212                                        // dispatching message
213                                        logging_debug( "Dispatching message" );
214                                        SystemQueue::instance().scheduleEvent(
215                                                SystemEvent( &tp, TCPMessageDispatchEvent, msg )
216                                        );
217                                }
218
219                                // check error
220                                if (err)
221                                        logging_error( "TCP transport error " + string(err->getstr()) );
222
223                                logging_debug( "Message processed." );
224
225                                tpmsg = NULL;
226                        }
227                        delete msg;
228                }
229        }
230
231        // clean queue & stop
232        fq->cleanup();
233        tp.running = false;
234        return NULL;
235}
236
237seqnum_t TCPTransport::sendMessage(const Message* message ) {
238
239        Data data = data_serialize( message );
240        const_cast<Message*>(message)->dropPayload();
241
242        // prepare netmsg and send it
243        NetMsg* datamsg = new NetMsg(data.getLength()/8+6);
244        datamsg->encode32(data.getLength()/8+2, true);
245        datamsg->encode16(this->port,true);
246        for (int i=0; i<data.getLength()/8; i++)
247                datamsg->encode8(data.getBuffer()[i],true);
248
249        // send message
250        const IPv4Locator* address = dynamic_cast<const IPv4Locator*>(message->getDestinationAddress());
251        if( address == NULL) return 0;
252
253        logging_debug( "sending message of size " << data.getLength() <<
254                        " to address " + address->toString() <<
255                        ": " + message->toString() );
256
257        string s = address->toString();
258        string::size_type i = s.find(':');
259        string ip = address->toString().substr(0,i).c_str();
260        logging_debug( "ip= " << ip << " port=" << address->getPort() );
261
262        appladdress peer(ip.c_str(), "tcp", address->getPort() );
263        tpthread->get_thread_object()->send(datamsg, peer, false);
264
265        // release data
266        data.release();
267        logging_debug( "message sent!" );
268
269        return 0;
270}
271
272void TCPTransport::terminate(const NetworkLocator* local, const NetworkLocator* remote){
273
274        const IPv4Locator* address = dynamic_cast<const IPv4Locator*>(remote);
275        if( address == NULL) return;
276
277        string s = address->toString();
278        string::size_type i = s.find(':');
279        string ip = address->toString().substr(0,i).c_str();
280
281        appladdress peer( ip.c_str(), "tcp", address->getPort() );
282        peer.convert_to_ipv6();
283
284        tpthread->get_thread_object()->terminate( peer );
285}
286
287TransportLocator::prot_t TCPTransport::getId() {
288        return 6; // TCP
289}
290
291
292
293const vector<TransportLocator*> TCPTransport::getLocators() {
294        return vector<TransportLocator*>();
295}
296
297/* system event handler */
298void TCPTransport::handleSystemEvent( const SystemEvent& event ) {
299
300        // dispatch received messages
301        if ( event.getType() == TCPMessageDispatchEvent ){
302                logging_debug( "forwarding message to local receivers" );
303                Message* msg = event.getData<Message>();
304                MessageProvider::sendMessageToReceivers( msg );
305                msg->dropPayload();
306//              delete msg->getSourceAddress();
307                delete msg;
308        }
309
310        if ( event.getType() == TCPTransportTestEvent ) {
311
312                // add listener
313                addMessageReceiver( new MessageReceiver() );
314
315                // send message
316                //cout << "Sending message ..." << endl;
317                sendMessage( new TextMessage( "Hello World!" ) );
318                //cout << "Message sent ..." << endl;
319
320        }
321}
322
323NAMESPACE_END;
Note: See TracBrowser for help on using the repository browser.