00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include "TCPTransport.h"
00040
00041 #define _NO_LOGGING
00042
00043
00044 #include <unistd.h>
00045 #include <iostream>
00046 #include <string>
00047 #include <sstream>
00048
00049
00050 #include "../protlib/network_message.h"
00051 #include "../protlib/tp_over_tcp.h"
00052 #include "../protlib/logfile.h"
00053 #include "../protlib/queuemanager.h"
00054 #include "../protlib/threadsafe_db.h"
00055 #include "../protlib/setuid.h"
00056
00057
00058 #include "ariba/utility/serialization.h"
00059 #include "ariba/utility/system/SystemQueue.h"
00060 #include "ariba/utility/system/SystemEvent.h"
00061 #include "ariba/utility/system/SystemEventType.h"
00062 #include "ariba/communication/modules/network/ip/IPv4Locator.h"
00063
00064
00065 using namespace protlib;
00066 using namespace protlib::log;
00067
00068
00069 using ariba::utility::SystemQueue;
00070 using ariba::utility::SystemEvent;
00071 using ariba::utility::SystemEventType;
00072 using ariba::utility::MessageProvider;
00073 using ariba::utility::TextMessage;
00074 using ariba::utility::MessageReceiver;
00075 using ariba::communication::IPv4Locator;
00076
00077 using_serialization;
00078
00079 logfile commonlog;
00080 protlib::log::logfile& protlib::log::DefaultLog(commonlog);
00081
00082 #include "ariba/communication/modules/_namespace.h"
00083 NAMESPACE_BEGIN;
00084
00085 SystemEventType TCPTransportEvent("TCPTransport");
00086 SystemEventType TCPMessageDispatchEvent("MessageDispatchEvent", TCPTransportEvent );
00087 SystemEventType TCPTransportTestEvent("Test", TCPTransportEvent );
00088
00089 use_logging_cpp(TCPTransport);
00090
00091 TCPTransport::TCPTransport( port_t port ) {
00092 this->running = false;
00093 this->done = false;
00094 this->port = port;
00095
00096 logging_debug( "creating tcp transport module" );
00097 }
00098
00099 TCPTransport::~TCPTransport() {
00100 logging_debug( "deleting tcp transport module" );
00101 }
00102
00103 void TCPTransport::start() {
00104
00105 logging_info( "starting tcp transport module ..." );
00106
00107
00108 protlib::tsdb::init();
00109 protlib::setuid::init();
00110
00111
00112 port_t port = this->port;
00113 TPoverTCPParam tppar(4, TCPTransport::getMessageLength, port);
00114
00115
00116 FastQueue* tpchecker_fq = new FastQueue("TCPTransport", true);
00117 QueueManager::instance()->register_queue(tpchecker_fq,
00118 message::qaddr_signaling);
00119
00120
00121 pthread_create( &tpreceivethread, NULL, TCPTransport::receiverThread, this );
00122 tpthread = new ThreadStarter<TPoverTCP, TPoverTCPParam> ( 1, tppar );
00123 tpthread->start_processing();
00124
00125 logging_info( "tcp transport module started" );
00126 }
00127
00128 void TCPTransport::stop() {
00129
00130 logging_info( "stopping tcp transport module ..." );
00131
00132
00133 done = true;
00134
00135
00136 tpthread->stop_processing();
00137 tpthread->abort_processing(true);
00138 tpthread->wait_until_stopped();
00139
00140
00141 QueueManager::instance()->unregister_queue( message::qaddr_signaling );
00142
00143
00144 QueueManager::clear();
00145
00146
00147 protlib::setuid::end();
00148 protlib::tsdb::end();
00149
00150 logging_info( "tcp transport module stopped" );
00151 }
00152
00153 bool TCPTransport::getMessageLength( NetMsg& m, uint32& clen_bytes ) {
00154 clen_bytes = m.decode32();
00155 m.set_pos_r(-4);
00156 return true;
00157 }
00158
00159 void* TCPTransport::receiverThread( void* ptp ) {
00160
00161 logging_info( "running tcp transport receiver thread" );
00162
00163
00164 TCPTransport& tp = *((TCPTransport*)ptp);
00165
00166
00167 FastQueue* fq =
00168 QueueManager::instance()->get_queue(message::qaddr_signaling);
00169
00170
00171 tp.running = true;
00172 while (!tp.done) {
00173
00174
00175 message* msg = fq->dequeue_timedwait(300);
00176
00177
00178 if (msg) {
00179
00180 logging_debug( "Received incoming message" );
00181
00182
00183 TPMsg* tpmsg = dynamic_cast<TPMsg*> (msg);
00184 if (tpmsg) {
00185
00186 const appladdress* peer = static_cast<const appladdress*>( tpmsg->get_peeraddress() );
00187 NetMsg* datamsg = tpmsg->get_message();
00188 TPError* err = tpmsg->get_error();
00189
00190
00191 if (datamsg) {
00192
00193 datamsg->set_pos(0);
00194 uint32_t msgLength = datamsg->decode32(true);
00195 uint16_t remotePort = datamsg->decode16(true);
00196
00197
00198 Data data(
00199 (uint8_t*)(datamsg->get_buffer()+6),
00200 (datamsg->get_size()-6)*8
00201 );
00202
00203
00204 logging_debug( "Converting message" );
00205 Message* msg = new Message(data);
00206 std::ostringstream o;
00207 o << (peer->get_ip_str() + 7) << ":" << remotePort;
00208 msg->setSourceAddress( new IPv4Locator(IPv4Locator::fromString(o.str())) );
00209 logging_debug( "> source address = " << o.str() );
00210 logging_debug( "> message = " << msg->toString() );
00211
00212
00213 logging_debug( "Dispatching message" );
00214 SystemQueue::instance().scheduleEvent(
00215 SystemEvent( &tp, TCPMessageDispatchEvent, msg )
00216 );
00217 }
00218
00219
00220 if (err)
00221 logging_error( "TCP transport error " + string(err->getstr()) );
00222
00223 logging_debug( "Message processed." );
00224
00225 tpmsg = NULL;
00226 }
00227 delete msg;
00228 }
00229 }
00230
00231
00232 fq->cleanup();
00233 tp.running = false;
00234 return NULL;
00235 }
00236
00237 seqnum_t TCPTransport::sendMessage(const Message* message ) {
00238
00239 Data data = data_serialize( message );
00240 const_cast<Message*>(message)->dropPayload();
00241
00242
00243 NetMsg* datamsg = new NetMsg(data.getLength()/8+6);
00244 datamsg->encode32(data.getLength()/8+2, true);
00245 datamsg->encode16(this->port,true);
00246 for (int i=0; i<data.getLength()/8; i++)
00247 datamsg->encode8(data.getBuffer()[i],true);
00248
00249
00250 const IPv4Locator* address = dynamic_cast<const IPv4Locator*>(message->getDestinationAddress());
00251 if( address == NULL) return 0;
00252
00253 logging_debug( "sending message of size " << data.getLength() <<
00254 " to address " + address->toString() <<
00255 ": " + message->toString() );
00256
00257 string s = address->toString();
00258 string::size_type i = s.find(':');
00259 string ip = address->toString().substr(0,i).c_str();
00260 logging_debug( "ip= " << ip << " port=" << address->getPort() );
00261
00262 appladdress peer(ip.c_str(), "tcp", address->getPort() );
00263 tpthread->get_thread_object()->send(datamsg, peer, false);
00264
00265
00266 data.release();
00267 logging_debug( "message sent!" );
00268
00269 return 0;
00270 }
00271
00272 void TCPTransport::terminate(const NetworkLocator* local, const NetworkLocator* remote){
00273
00274 const IPv4Locator* address = dynamic_cast<const IPv4Locator*>(remote);
00275 if( address == NULL) return;
00276
00277 string s = address->toString();
00278 string::size_type i = s.find(':');
00279 string ip = address->toString().substr(0,i).c_str();
00280
00281 appladdress peer( ip.c_str(), "tcp", address->getPort() );
00282 peer.convert_to_ipv6();
00283
00284 tpthread->get_thread_object()->terminate( peer );
00285 }
00286
00287 TransportLocator::prot_t TCPTransport::getId() {
00288 return 6;
00289 }
00290
00291
00292
00293 const vector<TransportLocator*> TCPTransport::getLocators() {
00294 return vector<TransportLocator*>();
00295 }
00296
00297
00298 void TCPTransport::handleSystemEvent( const SystemEvent& event ) {
00299
00300
00301 if ( event.getType() == TCPMessageDispatchEvent ){
00302 logging_debug( "forwarding message to local receivers" );
00303 Message* msg = event.getData<Message>();
00304 MessageProvider::sendMessageToReceivers( msg );
00305 msg->dropPayload();
00306
00307 delete msg;
00308 }
00309
00310 if ( event.getType() == TCPTransportTestEvent ) {
00311
00312
00313 addMessageReceiver( new MessageReceiver() );
00314
00315
00316
00317 sendMessage( new TextMessage( "Hello World!" ) );
00318
00319
00320 }
00321 }
00322
00323 NAMESPACE_END;