source: source/ariba/communication/modules/transport/protlib/tp_over_tls_tcp.cpp@ 4987

Last change on this file since 4987 was 4987, checked in by Christoph Mayer, 15 years ago

isrunning function für timer

File size: 57.2 KB
Line 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file tp_over_tls_tcp.cpp
3/// TCP/TLS-based transport module
4/// ----------------------------------------------------------
5/// $Id: tp_over_tls_tcp.cpp 2872 2008-02-18 10:58:03Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/src/tp_over_tls_tcp.cpp $
7// ===========================================================
8//
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29
30extern "C"
31{
32 //#define _SOCKADDR_LEN /* use BSD 4.4 sockets */
33#include <unistd.h> /* gethostname */
34#include <sys/types.h> /* network socket interface */
35#include <netinet/in.h> /* network socket interface */
36#include <netinet/tcp.h> /* for TCP Socket Option */
37#include <sys/socket.h>
38#include <arpa/inet.h> /* inet_addr */
39
40#include <fcntl.h>
41#include <sys/poll.h>
42}
43
44#include <iostream>
45#include <errno.h>
46#include <string>
47#include <sstream>
48
49#include "tp_over_tls_tcp.h"
50#include "threadsafe_db.h"
51#include "cleanuphandler.h"
52#include "setuid.h"
53#include "queuemanager.h"
54#include "logfile.h"
55
56#include <set>
57
// Result codes stored by tcptlssend() in its local `result` variable:
// TCP_SUCCESS is the initial value, TCP_SEND_FAILURE is set when a write fails.
58#define TCP_SUCCESS 0
59#define TCP_SEND_FAILURE 1
60
// NOTE(review): not referenced in the visible part of this file; presumably
// the backlog passed to listen() by the master thread - confirm.
61const unsigned int max_listen_queue_size= 10;
62
63namespace protlib {
64
65using namespace log;
66
67/** @defgroup tptcp TP over TCP
68 * @ingroup network
69 * @{
70 */
71
// Scratch buffer handed to inet_ntop() when get_connection_to() logs the
// local address of a freshly connected socket.
// NOTE(review): this is a namespace-scope buffer that is written without the
// object lock being held - looks unsafe if several threads connect at the
// same time; confirm.
72char in6_addrstr_tls[INET6_ADDRSTRLEN+1];
73
74
75/******* class TPoverTLS_TCP *******/
76
77/*
78 * Obtain reason string for last SSL error
79 *
80 * Some caution is needed here since ERR_reason_error_string will
81 * return NULL if it doesn't recognize the error code. We don't
82 * want to return NULL ever.
83 */
84const char *
85TPoverTLS_TCP::SSLerrmessage(void)
86{
87 unsigned long errcode;
88 const char *errreason;
89 static char errbuf[32];
90
91 errcode = ERR_get_error();
92 if (errcode == 0)
93 return "No SSL error reported";
94 errreason = ERR_reason_error_string(errcode);
95 if (errreason != NULL)
96 return errreason;
97 snprintf(errbuf, sizeof(errbuf), "SSL error code %lu", errcode);
98 return errbuf;
99}
100
101
102
103/** get_connection_to() checks for already existing connections.
104 * If a connection exists, it returns "AssocData"
105 * and saves it in "connmap" for further use
106 * If no connection exists, a new connection to "addr"
107 * is created.
108 */
109AssocData*
110TPoverTLS_TCP::get_connection_to(const appladdress& addr)
111{
112 // get timeout
113 struct timespec ts;
114 get_time_of_day(ts);
115 ts.tv_nsec+= tpparam.sleep_time * 1000000L;
116 if (ts.tv_nsec>=1000000000L)
117 {
118 ts.tv_sec += ts.tv_nsec / 1000000000L;
119 ts.tv_nsec= ts.tv_nsec % 1000000000L;
120 }
121
122 // create a new AssocData pointer, initialize it to NULL
123 AssocData* assoc= NULL;
124 int new_socket;
125 // loop until timeout is exceeded: TODO: check applicability of loop
126 do
127 {
128 // check for existing connections to addr
129 // start critical section
130 lock(); // install_cleanup_thread_lock(TPoverTLS_TCP, this);
131 assoc= connmap.lookup(addr);
132 // end critical section
133 unlock(); // uninstall_cleanup(1);
134 if (assoc)
135 {
136 // If not shut down then use it, else abort, wait and check again.
137 if (!assoc->shutdown)
138 {
139 return assoc;
140 }
141 else
142 {
143 // TODO: connection is already in shutdown mode. What now?
144 Log(ERROR_LOG,LOG_CRIT,tpparam.name,"socket exists, but is already in mode shutdown");
145
146 return 0;
147 }
148 } //end __if (assoc)__
149 else
150 {
151 Log(DEBUG_LOG,LOG_UNIMP,tpparam.name,"No existing connection to "
152 << addr.get_ip_str() << " port #" << addr.get_port() << " found, creating a new one.");
153 }
154
155 // no connection found, create a new one
156 new_socket = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
157
158 if (new_socket == -1)
159 {
160 Log(ERROR_LOG,LOG_CRIT,tpparam.name,"Couldn't create a new socket: " << strerror(errno));
161
162 return 0;
163 }
164
165 // Disable Nagle Algorithm, set (TCP_NODELAY)
166 int nodelayflag= 1;
167 int status= setsockopt(new_socket,
168 IPPROTO_TCP,
169 TCP_NODELAY,
170 &nodelayflag,
171 sizeof(nodelayflag));
172 if (status)
173 {
174 Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
175 }
176
177 // Reuse ports, even if they are busy
178 int socketreuseflag= 1;
179 status= setsockopt(new_socket,
180 SOL_SOCKET,
181 SO_REUSEADDR,
182 (const char *) &socketreuseflag,
183 sizeof(socketreuseflag));
184 if (status)
185 {
186 Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
187 }
188
189 struct sockaddr_in6 dest_address;
190 dest_address.sin6_flowinfo= 0;
191 dest_address.sin6_scope_id= 0;
192 addr.get_sockaddr(dest_address);
193
194 // connect the socket to the destination address
195 int connect_status = connect(new_socket,
196 reinterpret_cast<const struct sockaddr*>(&dest_address),
197 sizeof(dest_address));
198
199
200 // connects to the listening_port of the peer's masterthread
201 if (connect_status != 0)
202 {
203 Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"Connect to " << addr.get_ip_str() << " port #" << addr.get_port()
204 << " failed: [" << color[red] << strerror(errno) << color[off] << "]");
205 // FIXME: that doesn't work gracefully
206 //throw TPError(TPError::ERROR_UNREACHABLE);
207 return 0;
208 }
209
210 struct sockaddr_in6 own_address;
211 socklen_t own_address_len= sizeof(own_address);
212 getsockname(new_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
213
214
215 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,">>--Connect-->> to " << addr.get_ip_str() << " port #" << addr.get_port()
216 << " from " << inet_ntop(AF_INET6,&own_address.sin6_addr,in6_addrstr_tls,INET6_ADDRSTRLEN)
217 << " port #" << ntohs(own_address.sin6_port));
218
219 // create new AssocData record (will copy addr)
220 assoc = new(nothrow) AssocData(new_socket, addr, appladdress(own_address,IPPROTO_TCP));
221
222 DLog(tpparam.name, "Before creation of a new SSL object");
223
224 SSL *ssl = NULL;
225
226 if (!ssl_ctx) ERRCLog(tpparam.name, "Could not create SSL context");
227
228 ssl = SSL_new(ssl_ctx);
229
230 if (!ssl)
231 {
232 ERRCLog(tpparam.name,"could not create SSL context: " << SSLerrmessage());
233 }
234
235 if (!ssl)
236 ERRCLog(tpparam.name, "Could not create SSL object");
237
238 DLog(tpparam.name, "Before binding it to the socket");
239
240 SSL_set_fd(ssl, new_socket);
241
242 DLog(tpparam.name, "Bound SSL object to the socket");
243
244
245 // Connect to the server
246 if ((SSL_connect(ssl)) < 1) {
247 ERRCLog(tpparam.name, "SSL connect failed in get_connection_to(): " << SSLerrmessage());
248 }
249
250 sslmap[new_socket]=ssl;
251
252 DLog(tpparam.name, "Inserted SSL object into sslmap");
253
254 // if assoc could be successfully created, insert it into ConnectionMap
255 if (assoc)
256 {
257 bool insert_success= false;
258 // start critical section
259 lock(); // install_cleanup_thread_lock(TPoverTLS_TCP, this);
260 // insert AssocData into connection map
261 insert_success= connmap.insert(assoc);
262 // end critical section
263 unlock(); // uninstall_cleanup(1);
264
265 if (insert_success == true)
266 {
267#ifdef _DEBUG
268 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Connected to " << addr.get_ip_str() << ", port #" << addr.get_port()
269 << " via socket " << new_socket);
270
271
272#endif
273
274 // notify master thread to start a receiver thread (i.e. send selfmessage)
275 TPoverTLS_TCPMsg* newmsg= new(nothrow)TPoverTLS_TCPMsg(assoc, tpparam.source, TPoverTLS_TCPMsg::start);
276 if (newmsg)
277 {
278 newmsg->send_to(tpparam.source);
279 return assoc;
280 }
281 else
282 Log(ERROR_LOG,LOG_CRIT,tpparam.name,"get_connection_to: could not get memory for internal msg");
283 }
284 else
285 {
286 // delete data and abort
287 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot insert AssocData for socket " << new_socket << ", "<< addr.get_ip_str()
288 <<", port #" << addr.get_port() << " into connection map, aborting connection");
289
290 // abort connection, delete its AssocData
291 close (new_socket);
292 if (assoc)
293 {
294 delete assoc;
295 assoc= 0;
296 }
297 return assoc;
298 } // end else connmap.insert
299
300 } // end "if (assoc)"
301 }
302 while (wait_cond(ts)!=ETIMEDOUT);
303
304 return assoc;
305} //end get_connection_to
306
307
308/** terminates a signaling association/connection
309 * - if no connection exists, generate a warning
310 * - otherwise: generate internal msg to related receiver thread
311 */
312void
313TPoverTLS_TCP::terminate(const address& in_addr)
314{
315
316 appladdress* addr = NULL;
317 addr= dynamic_cast<appladdress*>(in_addr.copy());
318
319 if (!addr) return;
320
321#ifndef _NO_LOGGING
322 const char *const thisproc="terminate() - ";
323#endif
324
325 // Create a new AssocData-pointer
326 AssocData* assoc = NULL;
327
328 // check for existing connections to addr
329
330 // start critical section:
331 // please note if receiver_thread terminated already, the assoc data is not
332 // available anymore therefore we need a lock around cleanup_receiver_thread()
333
334 lock(); // install_cleanup_thread_lock(TPoverTLS_TCP, this);
335 assoc= connmap.lookup(*addr);
336 if (assoc)
337 {
338 Log(EVENT_LOG,LOG_NORMAL,tpparam.name,thisproc<<"got request to shutdown connection for peer " << *addr);
339 // If not shut down then use it, else abort, wait and check again.
340 if (!assoc->shutdown)
341 {
342 if (assoc->socketfd)
343 {
344 // disallow sending
345 if (shutdown(assoc->socketfd,SHUT_WR))
346 {
347 Log(ERROR_LOG,LOG_UNIMP,tpparam.name,thisproc<<"shutdown (write) on socket for peer " << *addr << " returned error:" << strerror(errno));
348 }
349 else
350 {
351 Log(EVENT_LOG,LOG_NORMAL,tpparam.name,thisproc<<"initiated closing of connection for peer " << *addr << ". Shutdown (write) on socket "<< assoc->socketfd );
352 }
353 }
354 assoc->shutdown= true;
355 }
356 else
357 Log(EVENT_LOG,LOG_NORMAL,tpparam.name,thisproc<<"connection for peer " << *addr << " is already in mode shutdown");
358
359 }
360 else
361 Log(WARNING_LOG,LOG_NORMAL,tpparam.name,thisproc<<"could not find a connection for peer " << *addr);
362
363 stop_receiver_thread(assoc);
364
365 // end critical section
366 unlock(); // uninstall_cleanup(1);
367
368
369 if (addr) delete addr;
370
371}
372
373
374/** generates and internal TPoverTLS_TCP message to send a NetMsg to the network
375 *
376 * - it is necessary to let a thread do this, because the caller
377 * may get blocked if the connect() or send() call hangs for a while
378 * - the sending thread will call TPoverTLS_TCP::tcpsend()
379 * - if no connection exists, creates a new one
380 * @note the netmsg is deleted by the send() method when it is not used anymore
381 */
382void
383TPoverTLS_TCP::send(NetMsg* netmsg, const address& in_addr, bool use_existing_connection)
384{
385 if (netmsg == NULL) {
386 ERRCLog(tpparam.name,"send() - called without valid NetMsg buffer (NULL)");
387 return;
388 }
389
390 appladdress* addr = NULL;
391 addr = dynamic_cast<appladdress*>(in_addr.copy());
392
393 if (!addr)
394 {
395 ERRCLog(tpparam.name,"send() - given destination address is not of expected type (appladdress), has type: " << (int) in_addr.get_type());
396 return;
397 }
398
399 // lock due to sendermap access
400 lock();
401
402 // check for existing sender thread
403 sender_thread_queuemap_t::const_iterator it= senderthread_queuemap.find(*addr);
404
405 FastQueue* destqueue= 0;
406
407 if (it == senderthread_queuemap.end())
408 { // no sender thread so far
409
410 // if we already have an existing connection it is save to create a sender thread
411 // since get_connection_to() will not be invoked, so an existing connection will
412 // be used
413 const AssocData* assoc = connmap.lookup(*addr);
414
415 //DLog(tpparam.name,"send() - use_existing_connection:" << (use_existing_connection ? "true" : "false") << " assoc:" << assoc);
416
417 if (use_existing_connection==false || (assoc && assoc->shutdown==false && assoc->socketfd>0))
418 { // it is allowed to create a new connection for this thread
419 // create a new queue for sender thread
420 FastQueue* sender_thread_queue= new FastQueue;
421 create_new_sender_thread(sender_thread_queue);
422 // remember queue for later use
423
424 //pair<sender_thread_queuemap_t::iterator, bool> tmpinsiterator=
425 senderthread_queuemap.insert( pair<appladdress,FastQueue*> (*addr,sender_thread_queue) );
426
427 destqueue= sender_thread_queue;
428 }
429 }
430 else
431 { // we have a sender thread, use stored queue from map
432 destqueue= it->second;
433 }
434
435 unlock();
436
437 // send a send_data message to it (if we have a destination queue)
438 if (destqueue)
439 {
440 // both parameters will be freed after message was sent!
441
442 TPoverTLS_TCPMsg* internalmsg= new TPoverTLS_TCPMsg(netmsg,new appladdress(*addr));
443 if (internalmsg)
444 {
445 // send the internal message to the sender thread queue
446 internalmsg->send(tpparam.source,destqueue);
447 }
448 }
449 else
450 {
451 if (!use_existing_connection)
452 WLog(tpparam.name,"send() - found entry for address, but no active sender thread available for peer addr:" << *addr << " - dropping data");
453 else
454 DLog(tpparam.name,"no active sender thread found for peer " << *addr << " - but policy forbids to set up a new connection, will drop data");
455
456 // cannot send data, so we must drop it
457 delete netmsg;
458
459 }
460
461 if (addr) delete addr;
462}
463
464/** sends a NetMsg to the network.
465 *
466 * @param netmsg message to send
467 * @param addr transport endpoint address
468 *
469 * @note if no connection exists, creates a new one
470 * @note both parameters are deleted after the message was sent
471 */
472void
473TPoverTLS_TCP::tcptlssend(NetMsg* netmsg, appladdress* addr)
474{
475#ifndef _NO_LOGGING
476 const char *const thisproc="sender - ";
477#endif
478
479 // set result initially to success, set it to failure
480 // in places where these failures occur
481 int result = TCP_SUCCESS;
482 int ret= 0;
483
484 // Create a new AssocData-pointer
485 AssocData* assoc = NULL;
486
487 // tp.cpp checks for initialisation of tp and correctness of
488 // msgsize, protocol and ip,
489 // throws an error if something is not right
490 if (addr) {
491 addr->convert_to_ipv6();
492 check_send_args(*netmsg,*addr);
493
494 } else {
495 Log(ERROR_LOG,LOG_CRIT, tpparam.name, thisproc << "address pointer is NULL");
496 result= TCP_SEND_FAILURE;
497 throw TPErrorInternal();
498 }
499
500
501
502 // check for existing connections,
503 // if a connection exists, return its AssocData
504 // and save it in assoc for further use
505 // if no connection exists, create a new one (in get_connection_to()).
506 // Connection is inserted into connection map that must be done
507 // with exclusive access
508 assoc= get_connection_to(*addr);
509
510 if (assoc==NULL || assoc->socketfd<=0)
511 {
512 Log(ERROR_LOG,LOG_CRIT, tpparam.name, thisproc << "no valid assoc/socket data");
513
514 delete netmsg;
515 delete addr;
516 return;
517 }
518 if (assoc->shutdown)
519 {
520 Log(WARNING_LOG, LOG_ALERT, tpparam.name, thisproc << "should send message although connection already half closed");
521 delete netmsg;
522 delete addr;
523
524 throw TPErrorSendFailed();
525 }
526
527 uint32 msgsize= netmsg->get_size();
528#ifdef DEBUG_HARD
529 cerr << thisproc << "message size=" << netmsg->get_size() << endl;
530#endif
531
532
533 // get SSL connection data
534 SSL* ssl= sslmap[assoc->socketfd];
535
536
537 // send all the data contained in netmsg to the socket
538 // which belongs to the address "addr"
539 for (uint32 bytes_sent= 0;
540 bytes_sent < msgsize;
541 bytes_sent+= ret)
542 {
543
544#ifdef _DEBUG_HARD
545 for (uint32 i=0;i<msgsize;i++)
546 {
547 cout << "send_buf: " << i << " : ";
548 if ( isalnum(*(netmsg->get_buffer()+i)) )
549 cout << "'" << *(netmsg->get_buffer()+i) << "' (0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec << ")" ;
550 else
551 cout << "0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec;
552 cout << endl;
553 }
554
555 cout << endl;
556 cout << "bytes_sent: " << bytes_sent << endl;
557 cout << "Message size: " << msgsize << endl;
558 cout << "Send-Socket: " << assoc->socketfd << endl;
559 cout << "pointer-Offset. " << netmsg->get_pos() << endl;
560 cout << "vor send " << endl;
561#endif
562
563
564 // socket send
565 //ret= ::send(assoc->socketfd,
566// netmsg->get_buffer() + bytes_sent,
567// msgsize - bytes_sent,
568// MSG_NOSIGNAL);
569
570
571 ret = SSL_write(ssl, netmsg->get_buffer() + bytes_sent, msgsize - bytes_sent);
572
573 //int SSL_write(SSL *ssl, const void *buf, int num);
574
575 //send_buf + bytes_sent
576
577 if (debug_pdu)
578 {
579 ostringstream hexdump;
580 netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_sent);
581 Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"PDU debugging enabled - Sent:" << hexdump.str());
582 }
583
584 if (ret < 0)
585 {
586 result= TCP_SEND_FAILURE;
587 break;
588 } // end if (ret < 0)
589 } // end for
590
591 // *** note: netmsg is deleted here ***
592 delete netmsg;
593
594 // Throwing an exception within a critical section does not
595 // unlock the mutex.
596
597 if (result != TCP_SUCCESS)
598 {
599 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, thisproc << "TLS/TCP error, returns " << ret << ", error : " << strerror(errno));
600 delete addr;
601
602 throw TPErrorSendFailed();
603
604 }
605 else
606 Log(EVENT_LOG,LOG_NORMAL,tpparam.name, thisproc << ">>----Sent---->> message (" << msgsize << " bytes) using socket " << assoc->socketfd << " to " << *addr);
607
608 if (!assoc) {
609 // no connection
610
611 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, thisproc << "cannot get connection to " << addr->get_ip_str()
612 << ", port #" << addr->get_port());
613
614 delete addr;
615
616 throw TPErrorUnreachable(); // should be no assoc found
617 } // end "if (!assoc)"
618
619 // *** delete address ***
620 delete addr;
621}
622
623/* this thread waits for an internal message that either:
624 * - requests transmission of a packet
625 * - requests to stop this thread
626 * @param argp points to the thread queue for internal messages
627 */
628void
629TPoverTLS_TCP::sender_thread(void *argp)
630{
631#ifndef _NO_LOGGING
632 const char *const methodname="senderthread - ";
633#endif
634
635 message* internal_thread_msg = NULL;
636
637 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, methodname << "starting as thread <" << pthread_self() << ">");
638
639 FastQueue* fq= reinterpret_cast<FastQueue*>(argp);
640 if (!fq)
641 {
642 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "thread <" << pthread_self() << "> no valid pointer to msg queue. Stop.");
643 return;
644 }
645
646 bool terminate= false;
647 TPoverTLS_TCPMsg* internalmsg= 0;
648 while (terminate==false && (internal_thread_msg= fq->dequeue()) != 0 )
649 {
650 internalmsg= dynamic_cast<TPoverTLS_TCPMsg*>(internal_thread_msg);
651
652 if (internalmsg == 0)
653 {
654 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "received not an TPoverTLS_TCPMsg but a" << internal_thread_msg->get_type_name());
655 }
656 else
657 if (internalmsg->get_msgtype() == TPoverTLS_TCPMsg::send_data)
658 {
659 // create a connection if none exists and send the netmsg
660 if (internalmsg->get_netmsg() && internalmsg->get_appladdr())
661 {
662 tcptlssend(internalmsg->get_netmsg(),internalmsg->get_appladdr());
663 }
664 else
665 {
666 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "problem with passed arguments references, they point to 0");
667 }
668 }
669 else
670 if (internalmsg->get_msgtype() == TPoverTLS_TCPMsg::stop)
671 {
672 terminate= true;
673 }
674 } // end while
675
676 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, methodname << "<" << pthread_self() << "> terminated connection." );
677}
678
679
680/** receiver thread listens at a TCP socket for incoming PDUs
681 * and passes complete PDUs to the coordinator. Incomplete
682 * PDUs due to aborted connections or buffer overflows are discarded.
683 *
684 * @param argp - assoc data and flags sig_terminate and terminated
685 *
686 * @note this is a static member function, so you cannot use class variables
687 */
688void
689TPoverTLS_TCP::receiver_thread(void *argp)
690{
691#ifndef _NO_LOGGING
692 const char *const methodname="receiver - ";
693#endif
694 receiver_thread_arg_t *receiver_thread_argp= static_cast<receiver_thread_arg_t *>(argp);
695 const appladdress* peer_addr = NULL;
696 const appladdress* own_addr = NULL;
697 uint32 bytes_received = 0;
698 TPMsg* tpmsg= NULL;
699
700 //uchar* sslbuffer = NULL;
701
702
703 // argument parsing - start
704 if (receiver_thread_argp == 0)
705 {
706 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "No arguments given at start of receiver thread <" << pthread_self() << ">, exiting.");
707
708 return;
709 }
710 else
711 {
712 // change status to running, i.e., not terminated
713 receiver_thread_argp->terminated= false;
714
715#ifdef _DEBUG
716 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "New receiver thread <" << pthread_self() << "> started. ");
717#endif
718 }
719
720 int conn_socket= 0;
721 if (receiver_thread_argp->peer_assoc)
722 {
723 // get socket descriptor from arg
724 conn_socket = receiver_thread_argp->peer_assoc->socketfd;
725 // get pointer to peer address of socket (source/sender address of peer) from arg
726 peer_addr= &receiver_thread_argp->peer_assoc->peer;
727 own_addr= &receiver_thread_argp->peer_assoc->ownaddr;
728 }
729 else
730 {
731 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "No peer assoc available - pointer is NULL");
732
733 return;
734 }
735
736 if (peer_addr == 0)
737 {
738 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "No peer address available for socket " << conn_socket << ", exiting.");
739
740 return;
741 }
742 // argument parsing - end
743#ifdef _DEBUG
744 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname <<
745 "Preparing to wait for data at socket "
746 << conn_socket << " from " << receiver_thread_argp->peer_assoc->peer);
747#endif
748
749 int ret= 0;
750 uint32 msgcontentlength= 0;
751 bool msgcontentlength_known= false;
752 bool pdu_complete= false; // when to terminate inner loop
753
754 /* maybe use this to create a new pdu,
755 /// constructor
756 contextlistpdu(type_t t, subtype_t st, uint32 fc, uint32 numobj = 0);
757 */
758
759 // activate O_NON_BLOCK for recv on socket conn_socket
760 fcntl(conn_socket,F_SETFL, O_NONBLOCK);
761
762 // set options for polling
763 const unsigned int number_poll_sockets= 1;
764 struct pollfd poll_fd;
765 // have to set structure before poll call
766 poll_fd.fd = conn_socket;
767 poll_fd.events = POLLIN | POLLPRI;
768
769 int poll_status;
770 bool recv_error= false;
771
772 //get SSL handle
773 SSL *ssl=sslmap[conn_socket];
774
775 NetMsg* netmsg= 0;
776 NetMsg* remainbuf= 0;
777 uint32 buffer_bytes_left= 0;
778 size_t trailingbytes= 0;
779 bool skiprecv= false;
780 // loop until we receive a terminate signal (read-only var for this thread)
781 // or get an error from socket read
782 while( receiver_thread_argp->sig_terminate == false )
783 {
784 // Read next PDU from socket or process trailing bytes in remainbuf
785 ret= 0;
786 msgcontentlength= 0;
787 msgcontentlength_known= false;
788 pdu_complete= false;
789 netmsg= 0;
790
791 // there are trailing bytes left from the last receive call
792 if (remainbuf != 0)
793 {
794 netmsg= remainbuf;
795 remainbuf= 0;
796 buffer_bytes_left= netmsg->get_size()-trailingbytes;
797 bytes_received= trailingbytes;
798 trailingbytes= 0;
799 skiprecv= true;
800 }
801 else // no trailing bytes, create a new buffer
802 if ( (netmsg= new NetMsg(NetMsg::max_size)) != 0 )
803 {
804 buffer_bytes_left= netmsg->get_size();
805 bytes_received= 0;
806 skiprecv= false;
807 }
808 else
809 { // buffer allocation failed
810 bytes_received= 0;
811 buffer_bytes_left= 0;
812 recv_error= true;
813 }
814
815 // loops until PDU is complete
816 // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
817 while (!pdu_complete &&
818 !recv_error &&
819 !receiver_thread_argp->sig_terminate)
820 {
821 if (!skiprecv)
822 {
823 // read from TCP socket or return after sleep_time
824 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
825
826 if (receiver_thread_argp->sig_terminate)
827 {
828 Log(EVENT_LOG,LOG_UNIMP,tpparam.name,methodname << "Thread <" << pthread_self() << "> found terminate signal after poll");
829 // disallow sending
830 AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
831 if (myassoc->shutdown == false)
832 {
833 myassoc->shutdown= true;
834 if (shutdown(myassoc->socketfd,SHUT_WR))
835 {
836 if ( errno != ENOTCONN )
837 Log(ERROR_LOG,LOG_UNIMP,tpparam.name,methodname <<"shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
838 }
839 }
840 // try to read do a last read from the TCP socket or return after sleep_time
841 if (poll_status == 0)
842 {
843 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
844 }
845 }
846
847 if (poll_fd.revents & POLLERR) // Error condition
848 {
849 if (errno == 0 || errno == EINTR)
850 {
851 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, methodname << "poll(): " << strerror(errno));
852 }
853 else
854 {
855 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Poll indicates error: " << strerror(errno));
856 recv_error= true;
857 }
858 }
859
860 if (poll_fd.revents & POLLHUP) // Hung up
861 {
862 Log(EVENT_LOG,LOG_CRIT, tpparam.name, methodname << "Poll hung up");
863 recv_error= true;
864 }
865
866 if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
867 {
868 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Poll Invalid request: fd not open");
869 recv_error= true;
870 }
871
872 // check status (return value) of poll call
873 switch (poll_status)
874 {
875 case -1:
876 if (errno == 0 || errno == EINTR)
877 {
878 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, methodname << "Poll status: " << strerror(errno));
879 }
880 else
881 {
882 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Poll status indicates error: " << strerror(errno) << "- aborting");
883 recv_error= true;
884 }
885
886 continue; // next while iteration
887 break;
888
889 case 0:
890#ifdef DEBUG_HARD
891 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll timed out after " << tpparam.sleep_time << " ms.");
892#endif
893 continue; // next while iteration
894 break;
895
896 default:
897#ifdef DEBUG_HARD
898 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
899#endif
900 break;
901 } // end switch
902
903 /// spawn a new buffer for SSL decoding
904 //sslbuffer = new(nothrow) uchar[NetMsg::max_size];
905
906 /// receive data from socket buffer (recv will not block)
907 //ret = recv(conn_socket,
908 // netmsg->get_buffer() + bytes_received,
909 // buffer_bytes_left,
910 // 0);
911
912 /// try to read from SSL
913 ret = SSL_read(ssl, netmsg->get_buffer() + bytes_received, buffer_bytes_left);
914
915
916 /// memcpy decoded contents back into netmsg
917 //memcpy(netmsg->get_buffer() + bytes_received, sslbuffer, buffer_bytes_left);
918
919 /// delete sslbuffer
920 //delete sslbuffer;
921
922 if ( ret < 0 )
923 {
924 if (errno!=EAGAIN && errno!=EWOULDBLOCK)
925 {
926 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Receive at socket " << conn_socket << " failed, error: " << strerror(errno));
927 recv_error= true;
928 continue;
929 }
930 else
931 { // errno==EAGAIN || errno==EWOULDBLOCK
932 // just nothing to read from socket, continue w/ next poll
933 continue;
934 }
935 }
936 else
937 {
938 if (ret == 0)
939 {
940 // this means that EOF is reached,
941 // other side has closed connection
942 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Other side (" << *peer_addr << ") closed connection for socket " << conn_socket);
943 // disallow sending
944 AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
945 if (myassoc->shutdown == false)
946 {
947 myassoc->shutdown= true;
948 if (shutdown(myassoc->socketfd,SHUT_WR))
949 {
950 if ( errno != ENOTCONN )
951 Log(ERROR_LOG,LOG_UNIMP,tpparam.name, methodname << "shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
952 }
953 }
954 // not a real error, but we must quit the receive loop
955 recv_error= true;
956 }
957 else
958 {
959 Log(EVENT_LOG,LOG_UNIMP, tpparam.name, methodname << "<<--Received--<< packet (" << ret << " bytes) at socket " << conn_socket << " from " << *peer_addr);
960 // track number of received bytes
961 bytes_received+= ret;
962 buffer_bytes_left-= ret;
963 }
964 }
965 } // end if do not skip recv() statement
966
967 if (buffer_bytes_left < 0) ///< buffer space exhausted now
968 {
969 recv_error= true;
970 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "during receive buffer space exhausted");
971 }
972
973 if (!msgcontentlength_known) ///< common header not parsed
974 {
975
976 // enough bytes read to parse common header?
977 if (bytes_received >= common_header_length)
978 {
979 // get message content length in number of bytes
980 if (getmsglength(*netmsg, msgcontentlength))
981 msgcontentlength_known= true;
982 else
983 {
984 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Not a valid protocol header - discarding received packet. received size " << msgcontentlength);
985
986 ostringstream hexdumpstr;
987 netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
988 Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"dumping received bytes:" << hexdumpstr.str());
989
990 // reset all counters
991 msgcontentlength= 0;
992 msgcontentlength_known= false;
993 bytes_received= 0;
994 pdu_complete= false;
995 continue;
996 }
997 }
998 } // endif common header not parsed
999
1000 // check whether we have read the whole Protocol PDU
1001 DLog(tpparam.name, "bytes_received - common_header_length: " << bytes_received-common_header_length << "msgcontentlength: " << msgcontentlength << ">=");
1002 if (msgcontentlength_known && bytes_received-common_header_length >= msgcontentlength )
1003 {
1004 pdu_complete= true;
1005
1006 // truncate buffer exactly at common_header_length+msgcontentlength==PDU size, trailing stuff is handled separately
1007 netmsg->truncate(common_header_length+msgcontentlength);
1008
1009 // trailing bytes are copied into new buffer
1010 if (bytes_received-common_header_length > msgcontentlength)
1011 {
1012 Log(WARNING_LOG,LOG_NORMAL,tpparam.name,"trailing bytes - received more bytes ("<<bytes_received<<") than expected for PDU (" << common_header_length+msgcontentlength << ")");
1013 remainbuf= new NetMsg(NetMsg::max_size);
1014 trailingbytes= (bytes_received-common_header_length) - msgcontentlength;
1015 bytes_received= common_header_length+msgcontentlength;
1016 memcpy(remainbuf->get_buffer(),netmsg->get_buffer()+common_header_length+msgcontentlength, trailingbytes);
1017 }
1018 }
1019 } // end while (!pdu_complete && !recv_error && !signalled for termination)
1020 // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
1021
1022 // if other side closed the connection, we should still be able to deliver the remaining data
1023 if (ret == 0)
1024 {
1025 recv_error= false;
1026 }
1027
1028 // deliver only complete PDUs to signaling module
1029 if (!recv_error && pdu_complete)
1030 {
1031 // create TPMsg and send it to the signaling thread
1032 tpmsg = new(nothrow) TPMsg(netmsg, peer_addr->copy(), own_addr->copy());
1033
1034 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "receipt of PDU now complete, sending to module " << message::get_qaddr_name(tpparam.dest));
1035
1036 debug_pdu=false;
1037
1038 if (debug_pdu)
1039 {
1040 ostringstream hexdump;
1041 netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_received);
1042 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"PDU debugging enabled - Received:" << hexdump.str());
1043 }
1044
1045 // send the message if it was successfully created
1046 // bool message::send_to(qaddr_t dest, bool exp = false);
1047 if (!tpmsg
1048 || (!tpmsg->get_peeraddress())
1049 || (!tpmsg->send_to(tpparam.dest)))
1050 {
1051 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Cannot allocate/send TPMsg");
1052 if (tpmsg) delete tpmsg;
1053 } // end if tpmsg not allocated or not addr or not sent
1054
1055 } // end if !recv_error
1056 else
1057 { // error during receive or PDU incomplete
1058 if (bytes_received>0)
1059 {
1060 Log(WARNING_LOG,LOG_NORMAL, tpparam.name, methodname << "Attention! " << (recv_error? "Receive error, " : "") << (pdu_complete ? "PDU complete" : "PDU incomplete") << "received bytes: " << bytes_received);
1061 }
1062
1063 if (!pdu_complete && bytes_received>0 && bytes_received<common_header_length)
1064 {
1065 ostringstream hexdumpstr;
1066 netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
1067 Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"Message too short to be a valid protocol header - dumping received bytes:" << hexdumpstr.str());
1068 }
1069 // leave the outer loop
1070 /**********************/
1071 break;
1072 /**********************/
1073 } // end else
1074
1075 } // end while (thread not signalled for termination)
1076
1077 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self()
1078 << "> shutting down and closing socket " << receiver_thread_argp->peer_assoc->peer);
1079
1080 // shutdown socket
1081 if (shutdown(conn_socket, SHUT_RD))
1082 {
1083 if ( errno != ENOTCONN )
1084 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> shutdown (read) on socket failed, reason: " << strerror(errno));
1085 }
1086
1087 // close socket
1088 close(conn_socket);
1089
1090 receiver_thread_argp->terminated= true;
1091
1092 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> terminated");
1093
1094#ifdef _DEBUG
1095 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Signaling main loop for cleanup");
1096#endif
1097 // notify master thread for invocation of cleanup procedure
1098 TPoverTLS_TCPMsg* newmsg= new(nothrow)TPoverTLS_TCPMsg(receiver_thread_argp->peer_assoc);
1099 // send message to main loop thread
1100 newmsg->send_to(tpparam.source);
1101
1102
1103}
1104
1105
1106/** this signals a terminate to a thread and wait for the thread to stop
1107 * @note it is not safe to access any thread related data after this method returned,
1108 * because the receiver thread will initiate a cleanup_receiver_thread() method
1109 * which may erase all relevant thread data.
1110 */
1111void
1112TPoverTLS_TCP::stop_receiver_thread(AssocData* peer_assoc)
1113{
1114 // All operations on recv_thread_argmap and connmap require an already acquired lock
1115 // after this procedure peer_assoc may be invalid because it was erased
1116
1117 // start critical section
1118
1119 if (peer_assoc == 0)
1120 return;
1121
1122 pthread_t thread_id= peer_assoc->thread_ID;
1123
1124 // try to clean up receiver_thread_arg
1125 recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1126 receiver_thread_arg_t* recv_thread_argp=
1127 (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1128 if (recv_thread_argp)
1129 {
1130 if (!recv_thread_argp->terminated)
1131 {
1132 // thread signaled termination, but is not?
1133 Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> signaled for termination");
1134
1135 // signal thread for its termination
1136 recv_thread_argp->sig_terminate= true;
1137 // wait for thread to join after termination
1138 pthread_join(thread_id, 0);
1139 // the dying thread will signal main loop to call this method, but this time we should enter the else branch
1140 return;
1141 }
1142 }
1143 else
1144 Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> not found");
1145
1146}
1147
1148
1149/** cleans up left over structures (assoc,receiver_thread_arg) from already terminated receiver_thread
1150 * usually called by the master_thread after the receiver_thread terminated
1151 * @note clean_up_receiver_thread() should be only called when an outer lock ensures that peer_assoc
1152 * is still valid
1153 */
1154void
1155TPoverTLS_TCP::cleanup_receiver_thread(AssocData* peer_assoc)
1156{
1157 // All operations on recv_thread_argmap and connmap require an already acquired lock
1158 // after this procedure peer_assoc may be invalid because it was erased
1159
1160 // start critical section
1161
1162 if (peer_assoc == 0)
1163 return;
1164
1165 pthread_t thread_id= peer_assoc->thread_ID;
1166
1167 // try to clean up receiver_thread_arg
1168 recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1169 receiver_thread_arg_t* recv_thread_argp=
1170 (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1171 if (recv_thread_argp)
1172 {
1173 if (!recv_thread_argp->terminated)
1174 {
1175 // thread signaled termination, but is not?
1176 Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> not terminated yet?!");
1177 return;
1178 }
1179 else
1180 { // if thread is already terminated
1181 Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> is terminated");
1182
1183 // delete it from receiver map
1184 recv_thread_argmap.erase(recv_thread_arg_iter);
1185
1186 // then delete receiver arg structure
1187 delete recv_thread_argp;
1188 }
1189 }
1190
1191 // delete entry from connection map
1192
1193 // cleanup sender thread
1194 // no need to lock explicitly, because caller of cleanup_receiver_thread() must already locked
1195 terminate_sender_thread(peer_assoc);
1196
1197 // delete the AssocData structure from the connection map
1198 // also frees allocated AssocData structure
1199 connmap.erase(peer_assoc);
1200
1201 // end critical section
1202
1203 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Cleanup receiver thread <" << thread_id << ">. Done.");
1204}
1205
1206
1207/* sends a stop message to the sender thread that belongs to the peer address given in assoc
1208 * @note terminate_receiver_thread() should be only called when an outer lock ensures that assoc
1209 * is still valid, a lock is also required, because senderthread_queuemap is changed
1210 */
1211void
1212TPoverTLS_TCP::terminate_sender_thread(const AssocData* assoc)
1213{
1214 if (assoc == 0)
1215 {
1216 Log(ERROR_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - assoc data == NULL");
1217 return;
1218 }
1219
1220 sender_thread_queuemap_t::iterator it= senderthread_queuemap.find(assoc->peer);
1221
1222 if (it != senderthread_queuemap.end())
1223 { // we have a sender thread: send a stop message to it
1224 FastQueue* destqueue= it->second;
1225 if (destqueue)
1226 {
1227 TPoverTLS_TCPMsg* internalmsg= new TPoverTLS_TCPMsg(assoc,tpparam.source,TPoverTLS_TCPMsg::stop);
1228 if (internalmsg)
1229 {
1230 // send the internal message to the sender thread queue
1231 internalmsg->send(tpparam.source,destqueue);
1232 }
1233 }
1234 else
1235 {
1236 Log(WARNING_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - found entry for address, but no sender thread. addr:" << assoc->peer);
1237 }
1238 // erase entry from map
1239 senderthread_queuemap.erase(it);
1240 }
1241}
1242
1243/* terminate all active threads
1244 * note: locking should not be necessary here because this message is called as last method from
1245 * main_loop()
1246 */
1247void
1248TPoverTLS_TCP::terminate_all_threads()
1249{
1250 AssocData* assoc= 0;
1251 receiver_thread_arg_t* terminate_argp;
1252
1253 for (recv_thread_argmap_t::iterator terminate_iterator= recv_thread_argmap.begin();
1254 terminate_iterator != recv_thread_argmap.end();
1255 terminate_iterator++)
1256 {
1257 if ( (terminate_argp= terminate_iterator->second) != 0)
1258 {
1259 // we need a non const pointer to erase it later on
1260 assoc= const_cast<AssocData*>(terminate_argp->peer_assoc);
1261 // check whether thread is still alive
1262 if (terminate_argp->terminated == false)
1263 {
1264 terminate_argp->sig_terminate= true;
1265 // then wait for its termination
1266 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,
1267 "Signaled receiver thread <" << terminate_iterator->first << "> for termination");
1268
1269 pthread_join(terminate_iterator->first, 0);
1270
1271 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Thread <" << terminate_iterator->first << "> is terminated");
1272 }
1273 else
1274 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,
1275 "Receiver thread <" << terminate_iterator->first << "> already terminated");
1276
1277 // cleanup all remaining argument structures of terminated threads
1278 delete terminate_argp;
1279
1280 // terminate any related sender thread that is still running
1281 terminate_sender_thread(assoc);
1282
1283 connmap.erase(assoc);
1284 // delete assoc is not necessary, because connmap.erase() will do the job
1285 }
1286 } // end for
1287}
1288
1289
1290/**
1291 * sender thread starter:
1292 * just a static starter method to allow starting the
1293 * actual sender_thread() method.
1294 *
1295 * @param argp - pointer to the current TPoverTLS_TCP object instance and receiver_thread_arg_t struct
1296 */
1297void*
1298TPoverTLS_TCP::sender_thread_starter(void *argp)
1299{
1300 sender_thread_start_arg_t *sargp= static_cast<sender_thread_start_arg_t *>(argp);
1301
1302 // invoke sender thread method
1303 if (sargp != 0 && sargp->instance != 0)
1304 {
1305 // call receiver_thread member function on object instance
1306 sargp->instance->sender_thread(sargp->sender_thread_queue);
1307
1308 // no longer needed
1309 delete sargp;
1310 }
1311 else
1312 {
1313 Log(ERROR_LOG,LOG_CRIT,"sender_thread_starter","while starting sender_thread: 0 pointer to arg or object");
1314 }
1315 return 0;
1316}
1317
1318
1319
1320
1321/**
1322 * receiver thread starter:
1323 * just a static starter method to allow starting the
1324 * actual receiver_thread() method.
1325 *
1326 * @param argp - pointer to the current TPoverTLS_TCP object instance and receiver_thread_arg_t struct
1327 */
1328void*
1329TPoverTLS_TCP::receiver_thread_starter(void *argp)
1330{
1331 receiver_thread_start_arg_t *rargp= static_cast<receiver_thread_start_arg_t *>(argp);
1332 // invoke receiver thread method
1333 if (rargp != 0 && rargp->instance != 0)
1334 {
1335 // call receiver_thread member function on object instance
1336 rargp->instance->receiver_thread(rargp->rtargp);
1337
1338 // no longer needed
1339 delete rargp;
1340 }
1341 else
1342 {
1343 Log(ERROR_LOG,LOG_CRIT,"receiver_thread_starter","while starting receiver_thread: 0 pointer to arg or object");
1344 }
1345 return 0;
1346}
1347
1348
1349void
1350TPoverTLS_TCP::create_new_sender_thread(FastQueue* senderfqueue)
1351{
1352 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new sender thread...");
1353
1354 pthread_t senderthreadid;
1355 // create new thread; (arg == 0) is handled by thread, too
1356 int pthread_status= pthread_create(&senderthreadid,
1357 NULL, // NULL: default attributes: thread is joinable and has a
1358 // default, non-realtime scheduling policy
1359 TPoverTLS_TCP::sender_thread_starter,
1360 new sender_thread_start_arg_t(this,senderfqueue));
1361 if (pthread_status)
1362 {
1363 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " << strerror(pthread_status));
1364
1365 delete senderfqueue;
1366 }
1367}
1368
1369
1370void
1371TPoverTLS_TCP::create_new_receiver_thread(AssocData* peer_assoc)
1372{
1373 receiver_thread_arg_t* argp=
1374 new(nothrow) receiver_thread_arg(peer_assoc);
1375
1376 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new receiver thread...");
1377
1378 // create new thread; (arg == 0) is handled by thread, too
1379 int pthread_status= pthread_create(&peer_assoc->thread_ID,
1380 NULL, // NULL: default attributes: thread is joinable and has a
1381 // default, non-realtime scheduling policy
1382 receiver_thread_starter,
1383 new(nothrow) receiver_thread_start_arg_t(this,argp));
1384 if (pthread_status)
1385 {
1386 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " << strerror(pthread_status));
1387
1388 delete argp;
1389 }
1390 else
1391 {
1392 lock(); // install_cleanup_thread_lock(TPoverTLS_TCP, this);
1393
1394 // remember pointer to thread arg structure
1395 // thread arg structure should be destroyed after thread termination only
1396 pair<recv_thread_argmap_t::iterator, bool> tmpinsiterator=
1397 recv_thread_argmap.insert( pair<pthread_t,receiver_thread_arg_t*> (peer_assoc->thread_ID,argp) );
1398 if (tmpinsiterator.second == false)
1399 {
1400 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Thread argument could not be inserted into hashmap");
1401 }
1402 unlock(); // uninstall_cleanup(1);
1403 }
1404}
1405
1406
1407/**
1408 * master listener thread starter:
1409 * just a static starter method to allow starting the
1410 * actual master_listener_thread() method.
1411 *
1412 * @param argp - pointer to the current TPoverTLS_TCP object instance
1413 */
1414void*
1415TPoverTLS_TCP::master_listener_thread_starter(void *argp)
1416{
1417 // invoke listener thread method
1418 if (argp != 0)
1419 {
1420 (static_cast<TPoverTLS_TCP*>(argp))->master_listener_thread();
1421 }
1422 return 0;
1423}
1424
1425
1426int
1427verify_callback(int ok, X509_STORE_CTX * store)
1428{
1429 if (!ok) {
1430 ERRLog("TPoverTLS", color[red] << "Client cert check failed" << color[off]);
1431 } else {
1432 DLog("TPoverTLS", color[green] << "Client cert check OK" << color[off]);
1433 };
1434 return ok;
1435}
1436
1437
1438/**
1439 * master listener thread: waits for incoming connections at the well-known tcp port
1440 * when a connection request is received this thread spawns a receiver_thread for
1441 * receiving packets from the peer at the new socket.
1442 */
1443void
1444TPoverTLS_TCP::master_listener_thread()
1445{
1446 // create a new address-structure for the listening masterthread
1447 struct sockaddr_in6 own_address;
1448 own_address.sin6_family = AF_INET6;
1449 own_address.sin6_flowinfo= 0;
1450 own_address.sin6_port = htons(tpparam.port); // use port number in param structure
1451 // accept incoming connections on all interfaces
1452 own_address.sin6_addr = in6addr_any;
1453 own_address.sin6_scope_id= 0;
1454
1455 // create a listening socket
1456 int master_listener_socket= socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
1457 if (master_listener_socket == -1)
1458 {
1459 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Could not create a new socket, error: " << strerror(errno));
1460 return;
1461 }
1462
1463 // Disable Nagle Algorithm, set (TCP_NODELAY)
1464 int nodelayflag= 1;
1465 int status= setsockopt(master_listener_socket,
1466 IPPROTO_TCP,
1467 TCP_NODELAY,
1468 &nodelayflag,
1469 sizeof(nodelayflag));
1470 if (status)
1471 {
1472 Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
1473 }
1474
1475 // Reuse ports, even if they are busy
1476 int socketreuseflag= 1;
1477 status= setsockopt(master_listener_socket,
1478 SOL_SOCKET,
1479 SO_REUSEADDR,
1480 (const char *) &socketreuseflag,
1481 sizeof(socketreuseflag));
1482 if (status)
1483 {
1484 Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
1485 }
1486
1487
1488 // bind the newly created socket to a specific address
1489 int bind_status = bind(master_listener_socket,
1490 reinterpret_cast<struct sockaddr *>(&own_address),
1491 sizeof(own_address));
1492 if (bind_status)
1493 {
1494 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Binding to "
1495 << inet_ntop(AF_INET6, &own_address.sin6_addr, in6_addrstr_tls, INET6_ADDRSTRLEN)
1496 << " port " << tpparam.port << " failed, error: " << strerror(errno));
1497 return;
1498 }
1499
1500 // listen at the socket,
1501 // queuesize for pending connections= max_listen_queue_size
1502 int listen_status = listen(master_listener_socket, max_listen_queue_size);
1503 if (listen_status)
1504 {
1505 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Listen at socket " << master_listener_socket
1506 << " failed, error: " << strerror(errno));
1507 return;
1508 }
1509 else
1510 {
1511 Log(INFO_LOG,LOG_NORMAL, tpparam.name, color[green] << "Listening at port #" << tpparam.port << color[off]);
1512 }
1513
1514 // activate O_NON_BLOCK for accept (accept does not block)
1515 fcntl(master_listener_socket,F_SETFL, O_NONBLOCK);
1516
1517 // create a pollfd struct for use in the mainloop
1518 struct pollfd poll_fd;
1519 poll_fd.fd = master_listener_socket;
1520 poll_fd.events = POLLIN | POLLPRI;
1521 /*
1522 #define POLLIN 0x001 // There is data to read.
1523 #define POLLPRI 0x002 // There is urgent data to read.
1524 #define POLLOUT 0x004 // Writing now will not block.
1525 */
1526
1527 bool terminate = false;
1528 // check for thread terminate condition using get_state()
1529 state_t currstate= get_state();
1530 int poll_status= 0;
1531 const unsigned int number_poll_sockets= 1;
1532 struct sockaddr_in6 peer_address;
1533 socklen_t peer_address_len;
1534 int conn_socket;
1535
1536 // check whether this thread is signaled for termination
1537 while(! (terminate= (currstate==STATE_ABORT || currstate==STATE_STOP) ) )
1538 {
1539 // wait on number_poll_sockets (main drm socket)
1540 // for the events specified above for sleep_time (in ms)
1541 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
1542 if (poll_fd.revents & POLLERR) // Error condition
1543 {
1544 if (errno != EINTR)
1545 {
1546 Log(ERROR_LOG,LOG_CRIT, tpparam.name,
1547 "Poll caused error " << strerror(errno) << " - indicated by revents");
1548 }
1549 else
1550 {
1551 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "poll(): " << strerror(errno));
1552 }
1553
1554 }
1555 if (poll_fd.revents & POLLHUP) // Hung up
1556 {
1557 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll hung up");
1558 return;
1559 }
1560 if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
1561 {
1562 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll Invalid request: fd not open");
1563 return;
1564 }
1565
1566 switch (poll_status)
1567 {
1568 case -1:
1569 if (errno != EINTR)
1570 {
1571 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll status indicates error: " << strerror(errno));
1572 }
1573 else
1574 {
1575 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Poll status: " << strerror(errno));
1576 }
1577
1578 break;
1579
1580 case 0:
1581#ifdef DEBUG_HARD
1582 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,
1583 "Listen Thread - Poll timed out after " << tpparam.sleep_time << " ms.");
1584#endif
1585 currstate= get_state();
1586 continue;
1587 break;
1588
1589 default:
1590#ifdef DEBUG_HARD
1591 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
1592#endif
1593 break;
1594 } // end switch
1595
1596 // after a successful accept call,
1597 // accept stores the address information of the connecting party
1598 // in peer_address and the size of its address in addrlen
1599 peer_address_len= sizeof(peer_address);
1600
1601 conn_socket = accept (master_listener_socket,
1602 reinterpret_cast<struct sockaddr *>(&peer_address),
1603 &peer_address_len);
1604
1605
1606 if (conn_socket > -1) {
1607
1608 // initialise SSL handshake, save ssl pointer in sslmap
1609
1610 SSL_CTX *ssl_ctx_server = SSL_CTX_new(TLSv1_server_method());
1611
1612 // Load client certificate
1613 if (SSL_CTX_use_certificate_chain_file(ssl_ctx_server, "client_cert.pem") != 1) {
1614 ERRCLog(tpparam.name, "Unable to load client certificate");
1615 this->abort_processing();
1616
1617 }
1618
1619 // Load private key
1620 if (SSL_CTX_use_PrivateKey_file(ssl_ctx_server, "client_privkey.pem", SSL_FILETYPE_PEM) != 1) {
1621 ERRCLog(tpparam.name, "Unable to load private Key, reason:" << SSLerrmessage());
1622 this->abort_processing();
1623 }
1624
1625 // Verify private key
1626 if (SSL_CTX_load_verify_locations(ssl_ctx_server, "root_cert.pem", ".") != 1) {
1627 ERRCLog(tpparam.name, "Private Key failed verification against CA");
1628 this->abort_processing();
1629 }
1630
1631
1632
1633
1634 // We trust clients from this CA
1635 SSL_CTX_set_client_CA_list(ssl_ctx_server, SSL_load_client_CA_file("root_cert.pem"));
1636
1637 // Enable (optional) client authentication
1638 SSL_CTX_set_verify(ssl_ctx_server, SSL_VERIFY_PEER, verify_callback);
1639 SSL_CTX_set_verify_depth(ssl_ctx_server, 4);
1640
1641 SSL *ssl = SSL_new(ssl_ctx_server);
1642
1643 SSL_set_fd(ssl, conn_socket);
1644
1645 int result = SSL_accept(ssl);
1646
1647 if (result==-1) ERRCLog(tpparam.name, "SSL connect failed");
1648
1649 sslmap[conn_socket]=ssl;
1650
1651 DLog(tpparam.name, "SSL handle saved for future use with that socket");
1652
1653 }
1654
1655 if (conn_socket == -1)
1656 {
1657 if (errno != EWOULDBLOCK && errno != EAGAIN)
1658 {
1659 Log(ERROR_LOG,LOG_EMERG, tpparam.name, "Accept at socket " << master_listener_socket
1660 << " failed, error: " << strerror(errno));
1661 return;
1662 }
1663 }
1664 else
1665 {
1666 // create a new assocdata-object for the new thread
1667 AssocData* peer_assoc = NULL;
1668 appladdress addr(peer_address, IPPROTO_TCP);
1669
1670 addr.set_protocol(prot_tls_tcp);
1671
1672 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "<<--Received connect--<< request from " << addr.get_ip_str()
1673 << " port #" << addr.get_port());
1674
1675 struct sockaddr_in6 own_address;
1676 socklen_t own_address_len= sizeof(own_address);
1677 getsockname(conn_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
1678
1679 // AssocData will copy addr content into its own structure
1680 // allocated peer_assoc will be stored in connmap
1681 peer_assoc = new(nothrow) AssocData(conn_socket, addr, appladdress(own_address,IPPROTO_TCP));
1682
1683 bool insert_success= false;
1684 if (peer_assoc)
1685 {
1686 // start critical section
1687 lock(); // install_cleanup_thread_lock(TPoverTLS_TCP, this);
1688 insert_success= connmap.insert(peer_assoc);
1689
1690 // end critical section
1691 unlock(); // uninstall_cleanup(1);
1692 }
1693
1694
1695
1696
1697 if (insert_success == false) // not inserted into connmap
1698 {
1699 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot insert AssocData for socket " << conn_socket
1700 << ", " << addr.get_ip_str() << ", port #"
1701 << addr.get_port() << " into connection map, aborting connection...");
1702
1703 // abort connection, delete its AssocData
1704 close (conn_socket);
1705 if (peer_assoc)
1706 {
1707 delete peer_assoc;
1708 peer_assoc= 0;
1709 }
1710 return;
1711
1712 } //end __else(connmap.insert());__
1713
1714 // create a new thread for each new connection
1715 create_new_receiver_thread(peer_assoc);
1716 } // end __else (connsocket)__
1717
1718 // get new thread state
1719 currstate= get_state();
1720
1721 } // end while(!terminate)
1722 return;
1723} // end listen_for_connections()
1724
1725
1726TPoverTLS_TCP::~TPoverTLS_TCP()
1727{
1728 init= false;
1729
1730 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Destructor called");
1731
1732 QueueManager::instance()->unregister_queue(tpparam.source);
1733}
1734
1735/** TPoverTLS_TCP Thread main loop.
1736 * This loop checks for internal messages of either
1737 * a send() call to start a new receiver thread, or,
1738 * of a receiver_thread() that signals its own termination
1739 * for proper cleanup of control structures.
1740 * It also handles the following internal TPoverTLS_TCPMsg types:
1741 * - TPoverTLS_TCPMsg::stop - a particular receiver thread is terminated
1742 * - TPoverTLS_TCPMsg::start - a particular receiver thread is started
1743 * @param nr number of current thread instance
1744 */
1745void
1746TPoverTLS_TCP::main_loop(uint32 nr)
1747{
1748 SSL_library_init();
1749 OpenSSL_add_ssl_algorithms();
1750 SSL_load_error_strings();
1751
1752 ssl_ctx=SSL_CTX_new(TLSv1_client_method());
1753
1754 if (!ssl_ctx)
1755 {
1756 ERRCLog(tpparam.name, "could not create SSL context: "<< SSLerrmessage());
1757 }
1758
1759 // Load client certificate
1760 if (SSL_CTX_use_certificate_chain_file(ssl_ctx, tpparam.client_cert_filename) != 1) {
1761 ERRCLog(tpparam.name, "Unable to load client certificate: " << SSLerrmessage());
1762 }
1763 else
1764 {
1765 DLog(tpparam.name, color[green] << "Client certificate successfully loaded from " << tpparam.client_cert_filename << color[off]);
1766 }
1767
1768 // Load private key
1769 if (SSL_CTX_use_PrivateKey_file(ssl_ctx, tpparam.client_privkey_filename, SSL_FILETYPE_PEM) != 1) {
1770 ERRCLog(tpparam.name, "Unable to load private Key: " << SSLerrmessage());
1771 }
1772 else
1773 {
1774 DLog(tpparam.name, color[green] << "Private key successfully loaded from " << tpparam.client_privkey_filename << color[off]);
1775 }
1776
1777 // Verify private key
1778 if (SSL_CTX_load_verify_locations(ssl_ctx, tpparam.root_cert_filename, ".") != 1) {
1779 ERRCLog(tpparam.name, "Private Key failed verification against CA");
1780 }
1781 else
1782 {
1783 DLog(tpparam.name, color[green] << "Private key successfully verified as being certified by our CA " << tpparam.root_cert_filename << color[off]);
1784 }
1785
1786
1787 // get internal queue for messages from receiver_thread
1788 FastQueue* fq = get_fqueue();
1789 if (!fq)
1790 {
1791 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot find message queue");
1792 return;
1793 } // end if not fq
1794 // register queue for receiving internal messages from other modules
1795 QueueManager::instance()->register_queue(fq,tpparam.source);
1796
1797
1798
1799 // start master listener thread
1800 pthread_t master_listener_thread_ID;
1801 int pthread_status= pthread_create(&master_listener_thread_ID,
1802 NULL, // NULL: default attributes: thread is joinable and has a
1803 // default, non-realtime scheduling policy
1804 master_listener_thread_starter,
1805 this);
1806 if (pthread_status)
1807 {
1808 Log(ERROR_LOG,LOG_CRIT, tpparam.name,
1809 "New master listener thread could not be created: " << strerror(pthread_status));
1810 }
1811 else
1812 DLog(tpparam.name, "Master listener thread started");
1813
1814
1815 // define max latency for thread reaction on termination/stop signal
1816 timespec wait_interval= { 0, 250000000L }; // 250ms
1817 message* internal_thread_msg = NULL;
1818 state_t currstate= get_state();
1819
1820 // check whether this thread is signaled for termination
1821 while( currstate!=STATE_ABORT && currstate!=STATE_STOP )
1822 {
1823 // poll internal message queue (blocking)
1824 if ( (internal_thread_msg= fq->dequeue_timedwait(wait_interval)) != 0 )
1825 {
1826 TPoverTLS_TCPMsg* internalmsg= dynamic_cast<TPoverTLS_TCPMsg*>(internal_thread_msg);
1827 if (internalmsg)
1828 {
1829 if (internalmsg->get_msgtype() == TPoverTLS_TCPMsg::stop)
1830 {
1831 // a receiver thread terminated and signaled for cleanup by master thread
1832 AssocData* assocd= const_cast<AssocData*>(internalmsg->get_peer_assoc());
1833 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Got cleanup request for thread <" << assocd->thread_ID <<'>');
1834 lock();
1835 cleanup_receiver_thread( assocd );
1836 unlock();
1837 }
1838 else
1839 if (internalmsg->get_msgtype() == TPoverTLS_TCPMsg::start)
1840 {
1841 // start a new receiver thread
1842 create_new_receiver_thread( const_cast<AssocData*>(internalmsg->get_peer_assoc()) );
1843 }
1844 else
1845 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "unexpected internal message:" << internalmsg->get_msgtype());
1846
1847 delete internalmsg;
1848 }
1849 else
1850 {
1851 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Dynamic_cast failed - received unexpected and unknown internal message source "
1852 << internal_thread_msg->get_source());
1853 }
1854 } // endif
1855
1856 // get thread state
1857 currstate= get_state();
1858 } // end while
1859
1860 if (currstate==STATE_STOP)
1861 {
1862 // start abort actions
1863 Log(INFO_LOG,LOG_NORMAL, tpparam.name, "Asked to abort, stopping all receiver threads");
1864 } // end if stopped
1865
1866 // do not accept any more messages
1867 fq->shutdown();
1868 // terminate all receiver and sender threads that are still active
1869 terminate_all_threads();
1870}
1871
1872} // end namespace protlib
1873///@}
Note: See TracBrowser for help on using the repository browser.