source: source/ariba/utility/transport/tcpip/tcpip.cpp@ 9745

Last change on this file since 9745 was 9323, checked in by bless, 14 years ago
  • constructor now uses initializers
File size: 5.2 KB
RevLine 
[5284]1#include "tcpip.hpp"
2
3#define _NO_LOGGING
4
5// std includes
6#include <unistd.h>
7#include <iostream>
8#include <string>
9#include <sstream>
10#include <boost/foreach.hpp>
11
12// protlib includes
13#include "protlib/network_message.h"
14#include "protlib/tp_over_tcp.h"
15#include "protlib/tperror.h"
16#include "protlib/logfile.h"
17#include "protlib/queuemanager.h"
18#include "protlib/threadsafe_db.h"
19#include "protlib/setuid.h"
20
21// protlib namespaces
22using namespace protlib;
23using namespace protlib::log;
24
// Global protlib log object. protlib's DefaultLog reference is defined on the
// next line and bound to this object.
25logfile commonlog;
26protlib::log::logfile& protlib::log::DefaultLog(commonlog);
27
28namespace ariba {
29namespace transport {
30
31using namespace ariba::addressing;
32
33
34tcpip_endpoint convert( const appladdress* addr ) {
35 const char* ip_str = addr->get_ip_str();
36 tcpip_endpoint endpoint( std::string(ip_str), addr->get_port() );
37 return endpoint;
38}
39
40appladdress convert( const tcpip_endpoint& endpoint ) {
41 tcpip_endpoint* e = const_cast<tcpip_endpoint*>(&endpoint);
42 appladdress
43 peer(e->address().to_string().c_str(), "tcp", e->port().asio() );
44// cout << endpoint.to_string() << " to " << peer.get_ip_str() << ":" << peer.get_port() << endl;
45 return peer;
46}
47
[9323]48tcpip::tcpip( uint16_t port ) :
49 done ( false ),
50 running ( false ),
51 port( port ),
52 tpreceivethread ( NULL ),
53 tpthread ( NULL ),
54 listener ( NULL ) {
[5284]55}
56
57tcpip::~tcpip() {
[5718]58 if (running) stop();
[5284]59}
60
61bool get_message_length( NetMsg& m, uint32& clen_bytes ) {
62 clen_bytes = m.decode32();
63 m.set_pos_r(-4);
64 return true;
65}
66
67void tcpip::start() {
68 done = false;
69 running = false;
70
71 // initalize netdb and setuid
72 protlib::tsdb::init();
73 protlib::setuid::init();
74
75 // set tcp parameters
76 port_t port = this->port; // port
77 TPoverTCPParam tppar(4, get_message_length, port);
78
79 // create receiver thread
80 FastQueue* tpchecker_fq = new FastQueue("TCPTransport", true);
81 QueueManager::instance()->register_queue(tpchecker_fq,
82 message::qaddr_signaling);
83
84 // start thread
85 pthread_create( &tpreceivethread, NULL, tcpip::receiverThread, this );
86 tpthread = new ThreadStarter<TPoverTCP, TPoverTCPParam> ( 1, tppar );
87 tpthread->start_processing();
88}
89
90void tcpip::stop() {
91 // stop receiver thread
92 done = true;
93
94 // stop TPoverTCP
95 tpthread->stop_processing();
96 tpthread->abort_processing(true);
97 tpthread->wait_until_stopped();
98
99 // unregister TPoverTCP
[8606]100 delete QueueManager::instance()->get_queue( message::qaddr_signaling );
[5284]101 QueueManager::instance()->unregister_queue( message::qaddr_signaling );
102
103 // destroy QueueManager
104 QueueManager::clear();
105
106 // de-initalize netdb and setuid
107 protlib::setuid::end();
108 protlib::tsdb::end();
[8606]109
110 // wait for thread to finish and delete
111 pthread_join(tpreceivethread, NULL);
[5284]112}
113
114void tcpip::send( const address_v* remote, const uint8_t* data, size_t size ) {
115
116 // prepare netmsg with length and and port
117 NetMsg* datamsg = new NetMsg(size + 6);
118 datamsg->encode32( size + 2, true );
119 datamsg->encode16( this->port,true );
120
121 for (size_t i=0; i<size; i++)
122 datamsg->encode8( data[i],true );
123
124 // send message
125 tcpip_endpoint endpoint = *remote;
126 appladdress peer = convert(endpoint);
127
128 // add to output queue
129 tpthread->get_thread_object()->send( datamsg, peer, false );
130}
131
132void tcpip::send( const endpoint_set& endpoints, const uint8_t* data, size_t size ) {
133 // send a message to each combination of ip-address and port
134 BOOST_FOREACH( const ip_address ip, endpoints.ip ) {
135 BOOST_FOREACH( const tcp_port_address port, endpoints.tcp ) {
136 tcpip_endpoint endpoint(ip,port);
137 address_vf vf = endpoint;
138 send(vf,data,size);
139 }
140 }
141}
142
[5614]143void tcpip::terminate( const address_v* remote) {
[5284]144 tcpip_endpoint endpoint = *remote;
145 appladdress peer = convert(endpoint);
146 peer.convert_to_ipv6();
147 tpthread->get_thread_object()->terminate( peer );
148}
149
150void tcpip::register_listener( transport_listener* listener ) {
151 this->listener = listener;
152}
153
154void* tcpip::receiverThread( void* ptp ) {
155 // get reference to transport object
156 tcpip& tp = *((tcpip*)ptp);
157
158 // get queue
159 FastQueue* fq =
160 QueueManager::instance()->get_queue(message::qaddr_signaling);
161
162 // main processing loop
163 tp.running = true;
164 while (!tp.done) {
165
166 // wait for new message to approach
167 message* msg = fq->dequeue_timedwait(300);
168
169 // message has arrived? no-> continue
170 if (!msg) continue;
171
172 // handle transport message
173 TPMsg* tpmsg = dynamic_cast<TPMsg*> (msg);
[5688]174 if (!tpmsg) {
[8606]175 delete msg;
[5688]176 continue;
177 }
[5284]178
179 // get address & message
180 const appladdress* remote_peer = static_cast<const appladdress*>( tpmsg->get_peeraddress() );
181 const appladdress* local_peer = static_cast<const appladdress*>( tpmsg->get_ownaddress() );
182 NetMsg* datamsg = tpmsg->get_message();
183
184 // not a data message? -> continue!
[5688]185 if (!datamsg) {
[8606]186 delete tpmsg;
[5688]187 continue;
188 }
[5284]189
190 // get length and remote endpoint port
191 datamsg->set_pos(0);
192 uint32_t message_size = datamsg->decode32(true)-2;
[6919]193 //uint16_t remote_port = datamsg->decode16(true);
[5284]194
195 // inform listener
196 if (tp.listener != NULL) {
197 tcpip_endpoint remote = convert(remote_peer);
198 tcpip_endpoint local = convert(local_peer);
199 tp.listener->receive_message(
200 &tp, local, remote, datamsg->get_buffer()+6, message_size );
[8609]201 }
[5444]202
[8609]203 tpmsg->set_message(NULL);
[8606]204 delete datamsg;
[5698]205 delete tpmsg;
[5284]206 }
207 // clean queue & stop
208 fq->cleanup();
209 tp.running = false;
210 return NULL;
211}
212
213}} // namespace ariba::transport
Note: See TracBrowser for help on using the repository browser.