00001 #include "tcpip.hpp"
00002
00003 #define _NO_LOGGING
00004
00005
00006 #include <unistd.h>
00007 #include <iostream>
00008 #include <string>
00009 #include <sstream>
00010 #include <boost/foreach.hpp>
00011
00012
00013 #include "protlib/network_message.h"
00014 #include "protlib/tp_over_tcp.h"
00015 #include "protlib/tperror.h"
00016 #include "protlib/logfile.h"
00017 #include "protlib/queuemanager.h"
00018 #include "protlib/threadsafe_db.h"
00019 #include "protlib/setuid.h"
00020
00021
00022 using namespace protlib;
00023 using namespace protlib::log;
00024
00025 logfile commonlog;
00026 protlib::log::logfile& protlib::log::DefaultLog(commonlog);
00027
00028 namespace ariba {
00029 namespace transport {
00030
00031 using namespace ariba::addressing;
00032
00033
00034 tcpip_endpoint convert( const appladdress* addr ) {
00035 const char* ip_str = addr->get_ip_str();
00036 tcpip_endpoint endpoint( std::string(ip_str), addr->get_port() );
00037 return endpoint;
00038 }
00039
00040 appladdress convert( const tcpip_endpoint& endpoint ) {
00041 tcpip_endpoint* e = const_cast<tcpip_endpoint*>(&endpoint);
00042 appladdress
00043 peer(e->address().to_string().c_str(), "tcp", e->port().asio() );
00044
00045 return peer;
00046 }
00047
00048 tcpip::tcpip( uint16_t port ) {
00049 this->done = false;
00050 this->running = false;
00051 this->port = port;
00052 this->listener = NULL;
00053 }
00054
00055 tcpip::~tcpip() {
00056 if (running) stop();
00057 }
00058
00059 bool get_message_length( NetMsg& m, uint32& clen_bytes ) {
00060 clen_bytes = m.decode32();
00061 m.set_pos_r(-4);
00062 return true;
00063 }
00064
00065 void tcpip::start() {
00066 done = false;
00067 running = false;
00068
00069
00070 protlib::tsdb::init();
00071 protlib::setuid::init();
00072
00073
00074 port_t port = this->port;
00075 TPoverTCPParam tppar(4, get_message_length, port);
00076
00077
00078 FastQueue* tpchecker_fq = new FastQueue("TCPTransport", true);
00079 QueueManager::instance()->register_queue(tpchecker_fq,
00080 message::qaddr_signaling);
00081
00082
00083 pthread_create( &tpreceivethread, NULL, tcpip::receiverThread, this );
00084 tpthread = new ThreadStarter<TPoverTCP, TPoverTCPParam> ( 1, tppar );
00085 tpthread->start_processing();
00086 }
00087
00088 void tcpip::stop() {
00089
00090 done = true;
00091
00092
00093 tpthread->stop_processing();
00094 tpthread->abort_processing(true);
00095 tpthread->wait_until_stopped();
00096
00097
00098 delete QueueManager::instance()->get_queue( message::qaddr_signaling );
00099 QueueManager::instance()->unregister_queue( message::qaddr_signaling );
00100
00101
00102 QueueManager::clear();
00103
00104
00105 protlib::setuid::end();
00106 protlib::tsdb::end();
00107
00108
00109 pthread_join(tpreceivethread, NULL);
00110 }
00111
00112 void tcpip::send( const address_v* remote, const uint8_t* data, size_t size ) {
00113
00114
00115 NetMsg* datamsg = new NetMsg(size + 6);
00116 datamsg->encode32( size + 2, true );
00117 datamsg->encode16( this->port,true );
00118
00119 for (size_t i=0; i<size; i++)
00120 datamsg->encode8( data[i],true );
00121
00122
00123 tcpip_endpoint endpoint = *remote;
00124 appladdress peer = convert(endpoint);
00125
00126
00127 tpthread->get_thread_object()->send( datamsg, peer, false );
00128 }
00129
00130 void tcpip::send( const endpoint_set& endpoints, const uint8_t* data, size_t size ) {
00131
00132 BOOST_FOREACH( const ip_address ip, endpoints.ip ) {
00133 BOOST_FOREACH( const tcp_port_address port, endpoints.tcp ) {
00134 tcpip_endpoint endpoint(ip,port);
00135 address_vf vf = endpoint;
00136 send(vf,data,size);
00137 }
00138 }
00139 }
00140
00141 void tcpip::terminate( const address_v* remote) {
00142 tcpip_endpoint endpoint = *remote;
00143 appladdress peer = convert(endpoint);
00144 peer.convert_to_ipv6();
00145 tpthread->get_thread_object()->terminate( peer );
00146 }
00147
00148 void tcpip::register_listener( transport_listener* listener ) {
00149 this->listener = listener;
00150 }
00151
00152 void* tcpip::receiverThread( void* ptp ) {
00153
00154 tcpip& tp = *((tcpip*)ptp);
00155
00156
00157 FastQueue* fq =
00158 QueueManager::instance()->get_queue(message::qaddr_signaling);
00159
00160
00161 tp.running = true;
00162 while (!tp.done) {
00163
00164
00165 message* msg = fq->dequeue_timedwait(300);
00166
00167
00168 if (!msg) continue;
00169
00170
00171 TPMsg* tpmsg = dynamic_cast<TPMsg*> (msg);
00172 if (!tpmsg) {
00173 delete msg;
00174 continue;
00175 }
00176
00177
00178 const appladdress* remote_peer = static_cast<const appladdress*>( tpmsg->get_peeraddress() );
00179 const appladdress* local_peer = static_cast<const appladdress*>( tpmsg->get_ownaddress() );
00180 NetMsg* datamsg = tpmsg->get_message();
00181
00182
00183 if (!datamsg) {
00184 delete tpmsg;
00185 continue;
00186 }
00187
00188
00189 datamsg->set_pos(0);
00190 uint32_t message_size = datamsg->decode32(true)-2;
00191
00192
00193
00194 if (tp.listener != NULL) {
00195 tcpip_endpoint remote = convert(remote_peer);
00196 tcpip_endpoint local = convert(local_peer);
00197 tp.listener->receive_message(
00198 &tp, local, remote, datamsg->get_buffer()+6, message_size );
00199 }
00200
00201 tpmsg->set_message(NULL);
00202 delete datamsg;
00203 delete tpmsg;
00204 }
00205
00206 fq->cleanup();
00207 tp.running = false;
00208 return NULL;
00209 }
00210
00211 }}