source: source/ariba/utility/transport/tcpip/tcpip.cpp@ 5639

Last change on this file since 5639 was 5614, checked in by mies, 15 years ago

added termination of transports when no link is up anymore to an end-point

File size: 5.1 KB
Line 
1#include "tcpip.hpp"
2
3#define _NO_LOGGING
4
5// std includes
6#include <unistd.h>
7#include <iostream>
8#include <string>
9#include <sstream>
10#include <boost/foreach.hpp>
11
12// protlib includes
13#include "protlib/network_message.h"
14#include "protlib/tp_over_tcp.h"
15#include "protlib/tperror.h"
16#include "protlib/logfile.h"
17#include "protlib/queuemanager.h"
18#include "protlib/threadsafe_db.h"
19#include "protlib/setuid.h"
20
21// protlib namespaces
22using namespace protlib;
23using namespace protlib::log;
24
25logfile commonlog;
26protlib::log::logfile& protlib::log::DefaultLog(commonlog);
27
28namespace ariba {
29namespace transport {
30
31using namespace ariba::addressing;
32
33
34tcpip_endpoint convert( const appladdress* addr ) {
35 const char* ip_str = addr->get_ip_str();
36 tcpip_endpoint endpoint( std::string(ip_str), addr->get_port() );
37 return endpoint;
38}
39
40appladdress convert( const tcpip_endpoint& endpoint ) {
41 tcpip_endpoint* e = const_cast<tcpip_endpoint*>(&endpoint);
42 appladdress
43 peer(e->address().to_string().c_str(), "tcp", e->port().asio() );
44// cout << endpoint.to_string() << " to " << peer.get_ip_str() << ":" << peer.get_port() << endl;
45 return peer;
46}
47
48tcpip::tcpip( uint16_t port ) {
49 this->done = false;
50 this->running = false;
51 this->port = port;
52 this->listener = NULL;
53}
54
55tcpip::~tcpip() {
56
57}
58
59bool get_message_length( NetMsg& m, uint32& clen_bytes ) {
60 clen_bytes = m.decode32();
61 m.set_pos_r(-4);
62 return true;
63}
64
65void tcpip::start() {
66 done = false;
67 running = false;
68
69 // initalize netdb and setuid
70 protlib::tsdb::init();
71 protlib::setuid::init();
72
73 // set tcp parameters
74 port_t port = this->port; // port
75 TPoverTCPParam tppar(4, get_message_length, port);
76
77 // create receiver thread
78 FastQueue* tpchecker_fq = new FastQueue("TCPTransport", true);
79 QueueManager::instance()->register_queue(tpchecker_fq,
80 message::qaddr_signaling);
81
82 // start thread
83 pthread_create( &tpreceivethread, NULL, tcpip::receiverThread, this );
84 tpthread = new ThreadStarter<TPoverTCP, TPoverTCPParam> ( 1, tppar );
85 tpthread->start_processing();
86}
87
88void tcpip::stop() {
89 // stop receiver thread
90 done = true;
91
92 // stop TPoverTCP
93 tpthread->stop_processing();
94 tpthread->abort_processing(true);
95 tpthread->wait_until_stopped();
96
97 // unregister TPoverTCP
98 QueueManager::instance()->unregister_queue( message::qaddr_signaling );
99
100 // destroy QueueManager
101 QueueManager::clear();
102
103 // de-initalize netdb and setuid
104 protlib::setuid::end();
105 protlib::tsdb::end();
106}
107
108void tcpip::send( const address_v* remote, const uint8_t* data, size_t size ) {
109
110 // prepare netmsg with length and and port
111 NetMsg* datamsg = new NetMsg(size + 6);
112 datamsg->encode32( size + 2, true );
113 datamsg->encode16( this->port,true );
114
115 for (size_t i=0; i<size; i++)
116 datamsg->encode8( data[i],true );
117
118 // send message
119 tcpip_endpoint endpoint = *remote;
120 appladdress peer = convert(endpoint);
121
122 // add to output queue
123 tpthread->get_thread_object()->send( datamsg, peer, false );
124}
125
126void tcpip::send( const endpoint_set& endpoints, const uint8_t* data, size_t size ) {
127 // send a message to each combination of ip-address and port
128 BOOST_FOREACH( const ip_address ip, endpoints.ip ) {
129 BOOST_FOREACH( const tcp_port_address port, endpoints.tcp ) {
130 tcpip_endpoint endpoint(ip,port);
131 address_vf vf = endpoint;
132 send(vf,data,size);
133 }
134 }
135}
136
137void tcpip::terminate( const address_v* remote) {
138 tcpip_endpoint endpoint = *remote;
139 appladdress peer = convert(endpoint);
140 peer.convert_to_ipv6();
141 tpthread->get_thread_object()->terminate( peer );
142}
143
144void tcpip::register_listener( transport_listener* listener ) {
145 this->listener = listener;
146}
147
148void* tcpip::receiverThread( void* ptp ) {
149 // get reference to transport object
150 tcpip& tp = *((tcpip*)ptp);
151
152 // get queue
153 FastQueue* fq =
154 QueueManager::instance()->get_queue(message::qaddr_signaling);
155
156 // main processing loop
157 tp.running = true;
158 while (!tp.done) {
159
160 // wait for new message to approach
161 message* msg = fq->dequeue_timedwait(300);
162
163 // message has arrived? no-> continue
164 if (!msg) continue;
165
166 // handle transport message
167 TPMsg* tpmsg = dynamic_cast<TPMsg*> (msg);
168 if (!tpmsg) continue;
169
170 // get address & message
171 const appladdress* remote_peer = static_cast<const appladdress*>( tpmsg->get_peeraddress() );
172 const appladdress* local_peer = static_cast<const appladdress*>( tpmsg->get_ownaddress() );
173 NetMsg* datamsg = tpmsg->get_message();
174
175 // not a data message? -> continue!
176 if (!datamsg) continue;
177
178 // get length and remote endpoint port
179 datamsg->set_pos(0);
180 uint32_t message_size = datamsg->decode32(true)-2;
181 uint16_t remote_port = datamsg->decode16(true);
182
183
184 // inform listener
185 if (tp.listener != NULL) {
186 tcpip_endpoint remote = convert(remote_peer);
187 tcpip_endpoint local = convert(local_peer);
188
189// DO NOT SET REMOTE PORT!
190// remote.port() = remote_port;
191// cout << "received: remote="
192// << remote.to_string()
193// << " local="
194// << local.to_string() << " size=" << message_size << endl;
195 tp.listener->receive_message(
196 &tp, local, remote, datamsg->get_buffer()+6, message_size );
197
198 delete datamsg;
199 }
200 }
201 // clean queue & stop
202 fq->cleanup();
203 tp.running = false;
204 return NULL;
205}
206
207}} // namespace ariba::transport
Note: See TracBrowser for help on using the repository browser.