source: source/ariba/utility/transport/tcpip/tcpip.cpp@ 9320

Last change on this file since 9320 was 8609, checked in by Christoph Mayer, 14 years ago

-memleaks

File size: 5.1 KB
Line 
1#include "tcpip.hpp"
2
3#define _NO_LOGGING
4
5// std includes
6#include <unistd.h>
7#include <iostream>
8#include <string>
9#include <sstream>
10#include <boost/foreach.hpp>
11
12// protlib includes
13#include "protlib/network_message.h"
14#include "protlib/tp_over_tcp.h"
15#include "protlib/tperror.h"
16#include "protlib/logfile.h"
17#include "protlib/queuemanager.h"
18#include "protlib/threadsafe_db.h"
19#include "protlib/setuid.h"
20
21// protlib namespaces
22using namespace protlib;
23using namespace protlib::log;
24
25logfile commonlog;
26protlib::log::logfile& protlib::log::DefaultLog(commonlog);
27
28namespace ariba {
29namespace transport {
30
31using namespace ariba::addressing;
32
33
34tcpip_endpoint convert( const appladdress* addr ) {
35 const char* ip_str = addr->get_ip_str();
36 tcpip_endpoint endpoint( std::string(ip_str), addr->get_port() );
37 return endpoint;
38}
39
40appladdress convert( const tcpip_endpoint& endpoint ) {
41 tcpip_endpoint* e = const_cast<tcpip_endpoint*>(&endpoint);
42 appladdress
43 peer(e->address().to_string().c_str(), "tcp", e->port().asio() );
44// cout << endpoint.to_string() << " to " << peer.get_ip_str() << ":" << peer.get_port() << endl;
45 return peer;
46}
47
48tcpip::tcpip( uint16_t port ) {
49 this->done = false;
50 this->running = false;
51 this->port = port;
52 this->listener = NULL;
53}
54
55tcpip::~tcpip() {
56 if (running) stop();
57}
58
59bool get_message_length( NetMsg& m, uint32& clen_bytes ) {
60 clen_bytes = m.decode32();
61 m.set_pos_r(-4);
62 return true;
63}
64
65void tcpip::start() {
66 done = false;
67 running = false;
68
69 // initalize netdb and setuid
70 protlib::tsdb::init();
71 protlib::setuid::init();
72
73 // set tcp parameters
74 port_t port = this->port; // port
75 TPoverTCPParam tppar(4, get_message_length, port);
76
77 // create receiver thread
78 FastQueue* tpchecker_fq = new FastQueue("TCPTransport", true);
79 QueueManager::instance()->register_queue(tpchecker_fq,
80 message::qaddr_signaling);
81
82 // start thread
83 pthread_create( &tpreceivethread, NULL, tcpip::receiverThread, this );
84 tpthread = new ThreadStarter<TPoverTCP, TPoverTCPParam> ( 1, tppar );
85 tpthread->start_processing();
86}
87
88void tcpip::stop() {
89 // stop receiver thread
90 done = true;
91
92 // stop TPoverTCP
93 tpthread->stop_processing();
94 tpthread->abort_processing(true);
95 tpthread->wait_until_stopped();
96
97 // unregister TPoverTCP
98 delete QueueManager::instance()->get_queue( message::qaddr_signaling );
99 QueueManager::instance()->unregister_queue( message::qaddr_signaling );
100
101 // destroy QueueManager
102 QueueManager::clear();
103
104 // de-initalize netdb and setuid
105 protlib::setuid::end();
106 protlib::tsdb::end();
107
108 // wait for thread to finish and delete
109 pthread_join(tpreceivethread, NULL);
110}
111
112void tcpip::send( const address_v* remote, const uint8_t* data, size_t size ) {
113
114 // prepare netmsg with length and and port
115 NetMsg* datamsg = new NetMsg(size + 6);
116 datamsg->encode32( size + 2, true );
117 datamsg->encode16( this->port,true );
118
119 for (size_t i=0; i<size; i++)
120 datamsg->encode8( data[i],true );
121
122 // send message
123 tcpip_endpoint endpoint = *remote;
124 appladdress peer = convert(endpoint);
125
126 // add to output queue
127 tpthread->get_thread_object()->send( datamsg, peer, false );
128}
129
130void tcpip::send( const endpoint_set& endpoints, const uint8_t* data, size_t size ) {
131 // send a message to each combination of ip-address and port
132 BOOST_FOREACH( const ip_address ip, endpoints.ip ) {
133 BOOST_FOREACH( const tcp_port_address port, endpoints.tcp ) {
134 tcpip_endpoint endpoint(ip,port);
135 address_vf vf = endpoint;
136 send(vf,data,size);
137 }
138 }
139}
140
141void tcpip::terminate( const address_v* remote) {
142 tcpip_endpoint endpoint = *remote;
143 appladdress peer = convert(endpoint);
144 peer.convert_to_ipv6();
145 tpthread->get_thread_object()->terminate( peer );
146}
147
148void tcpip::register_listener( transport_listener* listener ) {
149 this->listener = listener;
150}
151
152void* tcpip::receiverThread( void* ptp ) {
153 // get reference to transport object
154 tcpip& tp = *((tcpip*)ptp);
155
156 // get queue
157 FastQueue* fq =
158 QueueManager::instance()->get_queue(message::qaddr_signaling);
159
160 // main processing loop
161 tp.running = true;
162 while (!tp.done) {
163
164 // wait for new message to approach
165 message* msg = fq->dequeue_timedwait(300);
166
167 // message has arrived? no-> continue
168 if (!msg) continue;
169
170 // handle transport message
171 TPMsg* tpmsg = dynamic_cast<TPMsg*> (msg);
172 if (!tpmsg) {
173 delete msg;
174 continue;
175 }
176
177 // get address & message
178 const appladdress* remote_peer = static_cast<const appladdress*>( tpmsg->get_peeraddress() );
179 const appladdress* local_peer = static_cast<const appladdress*>( tpmsg->get_ownaddress() );
180 NetMsg* datamsg = tpmsg->get_message();
181
182 // not a data message? -> continue!
183 if (!datamsg) {
184 delete tpmsg;
185 continue;
186 }
187
188 // get length and remote endpoint port
189 datamsg->set_pos(0);
190 uint32_t message_size = datamsg->decode32(true)-2;
191 //uint16_t remote_port = datamsg->decode16(true);
192
193 // inform listener
194 if (tp.listener != NULL) {
195 tcpip_endpoint remote = convert(remote_peer);
196 tcpip_endpoint local = convert(local_peer);
197 tp.listener->receive_message(
198 &tp, local, remote, datamsg->get_buffer()+6, message_size );
199 }
200
201 tpmsg->set_message(NULL);
202 delete datamsg;
203 delete tpmsg;
204 }
205 // clean queue & stop
206 fq->cleanup();
207 tp.running = false;
208 return NULL;
209}
210
211}} // namespace ariba::transport
Note: See TracBrowser for help on using the repository browser.