An Overlay-based
Virtual Network Substrate
SpoVNet

source: source/ariba/utility/transport/tcpip/tcpip.cpp @ 8609

Last change on this file since 8609 was 8609, checked in by Christoph Mayer, 13 years ago

-memleaks

File size: 5.1 KB
Line 
1#include "tcpip.hpp"
2
3#define _NO_LOGGING
4
5// std includes
6#include <unistd.h>
7#include <iostream>
8#include <string>
9#include <sstream>
10#include <boost/foreach.hpp>
11
12// protlib includes
13#include "protlib/network_message.h"
14#include "protlib/tp_over_tcp.h"
15#include "protlib/tperror.h"
16#include "protlib/logfile.h"
17#include "protlib/queuemanager.h"
18#include "protlib/threadsafe_db.h"
19#include "protlib/setuid.h"
20
21// protlib namespaces
22using namespace protlib;
23using namespace protlib::log;
24
25logfile commonlog;
26protlib::log::logfile& protlib::log::DefaultLog(commonlog);
27
28namespace ariba {
29namespace transport {
30
31using namespace ariba::addressing;
32
33
34tcpip_endpoint convert( const appladdress* addr ) {
35        const char* ip_str = addr->get_ip_str();
36        tcpip_endpoint endpoint( std::string(ip_str), addr->get_port() );
37        return endpoint;
38}
39
40appladdress convert( const tcpip_endpoint& endpoint ) {
41        tcpip_endpoint* e = const_cast<tcpip_endpoint*>(&endpoint);
42        appladdress
43                peer(e->address().to_string().c_str(), "tcp", e->port().asio() );
44//      cout << endpoint.to_string() << " to " << peer.get_ip_str() << ":" << peer.get_port() << endl;
45        return peer;
46}
47
48tcpip::tcpip( uint16_t port ) {
49        this->done = false;
50        this->running = false;
51        this->port = port;
52        this->listener = NULL;
53}
54
55tcpip::~tcpip() {
56        if (running) stop();
57}
58
59bool get_message_length( NetMsg& m, uint32& clen_bytes ) {
60        clen_bytes = m.decode32();
61        m.set_pos_r(-4);
62        return true;
63}
64
65void tcpip::start() {
66        done = false;
67        running = false;
68
69        // initalize netdb and setuid
70        protlib::tsdb::init();
71        protlib::setuid::init();
72
73        // set tcp parameters
74        port_t port = this->port; // port
75        TPoverTCPParam tppar(4, get_message_length, port);
76
77        // create receiver thread
78        FastQueue* tpchecker_fq = new FastQueue("TCPTransport", true);
79        QueueManager::instance()->register_queue(tpchecker_fq,
80                        message::qaddr_signaling);
81
82        // start thread
83        pthread_create( &tpreceivethread, NULL, tcpip::receiverThread, this );
84        tpthread = new ThreadStarter<TPoverTCP, TPoverTCPParam> ( 1, tppar );
85        tpthread->start_processing();
86}
87
88void tcpip::stop() {
89        // stop receiver thread
90        done = true;
91
92        // stop TPoverTCP
93        tpthread->stop_processing();
94        tpthread->abort_processing(true);
95        tpthread->wait_until_stopped();
96
97        // unregister TPoverTCP
98        delete QueueManager::instance()->get_queue( message::qaddr_signaling );
99        QueueManager::instance()->unregister_queue( message::qaddr_signaling );
100
101        // destroy QueueManager
102        QueueManager::clear();
103
104        // de-initalize netdb and setuid
105        protlib::setuid::end();
106        protlib::tsdb::end();
107
108        // wait for thread to finish and delete
109        pthread_join(tpreceivethread, NULL);
110}
111
112void tcpip::send( const address_v* remote, const uint8_t* data, size_t size ) {
113
114        // prepare netmsg with length and and port
115        NetMsg* datamsg = new NetMsg(size + 6);
116        datamsg->encode32( size + 2,  true );
117        datamsg->encode16( this->port,true );
118
119        for (size_t i=0; i<size; i++)
120                datamsg->encode8( data[i],true );
121
122        // send message
123        tcpip_endpoint endpoint = *remote;
124        appladdress peer = convert(endpoint);
125
126        // add to output queue
127        tpthread->get_thread_object()->send( datamsg, peer, false );
128}
129
130void tcpip::send( const endpoint_set& endpoints, const uint8_t* data, size_t size ) {
131        // send a message to each combination of ip-address and port
132        BOOST_FOREACH( const ip_address ip, endpoints.ip ) {
133                BOOST_FOREACH( const tcp_port_address port, endpoints.tcp ) {
134                        tcpip_endpoint endpoint(ip,port);
135                        address_vf vf = endpoint;
136                        send(vf,data,size);
137                }
138        }
139}
140
141void tcpip::terminate( const address_v* remote) {
142        tcpip_endpoint endpoint = *remote;
143        appladdress peer = convert(endpoint);
144        peer.convert_to_ipv6();
145        tpthread->get_thread_object()->terminate( peer );
146}
147
148void tcpip::register_listener( transport_listener* listener ) {
149        this->listener = listener;
150}
151
152void* tcpip::receiverThread( void* ptp ) {
153        // get reference to transport object
154        tcpip& tp = *((tcpip*)ptp);
155
156        // get queue
157        FastQueue* fq =
158                QueueManager::instance()->get_queue(message::qaddr_signaling);
159
160        // main processing loop
161        tp.running = true;
162        while (!tp.done) {
163
164                // wait for new message to approach
165                message* msg = fq->dequeue_timedwait(300);
166
167                // message has arrived? no-> continue
168                if (!msg) continue;
169
170                // handle transport message
171                TPMsg* tpmsg = dynamic_cast<TPMsg*> (msg);
172                if (!tpmsg) {
173                        delete msg;
174                        continue;
175                }
176
177                // get address & message
178                const appladdress* remote_peer = static_cast<const appladdress*>( tpmsg->get_peeraddress() );
179                const appladdress* local_peer  = static_cast<const appladdress*>( tpmsg->get_ownaddress() );
180                NetMsg* datamsg = tpmsg->get_message();
181
182                // not a data message? -> continue!
183                if (!datamsg) {
184                        delete tpmsg;
185                        continue;
186                }
187
188                // get length and remote endpoint port
189                datamsg->set_pos(0);
190                uint32_t message_size = datamsg->decode32(true)-2;
191                //uint16_t remote_port = datamsg->decode16(true);
192
193                // inform listener
194                if (tp.listener != NULL) {
195                        tcpip_endpoint remote = convert(remote_peer);
196                        tcpip_endpoint local  = convert(local_peer);
197                        tp.listener->receive_message(
198                                        &tp, local, remote, datamsg->get_buffer()+6, message_size );
199                }
200
201                tpmsg->set_message(NULL);
202                delete datamsg;
203                delete tpmsg;
204        }
205        // clean queue & stop
206        fq->cleanup();
207        tp.running = false;
208        return NULL;
209}
210
211}} // namespace ariba::transport
Note: See TracBrowser for help on using the repository browser.