// [Licence] // The Ariba-Underlay Copyright // // Copyright (c) 2008-2009, Institute of Telematics, Universität Karlsruhe (TH) // // Institute of Telematics // Universität Karlsruhe (TH) // Zirkel 2, 76128 Karlsruhe // Germany // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: // // 1. Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // 2. Redistributions in binary form must reproduce the above copyright // notice, this list of conditions and the following disclaimer in the // documentation and/or other materials provided with the distribution. // // THIS SOFTWARE IS PROVIDED BY THE INSTITUTE OF TELEMATICS ``AS IS'' AND // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // // The views and conclusions contained in the software and documentation // are those of the authors and should not be interpreted as representing // official policies, either expressed or implied, of the Institute of // Telematics. // [Licence] #include "TCPTransport.h" #define _NO_LOGGING // std includes #include #include #include #include // protlib includes #include "../protlib/network_message.h" #include "../protlib/tp_over_tcp.h" #include "../protlib/logfile.h" #include "../protlib/queuemanager.h" #include "../protlib/threadsafe_db.h" #include "../protlib/setuid.h" // spovnet includes #include "ariba/utility/serialization.h" #include "ariba/utility/system/SystemQueue.h" #include "ariba/utility/system/SystemEvent.h" #include "ariba/utility/system/SystemEventType.h" #include "ariba/communication/modules/network/ip/IPv4Locator.h" // protlib namespaces using namespace protlib; using namespace protlib::log; // spovnet namespaces using ariba::utility::SystemQueue; using ariba::utility::SystemEvent; using ariba::utility::SystemEventType; using ariba::utility::MessageProvider; using ariba::utility::TextMessage; using ariba::utility::MessageReceiver; using ariba::communication::IPv4Locator; using_serialization; logfile commonlog; protlib::log::logfile& protlib::log::DefaultLog(commonlog); #include "ariba/communication/modules/_namespace.h" NAMESPACE_BEGIN; SystemEventType TCPTransportEvent("TCPTransport"); SystemEventType TCPMessageDispatchEvent("MessageDispatchEvent", TCPTransportEvent ); SystemEventType TCPTransportTestEvent("Test", TCPTransportEvent ); use_logging_cpp(TCPTransport); TCPTransport::TCPTransport( port_t port ) { this->running = false; this->done = false; this->port = port; logging_debug( "creating tcp transport module" ); } TCPTransport::~TCPTransport() { logging_debug( "deleting tcp transport module" ); } void TCPTransport::start() { logging_info( "starting tcp transport module ..." ); // initalize netdb and setuid protlib::tsdb::init(); protlib::setuid::init(); /* set tcp parameters */ port_t port = this->port; // port TPoverTCPParam tppar(4, TCPTransport::getMessageLength, port); /* create receiver thread */ FastQueue* tpchecker_fq = new FastQueue("TCPTransport", true); QueueManager::instance()->register_queue(tpchecker_fq, message::qaddr_signaling); /* start thread */ pthread_create( &tpreceivethread, NULL, TCPTransport::receiverThread, this ); tpthread = new ThreadStarter ( 1, tppar ); tpthread->start_processing(); logging_info( "tcp transport module started" ); } void TCPTransport::stop() { logging_info( "stopping tcp transport module ..." ); // stop receiver thread done = true; // stop TPoverTCP tpthread->stop_processing(); tpthread->abort_processing(true); tpthread->wait_until_stopped(); // unregister TPoverTCP QueueManager::instance()->unregister_queue( message::qaddr_signaling ); // destroy QueueManager QueueManager::clear(); // de-initalize netdb and setuid protlib::setuid::end(); protlib::tsdb::end(); logging_info( "tcp transport module stopped" ); } bool TCPTransport::getMessageLength( NetMsg& m, uint32& clen_bytes ) { clen_bytes = m.decode32(); m.set_pos_r(-4); return true; } void* TCPTransport::receiverThread( void* ptp ) { logging_info( "running tcp transport receiver thread" ); // get reference to transport object TCPTransport& tp = *((TCPTransport*)ptp); // get queue FastQueue* fq = QueueManager::instance()->get_queue(message::qaddr_signaling); // main processing loop tp.running = true; while (!tp.done) { // wait for new message to approach message* msg = fq->dequeue_timedwait(300); // handle message if (msg) { logging_debug( "Received incoming message" ); // handle transport message TPMsg* tpmsg = dynamic_cast (msg); if (tpmsg) { // evaluate TP message const appladdress* peer = static_cast( tpmsg->get_peeraddress() ); NetMsg* datamsg = tpmsg->get_message(); TPError* err = tpmsg->get_error(); // get data if (datamsg) { datamsg->set_pos(0); uint32_t msgLength = datamsg->decode32(true); uint16_t remotePort = datamsg->decode16(true); // convert data Data data( (uint8_t*)(datamsg->get_buffer()+6), (datamsg->get_size()-6)*8 ); // converting message logging_debug( "Converting message" ); Message* msg = new Message(data); std::ostringstream o; o << (peer->get_ip_str() + 7) << ":" << remotePort; msg->setSourceAddress( new IPv4Locator(IPv4Locator::fromString(o.str())) ); logging_debug( "> source address = " << o.str() ); logging_debug( "> message = " << msg->toString() ); // dispatching message logging_debug( "Dispatching message" ); SystemQueue::instance().scheduleEvent( SystemEvent( &tp, TCPMessageDispatchEvent, msg ) ); } // check error if (err) logging_error( "TCP transport error " + string(err->getstr()) ); logging_debug( "Message processed." ); tpmsg = NULL; } delete msg; } } // clean queue & stop fq->cleanup(); tp.running = false; return NULL; } seqnum_t TCPTransport::sendMessage(const Message* message ) { Data data = data_serialize( message ); const_cast(message)->dropPayload(); // prepare netmsg and send it NetMsg* datamsg = new NetMsg(data.getLength()/8+6); datamsg->encode32(data.getLength()/8+2, true); datamsg->encode16(this->port,true); for (int i=0; iencode8(data.getBuffer()[i],true); // send message const IPv4Locator* address = dynamic_cast(message->getDestinationAddress()); if( address == NULL) return 0; logging_debug( "sending message of size " << data.getLength() << " to address " + address->toString() << ": " + message->toString() ); string s = address->toString(); string::size_type i = s.find(':'); string ip = address->toString().substr(0,i).c_str(); logging_debug( "ip= " << ip << " port=" << address->getPort() ); appladdress peer(ip.c_str(), "tcp", address->getPort() ); tpthread->get_thread_object()->send(datamsg, peer, false); // release data data.release(); logging_debug( "message sent!" ); return 0; } void TCPTransport::terminate(const NetworkLocator* local, const NetworkLocator* remote){ const IPv4Locator* address = dynamic_cast(remote); if( address == NULL) return; string s = address->toString(); string::size_type i = s.find(':'); string ip = address->toString().substr(0,i).c_str(); appladdress peer( ip.c_str(), "tcp", address->getPort() ); peer.convert_to_ipv6(); tpthread->get_thread_object()->terminate( peer ); } TransportLocator::prot_t TCPTransport::getId() { return 6; // TCP } const vector TCPTransport::getLocators() { return vector(); } /* system event handler */ void TCPTransport::handleSystemEvent( const SystemEvent& event ) { // dispatch received messages if ( event.getType() == TCPMessageDispatchEvent ){ logging_debug( "forwarding message to local receivers" ); Message* msg = event.getData(); MessageProvider::sendMessageToReceivers( msg ); msg->dropPayload(); // delete msg->getSourceAddress(); delete msg; } if ( event.getType() == TCPTransportTestEvent ) { // add listener addMessageReceiver( new MessageReceiver() ); // send message //cout << "Sending message ..." << endl; sendMessage( new TextMessage( "Hello World!" ) ); //cout << "Message sent ..." << endl; } } NAMESPACE_END;