source: source/ariba/communication/modules/transport/protlib/tp_over_tls_tcp.cpp@ 5638

Last change on this file since 5638 was 5638, checked in by Christoph Mayer, 15 years ago

adress detection aufgeräumt, network info für bleutooth, data stream (hopeful crash fix), logging auf maemo nur warn, ...

File size: 57.2 KB
Line 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file tp_over_tls_tcp.cpp
3/// TCP/TLS-based transport module
4/// ----------------------------------------------------------
5/// $Id: tp_over_tls_tcp.cpp 2872 2008-02-18 10:58:03Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/src/tp_over_tls_tcp.cpp $
7// ===========================================================
8//
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29
30extern "C"
31{
32 //#define _SOCKADDR_LEN /* use BSD 4.4 sockets */
33#include <unistd.h> /* gethostname */
34#include <sys/types.h> /* network socket interface */
35#include <netinet/in.h> /* network socket interface */
36#include <netinet/tcp.h> /* for TCP Socket Option */
37#include <sys/socket.h>
38#include <arpa/inet.h> /* inet_addr */
39
40#include <fcntl.h>
41#include <sys/poll.h>
42}
43
44#include <iostream>
45#include <errno.h>
46#include <string>
47#include <sstream>
48
49#include "tp_over_tls_tcp.h"
50#include "threadsafe_db.h"
51#include "cleanuphandler.h"
52#include "setuid.h"
53#include "queuemanager.h"
54#include "logfile.h"
55
56#include <set>
57
// Result codes tracked while sending a message (see tcptlssend()).
58#define TCP_SUCCESS 0
59#define TCP_SEND_FAILURE 1
60
// NOTE(review): presumably the backlog handed to listen(); not referenced in
// the visible part of this file - confirm against the master thread code.
61const unsigned int max_listen_queue_size= 10;
62
63namespace protlib {
64
65using namespace log;
66
67/** @defgroup tptcp TP over TCP
68 * @ingroup network
69 * @{
70 */
71
// Scratch buffer large enough for a textual IPv6 address (INET6_ADDRSTRLEN+1
// bytes). It is a single namespace-scope object, so every user in this file
// shares the same storage.
72char in6_addrstr_tls[INET6_ADDRSTRLEN+1];
73
74
75/******* class TPoverTLS_TCP *******/
76
77/*
78 * Obtain reason string for last SSL error
79 *
80 * Some caution is needed here since ERR_reason_error_string will
81 * return NULL if it doesn't recognize the error code. We don't
82 * want to return NULL ever.
83 */
84const char *
85TPoverTLS_TCP::SSLerrmessage(void)
86{
87 unsigned long errcode;
88 const char *errreason;
89 static char errbuf[32];
90
91 errcode = ERR_get_error();
92 if (errcode == 0)
93 return "No SSL error reported";
94 errreason = ERR_reason_error_string(errcode);
95 if (errreason != NULL)
96 return errreason;
97 snprintf(errbuf, sizeof(errbuf), "SSL error code %lu", errcode);
98 return errbuf;
99}
100
101
102
103/** get_connection_to() checks for already existing connections.
104 * If a connection exists, it returns "AssocData"
105 * and saves it in "connmap" for further use
106 * If no connection exists, a new connection to "addr"
107 * is created.
108 */
109AssocData*
110TPoverTLS_TCP::get_connection_to(const appladdress& addr)
111{
112 // get timeout
113 struct timespec ts;
114 get_time_of_day(ts);
115 ts.tv_nsec+= tpparam.sleep_time * 1000000L;
116 if (ts.tv_nsec>=1000000000L)
117 {
118 ts.tv_sec += ts.tv_nsec / 1000000000L;
119 ts.tv_nsec= ts.tv_nsec % 1000000000L;
120 }
121
122 // create a new AssocData pointer, initialize it to NULL
123 AssocData* assoc= NULL;
124 int new_socket;
125 // loop until timeout is exceeded: TODO: check applicability of loop
126 do
127 {
128 // check for existing connections to addr
129 // start critical section
130 lock(); // install_cleanup_thread_lock(TPoverTLS_TCP, this);
131 assoc= connmap.lookup(addr);
132 // end critical section
133 unlock(); // uninstall_cleanup(1);
134 if (assoc)
135 {
136 // If not shut down then use it, else abort, wait and check again.
137 if (!assoc->shutdown)
138 {
139 return assoc;
140 }
141 else
142 {
143 // TODO: connection is already in shutdown mode. What now?
144 Log(ERROR_LOG,LOG_CRIT,tpparam.name,"socket exists, but is already in mode shutdown");
145
146 return 0;
147 }
148 } //end __if (assoc)__
149 else
150 {
151 Log(DEBUG_LOG,LOG_UNIMP,tpparam.name,"No existing connection to "
152 << addr.get_ip_str() << " port #" << addr.get_port() << " found, creating a new one.");
153 }
154
155 // no connection found, create a new one
156 new_socket = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
157
158 if (new_socket == -1)
159 {
160 Log(ERROR_LOG,LOG_CRIT,tpparam.name,"Couldn't create a new socket: " << strerror(errno));
161
162 return 0;
163 }
164
165 // Disable Nagle Algorithm, set (TCP_NODELAY)
166 int nodelayflag= 1;
167 int status= setsockopt(new_socket,
168 IPPROTO_TCP,
169 TCP_NODELAY,
170 &nodelayflag,
171 sizeof(nodelayflag));
172 if (status)
173 {
174 Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
175 }
176
177 // Reuse ports, even if they are busy
178 int socketreuseflag= 1;
179 status= setsockopt(new_socket,
180 SOL_SOCKET,
181 SO_REUSEADDR,
182 (const char *) &socketreuseflag,
183 sizeof(socketreuseflag));
184 if (status)
185 {
186 Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
187 }
188
189 struct sockaddr_in6 dest_address;
190 dest_address.sin6_flowinfo= 0;
191 dest_address.sin6_scope_id= 0;
192 addr.get_sockaddr(dest_address);
193
194 // connect the socket to the destination address
195 int connect_status = connect(new_socket,
196 reinterpret_cast<const struct sockaddr*>(&dest_address),
197 sizeof(dest_address));
198
199
200 // connects to the listening_port of the peer's masterthread
201 if (connect_status != 0)
202 {
203 Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"Connect to " << addr.get_ip_str() << " port #" << addr.get_port()
204 << " failed: [" << color[red] << strerror(errno) << color[off] << "]");
205 // FIXME: that doesn't work gracefully
206 //throw TPError(TPError::ERROR_UNREACHABLE);
207 return 0;
208 }
209
210 struct sockaddr_in6 own_address;
211 socklen_t own_address_len= sizeof(own_address);
212 getsockname(new_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
213
214
215 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,">>--Connect-->> to " << addr.get_ip_str() << " port #" << addr.get_port()
216 << " from " << inet_ntop(AF_INET6,&own_address.sin6_addr,in6_addrstr_tls,INET6_ADDRSTRLEN)
217 << " port #" << ntohs(own_address.sin6_port));
218
219 // create new AssocData record (will copy addr)
220 assoc = new(nothrow) AssocData(new_socket, addr, appladdress(own_address,IPPROTO_TCP));
221
222 DLog(tpparam.name, "Before creation of a new SSL object");
223
224 SSL *ssl = NULL;
225
226 if (!ssl_ctx) ERRCLog(tpparam.name, "Could not create SSL context");
227
228 ssl = SSL_new(ssl_ctx);
229
230 if (!ssl)
231 {
232 ERRCLog(tpparam.name,"could not create SSL context: " << SSLerrmessage());
233 }
234
235 if (!ssl)
236 ERRCLog(tpparam.name, "Could not create SSL object");
237
238 DLog(tpparam.name, "Before binding it to the socket");
239
240 SSL_set_fd(ssl, new_socket);
241
242 DLog(tpparam.name, "Bound SSL object to the socket");
243
244
245 // Connect to the server
246 if ((SSL_connect(ssl)) < 1) {
247 ERRCLog(tpparam.name, "SSL connect failed in get_connection_to(): " << SSLerrmessage());
248 }
249
250 sslmap[new_socket]=ssl;
251
252 DLog(tpparam.name, "Inserted SSL object into sslmap");
253
254 // if assoc could be successfully created, insert it into ConnectionMap
255 if (assoc)
256 {
257 bool insert_success= false;
258 // start critical section
259 lock(); // install_cleanup_thread_lock(TPoverTLS_TCP, this);
260 // insert AssocData into connection map
261 insert_success= connmap.insert(assoc);
262 // end critical section
263 unlock(); // uninstall_cleanup(1);
264
265 if (insert_success == true)
266 {
267#ifdef _DEBUG
268 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Connected to " << addr.get_ip_str() << ", port #" << addr.get_port()
269 << " via socket " << new_socket);
270
271
272#endif
273
274 // notify master thread to start a receiver thread (i.e. send selfmessage)
275 TPoverTLS_TCPMsg* newmsg= new(nothrow)TPoverTLS_TCPMsg(assoc, tpparam.source, TPoverTLS_TCPMsg::start);
276 if (newmsg)
277 {
278 newmsg->send_to(tpparam.source);
279 return assoc;
280 }
281 else
282 Log(ERROR_LOG,LOG_CRIT,tpparam.name,"get_connection_to: could not get memory for internal msg");
283 }
284 else
285 {
286 // delete data and abort
287 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot insert AssocData for socket " << new_socket << ", "<< addr.get_ip_str()
288 <<", port #" << addr.get_port() << " into connection map, aborting connection");
289
290 // abort connection, delete its AssocData
291 close (new_socket);
292 if (assoc)
293 {
294 delete assoc;
295 assoc= 0;
296 }
297 return assoc;
298 } // end else connmap.insert
299
300 } // end "if (assoc)"
301 }
302 while (wait_cond(ts)!=ETIMEDOUT);
303
304 return assoc;
305} //end get_connection_to
306
307
308/** terminates a signaling association/connection
309 * - if no connection exists, generate a warning
310 * - otherwise: generate internal msg to related receiver thread
311 */
312void
313TPoverTLS_TCP::terminate(const address& in_addr)
314{
315
316 appladdress* addr = NULL;
317 addr= dynamic_cast<appladdress*>(in_addr.copy());
318
319 if (!addr) return;
320
321#ifndef _NO_LOGGING
322 const char *const thisproc="terminate() - ";
323#endif
324
325 // Create a new AssocData-pointer
326 AssocData* assoc = NULL;
327
328 // check for existing connections to addr
329
330 // start critical section:
331 // please note if receiver_thread terminated already, the assoc data is not
332 // available anymore therefore we need a lock around cleanup_receiver_thread()
333
334 lock(); // install_cleanup_thread_lock(TPoverTLS_TCP, this);
335 assoc= connmap.lookup(*addr);
336 if (assoc)
337 {
338 Log(EVENT_LOG,LOG_NORMAL,tpparam.name,thisproc<<"got request to shutdown connection for peer " << *addr);
339 // If not shut down then use it, else abort, wait and check again.
340 if (!assoc->shutdown)
341 {
342 if (assoc->socketfd)
343 {
344 // disallow sending
345 if (shutdown(assoc->socketfd,SHUT_WR))
346 {
347 Log(ERROR_LOG,LOG_UNIMP,tpparam.name,thisproc<<"shutdown (write) on socket for peer " << *addr << " returned error:" << strerror(errno));
348 }
349 else
350 {
351 Log(EVENT_LOG,LOG_NORMAL,tpparam.name,thisproc<<"initiated closing of connection for peer " << *addr << ". Shutdown (write) on socket "<< assoc->socketfd );
352 }
353 }
354 assoc->shutdown= true;
355 }
356 else
357 Log(EVENT_LOG,LOG_NORMAL,tpparam.name,thisproc<<"connection for peer " << *addr << " is already in mode shutdown");
358
359 }
360 else
361 Log(WARNING_LOG,LOG_NORMAL,tpparam.name,thisproc<<"could not find a connection for peer " << *addr);
362
363 stop_receiver_thread(assoc);
364
365 // end critical section
366 unlock(); // uninstall_cleanup(1);
367
368
369 if (addr) delete addr;
370
371}
372
373
374/** generates and internal TPoverTLS_TCP message to send a NetMsg to the network
375 *
376 * - it is necessary to let a thread do this, because the caller
377 * may get blocked if the connect() or send() call hangs for a while
378 * - the sending thread will call TPoverTLS_TCP::tcpsend()
379 * - if no connection exists, creates a new one
380 * @note the netmsg is deleted by the send() method when it is not used anymore
381 */
382void
383TPoverTLS_TCP::send(NetMsg* netmsg, const address& in_addr, bool use_existing_connection)
384{
385 if (netmsg == NULL) {
386 ERRCLog(tpparam.name,"send() - called without valid NetMsg buffer (NULL)");
387 return;
388 }
389
390 appladdress* addr = NULL;
391 addr = dynamic_cast<appladdress*>(in_addr.copy());
392
393 if (!addr)
394 {
395 ERRCLog(tpparam.name,"send() - given destination address is not of expected type (appladdress), has type: " << (int) in_addr.get_type());
396 return;
397 }
398
399 // lock due to sendermap access
400 lock();
401
402 // check for existing sender thread
403 sender_thread_queuemap_t::const_iterator it= senderthread_queuemap.find(*addr);
404
405 FastQueue* destqueue= 0;
406
407 if (it == senderthread_queuemap.end())
408 { // no sender thread so far
409
410 // if we already have an existing connection it is save to create a sender thread
411 // since get_connection_to() will not be invoked, so an existing connection will
412 // be used
413 const AssocData* assoc = connmap.lookup(*addr);
414
415 //DLog(tpparam.name,"send() - use_existing_connection:" << (use_existing_connection ? "true" : "false") << " assoc:" << assoc);
416
417 if (use_existing_connection==false || (assoc && assoc->shutdown==false && assoc->socketfd>0))
418 { // it is allowed to create a new connection for this thread
419 // create a new queue for sender thread
420 FastQueue* sender_thread_queue= new FastQueue;
421 create_new_sender_thread(sender_thread_queue);
422 // remember queue for later use
423
424 //pair<sender_thread_queuemap_t::iterator, bool> tmpinsiterator=
425 senderthread_queuemap.insert( pair<appladdress,FastQueue*> (*addr,sender_thread_queue) );
426
427 destqueue= sender_thread_queue;
428 }
429 }
430 else
431 { // we have a sender thread, use stored queue from map
432 destqueue= it->second;
433 }
434
435 unlock();
436
437 // send a send_data message to it (if we have a destination queue)
438 if (destqueue)
439 {
440 // both parameters will be freed after message was sent!
441
442 TPoverTLS_TCPMsg* internalmsg= new TPoverTLS_TCPMsg(netmsg,new appladdress(*addr));
443 if (internalmsg)
444 {
445 // send the internal message to the sender thread queue
446 internalmsg->send(tpparam.source,destqueue);
447 }
448 }
449 else
450 {
451 if (!use_existing_connection)
452 WLog(tpparam.name,"send() - found entry for address, but no active sender thread available for peer addr:" << *addr << " - dropping data");
453 else
454 DLog(tpparam.name,"no active sender thread found for peer " << *addr << " - but policy forbids to set up a new connection, will drop data");
455
456 // cannot send data, so we must drop it
457 delete netmsg;
458
459 }
460
461 if (addr) delete addr;
462}
463
464/** sends a NetMsg to the network.
465 *
466 * @param netmsg message to send
467 * @param addr transport endpoint address
468 *
469 * @note if no connection exists, creates a new one
470 * @note both parameters are deleted after the message was sent
471 */
472void
473TPoverTLS_TCP::tcptlssend(NetMsg* netmsg, appladdress* addr)
474{
475#ifndef _NO_LOGGING
476 const char *const thisproc="sender - ";
477#endif
478
479 // set result initially to success, set it to failure
480 // in places where these failures occur
481 int result = TCP_SUCCESS;
482 int ret= 0;
483
484 // Create a new AssocData-pointer
485 AssocData* assoc = NULL;
486
487 // tp.cpp checks for initialisation of tp and correctness of
488 // msgsize, protocol and ip,
489 // throws an error if something is not right
490 if (addr) {
491 addr->convert_to_ipv6();
492 check_send_args(*netmsg,*addr);
493
494 } else {
495 Log(ERROR_LOG,LOG_CRIT, tpparam.name, thisproc << "address pointer is NULL");
496 result= TCP_SEND_FAILURE;
497 throw TPErrorInternal();
498 }
499
500
501
502 // check for existing connections,
503 // if a connection exists, return its AssocData
504 // and save it in assoc for further use
505 // if no connection exists, create a new one (in get_connection_to()).
506 // Connection is inserted into connection map that must be done
507 // with exclusive access
508 assoc= get_connection_to(*addr);
509
510 if (assoc==NULL || assoc->socketfd<=0)
511 {
512 Log(ERROR_LOG,LOG_CRIT, tpparam.name, thisproc << "no valid assoc/socket data");
513
514 delete netmsg;
515 delete addr;
516 return;
517 }
518 if (assoc->shutdown)
519 {
520 Log(WARNING_LOG, LOG_ALERT, tpparam.name, thisproc << "should send message although connection already half closed");
521 delete netmsg;
522 delete addr;
523
524 throw TPErrorSendFailed();
525 }
526
527 uint32 msgsize= netmsg->get_size();
528#ifdef DEBUG_HARD
529 cerr << thisproc << "message size=" << netmsg->get_size() << endl;
530#endif
531
532
533 // get SSL connection data
534 SSL* ssl= sslmap[assoc->socketfd];
535
536
537 // send all the data contained in netmsg to the socket
538 // which belongs to the address "addr"
539 for (uint32 bytes_sent= 0;
540 bytes_sent < msgsize;
541 bytes_sent+= ret)
542 {
543
544#ifdef _DEBUG_HARD
545 for (uint32 i=0;i<msgsize;i++)
546 {
547 cout << "send_buf: " << i << " : ";
548 if ( isalnum(*(netmsg->get_buffer()+i)) )
549 cout << "'" << *(netmsg->get_buffer()+i) << "' (0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec << ")" ;
550 else
551 cout << "0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec;
552 cout << endl;
553 }
554
555 cout << endl;
556 cout << "bytes_sent: " << bytes_sent << endl;
557 cout << "Message size: " << msgsize << endl;
558 cout << "Send-Socket: " << assoc->socketfd << endl;
559 cout << "pointer-Offset. " << netmsg->get_pos() << endl;
560 cout << "vor send " << endl;
561#endif
562
563
564 // socket send
565 //ret= ::send(assoc->socketfd,
566// netmsg->get_buffer() + bytes_sent,
567// msgsize - bytes_sent,
568// MSG_NOSIGNAL);
569
570
571 ret = SSL_write(ssl, netmsg->get_buffer() + bytes_sent, msgsize - bytes_sent);
572
573 //int SSL_write(SSL *ssl, const void *buf, int num);
574
575 //send_buf + bytes_sent
576
577 if (debug_pdu)
578 {
579 ostringstream hexdump;
580 netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_sent);
581 Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"PDU debugging enabled - Sent:" << hexdump.str());
582 }
583
584 if (ret < 0)
585 {
586 result= TCP_SEND_FAILURE;
587 break;
588 } // end if (ret < 0)
589 } // end for
590
591 // *** note: netmsg is deleted here ***
592 delete netmsg;
593
594 // Throwing an exception within a critical section does not
595 // unlock the mutex.
596
597 if (result != TCP_SUCCESS)
598 {
599 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, thisproc << "TLS/TCP error, returns " << ret << ", error : " << strerror(errno));
600 delete addr;
601
602 throw TPErrorSendFailed();
603
604 }
605 else
606 Log(EVENT_LOG,LOG_NORMAL,tpparam.name, thisproc << ">>----Sent---->> message (" << msgsize << " bytes) using socket " << assoc->socketfd << " to " << *addr);
607
608 if (!assoc) {
609 // no connection
610
611 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, thisproc << "cannot get connection to " << addr->get_ip_str()
612 << ", port #" << addr->get_port());
613
614 delete addr;
615
616 throw TPErrorUnreachable(); // should be no assoc found
617 } // end "if (!assoc)"
618
619 // *** delete address ***
620 delete addr;
621}
622
623/* this thread waits for an internal message that either:
624 * - requests transmission of a packet
625 * - requests to stop this thread
626 * @param argp points to the thread queue for internal messages
627 */
628void
629TPoverTLS_TCP::sender_thread(void *argp)
630{
631#ifndef _NO_LOGGING
632 const char *const methodname="senderthread - ";
633#endif
634
635 message* internal_thread_msg = NULL;
636
637 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, methodname << "starting as thread <" << pthread_self() << ">");
638
639 FastQueue* fq= reinterpret_cast<FastQueue*>(argp);
640 if (!fq)
641 {
642 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "thread <" << pthread_self() << "> no valid pointer to msg queue. Stop.");
643 return;
644 }
645
646 bool terminate= false;
647 TPoverTLS_TCPMsg* internalmsg= 0;
648 while (terminate==false && (internal_thread_msg= fq->dequeue()) != 0 )
649 {
650 internalmsg= dynamic_cast<TPoverTLS_TCPMsg*>(internal_thread_msg);
651
652 if (internalmsg == 0)
653 {
654 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "received not an TPoverTLS_TCPMsg but a" << internal_thread_msg->get_type_name());
655 }
656 else
657 if (internalmsg->get_msgtype() == TPoverTLS_TCPMsg::send_data)
658 {
659 // create a connection if none exists and send the netmsg
660 if (internalmsg->get_netmsg() && internalmsg->get_appladdr())
661 {
662 tcptlssend(internalmsg->get_netmsg(),internalmsg->get_appladdr());
663 }
664 else
665 {
666 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "problem with passed arguments references, they point to 0");
667 }
668 }
669 else
670 if (internalmsg->get_msgtype() == TPoverTLS_TCPMsg::stop)
671 {
672 terminate= true;
673 }
674 } // end while
675
676 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, methodname << "<" << pthread_self() << "> terminated connection." );
677}
678
679
680/** receiver thread listens at a TCP socket for incoming PDUs
681 * and passes complete PDUs to the coordinator. Incomplete
682 * PDUs due to aborted connections or buffer overflows are discarded.
683 *
684 * @param argp - assoc data and flags sig_terminate and terminated
685 *
686 * @note this is a static member function, so you cannot use class variables
687 */
688void
689TPoverTLS_TCP::receiver_thread(void *argp)
690{
691#ifndef _NO_LOGGING
692 const char *const methodname="receiver - ";
693#endif
694 receiver_thread_arg_t *receiver_thread_argp= static_cast<receiver_thread_arg_t *>(argp);
695 const appladdress* peer_addr = NULL;
696 const appladdress* own_addr = NULL;
697 uint32 bytes_received = 0;
698 TPMsg* tpmsg= NULL;
699
700 //uchar* sslbuffer = NULL;
701
702
703 // argument parsing - start
704 if (receiver_thread_argp == 0)
705 {
706 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "No arguments given at start of receiver thread <" << pthread_self() << ">, exiting.");
707
708 return;
709 }
710 else
711 {
712 // change status to running, i.e., not terminated
713 receiver_thread_argp->terminated= false;
714
715#ifdef _DEBUG
716 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "New receiver thread <" << pthread_self() << "> started. ");
717#endif
718 }
719
720 int conn_socket= 0;
721 if (receiver_thread_argp->peer_assoc)
722 {
723 // get socket descriptor from arg
724 conn_socket = receiver_thread_argp->peer_assoc->socketfd;
725 // get pointer to peer address of socket (source/sender address of peer) from arg
726 peer_addr= &receiver_thread_argp->peer_assoc->peer;
727 own_addr= &receiver_thread_argp->peer_assoc->ownaddr;
728 }
729 else
730 {
731 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "No peer assoc available - pointer is NULL");
732
733 return;
734 }
735
736 if (peer_addr == 0)
737 {
738 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "No peer address available for socket " << conn_socket << ", exiting.");
739
740 return;
741 }
742 // argument parsing - end
743#ifdef _DEBUG
744 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname <<
745 "Preparing to wait for data at socket "
746 << conn_socket << " from " << receiver_thread_argp->peer_assoc->peer);
747#endif
748
749 int ret= 0;
750 uint32 msgcontentlength= 0;
751 bool msgcontentlength_known= false;
752 bool pdu_complete= false; // when to terminate inner loop
753
754 /* maybe use this to create a new pdu,
755 /// constructor
756 contextlistpdu(type_t t, subtype_t st, uint32 fc, uint32 numobj = 0);
757 */
758
759 // activate O_NON_BLOCK for recv on socket conn_socket
760 fcntl(conn_socket,F_SETFL, O_NONBLOCK);
761
762 // set options for polling
763 const unsigned int number_poll_sockets= 1;
764 struct pollfd poll_fd;
765 // have to set structure before poll call
766 poll_fd.fd = conn_socket;
767 poll_fd.events = POLLIN | POLLPRI;
768
769 int poll_status;
770 bool recv_error= false;
771
772 //get SSL handle
773 SSL *ssl=sslmap[conn_socket];
774
775 NetMsg* netmsg= 0;
776 NetMsg* remainbuf= 0;
777 uint32 buffer_bytes_left= 0;
778 size_t trailingbytes= 0;
779 bool skiprecv= false;
780 // loop until we receive a terminate signal (read-only var for this thread)
781 // or get an error from socket read
782 while( receiver_thread_argp->sig_terminate == false )
783 {
784 // Read next PDU from socket or process trailing bytes in remainbuf
785 ret= 0;
786 msgcontentlength= 0;
787 msgcontentlength_known= false;
788 pdu_complete= false;
789 netmsg= 0;
790
791 // there are trailing bytes left from the last receive call
792 if (remainbuf != 0)
793 {
794 netmsg= remainbuf;
795 remainbuf= 0;
796 buffer_bytes_left= netmsg->get_size()-trailingbytes;
797 bytes_received= trailingbytes;
798 trailingbytes= 0;
799 skiprecv= true;
800 }
801 else // no trailing bytes, create a new buffer
802 if ( (netmsg= new NetMsg(NetMsg::max_size)) != 0 )
803 {
804 buffer_bytes_left= netmsg->get_size();
805 bytes_received= 0;
806 skiprecv= false;
807 }
808 else
809 { // buffer allocation failed
810 bytes_received= 0;
811 buffer_bytes_left= 0;
812 recv_error= true;
813 }
814
815 // loops until PDU is complete
816 // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
817 while (!pdu_complete &&
818 !recv_error &&
819 !receiver_thread_argp->sig_terminate)
820 {
821 if (!skiprecv)
822 {
823 // read from TCP socket or return after sleep_time
824 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
825
826 if (receiver_thread_argp->sig_terminate)
827 {
828 Log(EVENT_LOG,LOG_UNIMP,tpparam.name,methodname << "Thread <" << pthread_self() << "> found terminate signal after poll");
829 // disallow sending
830 AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
831 if (myassoc->shutdown == false)
832 {
833 myassoc->shutdown= true;
834 if (shutdown(myassoc->socketfd,SHUT_WR))
835 {
836 if ( errno != ENOTCONN )
837 Log(ERROR_LOG,LOG_UNIMP,tpparam.name,methodname <<"shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
838 }
839 }
840 // try to read do a last read from the TCP socket or return after sleep_time
841 if (poll_status == 0)
842 {
843 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
844 }
845 }
846
847 if (poll_fd.revents & POLLERR) // Error condition
848 {
849 if (errno == 0 || errno == EINTR)
850 {
851 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, methodname << "poll(): " << strerror(errno));
852 }
853 else
854 {
855 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Poll indicates error: " << strerror(errno));
856 recv_error= true;
857 }
858 }
859
860 if (poll_fd.revents & POLLHUP) // Hung up
861 {
862 Log(EVENT_LOG,LOG_CRIT, tpparam.name, methodname << "Poll hung up");
863 recv_error= true;
864 }
865
866 if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
867 {
868 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Poll Invalid request: fd not open");
869 recv_error= true;
870 }
871
872 // check status (return value) of poll call
873 switch (poll_status)
874 {
875 case -1:
876 if (errno == 0 || errno == EINTR)
877 {
878 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, methodname << "Poll status: " << strerror(errno));
879 }
880 else
881 {
882 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Poll status indicates error: " << strerror(errno) << "- aborting");
883 recv_error= true;
884 }
885
886 continue; // next while iteration
887 break;
888
889 case 0:
890#ifdef DEBUG_HARD
891 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll timed out after " << tpparam.sleep_time << " ms.");
892#endif
893 continue; // next while iteration
894 break;
895
896 default:
897#ifdef DEBUG_HARD
898 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
899#endif
900 break;
901 } // end switch
902
903 /// spawn a new buffer for SSL decoding
904 //sslbuffer = new(nothrow) uchar[NetMsg::max_size];
905
906 /// receive data from socket buffer (recv will not block)
907 //ret = recv(conn_socket,
908 // netmsg->get_buffer() + bytes_received,
909 // buffer_bytes_left,
910 // 0);
911
912 /// try to read from SSL
913 ret = SSL_read(ssl, netmsg->get_buffer() + bytes_received, buffer_bytes_left);
914
915
916 /// memcpy decoded contents back into netmsg
917 //memcpy(netmsg->get_buffer() + bytes_received, sslbuffer, buffer_bytes_left);
918
919 /// delete sslbuffer
920 //delete sslbuffer;
921
922 if ( ret < 0 )
923 {
924 if (errno!=EAGAIN && errno!=EWOULDBLOCK)
925 {
926 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Receive at socket " << conn_socket << " failed, error: " << strerror(errno));
927 recv_error= true;
928 continue;
929 }
930 else
931 { // errno==EAGAIN || errno==EWOULDBLOCK
932 // just nothing to read from socket, continue w/ next poll
933 continue;
934 }
935 }
936 else
937 {
938 if (ret == 0)
939 {
940 // this means that EOF is reached,
941 // other side has closed connection
942 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Other side (" << *peer_addr << ") closed connection for socket " << conn_socket);
943 // disallow sending
944 AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
945 if (myassoc->shutdown == false)
946 {
947 myassoc->shutdown= true;
948 if (shutdown(myassoc->socketfd,SHUT_WR))
949 {
950 if ( errno != ENOTCONN )
951 Log(ERROR_LOG,LOG_UNIMP,tpparam.name, methodname << "shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
952 }
953 }
954 // not a real error, but we must quit the receive loop
955 recv_error= true;
956 }
957 else
958 {
959 Log(EVENT_LOG,LOG_UNIMP, tpparam.name, methodname << "<<--Received--<< packet (" << ret << " bytes) at socket " << conn_socket << " from " << *peer_addr);
960 // track number of received bytes
961 bytes_received+= ret;
962 buffer_bytes_left-= ret;
963 }
964 }
965 } // end if do not skip recv() statement
966
967 if (buffer_bytes_left < 0) ///< buffer space exhausted now
968 {
969 recv_error= true;
970 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "during receive buffer space exhausted");
971 }
972
973 if (!msgcontentlength_known) ///< common header not parsed
974 {
975
976 // enough bytes read to parse common header?
977 if (bytes_received >= common_header_length)
978 {
979 // get message content length in number of bytes
980 if (getmsglength(*netmsg, msgcontentlength))
981 msgcontentlength_known= true;
982 else
983 {
984 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Not a valid protocol header - discarding received packet. received size " << msgcontentlength);
985
986 ostringstream hexdumpstr;
987 netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
988 Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"dumping received bytes:" << hexdumpstr.str());
989
990 // reset all counters
991 msgcontentlength= 0;
992 msgcontentlength_known= false;
993 bytes_received= 0;
994 pdu_complete= false;
995 continue;
996 }
997 }
998 } // endif common header not parsed
999
1000 // check whether we have read the whole Protocol PDU
1001 DLog(tpparam.name, "bytes_received - common_header_length: " << bytes_received-common_header_length << "msgcontentlength: " << msgcontentlength << ">=");
1002 if (msgcontentlength_known && bytes_received-common_header_length >= msgcontentlength )
1003 {
1004 pdu_complete= true;
1005
1006 // truncate buffer exactly at common_header_length+msgcontentlength==PDU size, trailing stuff is handled separately
1007 netmsg->truncate(common_header_length+msgcontentlength);
1008
1009 // trailing bytes are copied into new buffer
1010 if (bytes_received-common_header_length > msgcontentlength)
1011 {
1012 Log(WARNING_LOG,LOG_NORMAL,tpparam.name,"trailing bytes - received more bytes ("<<bytes_received<<") than expected for PDU (" << common_header_length+msgcontentlength << ")");
1013 remainbuf= new NetMsg(NetMsg::max_size);
1014 trailingbytes= (bytes_received-common_header_length) - msgcontentlength;
1015 bytes_received= common_header_length+msgcontentlength;
1016 memcpy(remainbuf->get_buffer(),netmsg->get_buffer()+common_header_length+msgcontentlength, trailingbytes);
1017 }
1018 }
1019 } // end while (!pdu_complete && !recv_error && !signalled for termination)
1020 // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
1021
1022 // if other side closed the connection, we should still be able to deliver the remaining data
1023 if (ret == 0)
1024 {
1025 recv_error= false;
1026 }
1027
1028 // deliver only complete PDUs to signaling module
1029 if (!recv_error && pdu_complete)
1030 {
1031 // create TPMsg and send it to the signaling thread
1032 tpmsg = new(nothrow) TPMsg(netmsg, peer_addr->copy(), own_addr->copy());
1033
1034 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "receipt of PDU now complete, sending to module " << message::get_qaddr_name(tpparam.dest));
1035
1036 debug_pdu=false;
1037
1038 if (debug_pdu)
1039 {
1040 ostringstream hexdump;
1041 netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_received);
1042 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"PDU debugging enabled - Received:" << hexdump.str());
1043 }
1044
1045 // send the message if it was successfully created
1046 // bool message::send_to(qaddr_t dest, bool exp = false);
1047 if (!tpmsg
1048 || (!tpmsg->get_peeraddress())
1049 || (!tpmsg->send_to(tpparam.dest)))
1050 {
1051 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Cannot allocate/send TPMsg");
1052 if (tpmsg) delete tpmsg;
1053 } // end if tpmsg not allocated or not addr or not sent
1054
1055 } // end if !recv_error
1056 else
1057 { // error during receive or PDU incomplete
1058 if (bytes_received>0)
1059 {
1060 Log(WARNING_LOG,LOG_NORMAL, tpparam.name, methodname << "Attention! " << (recv_error? "Receive error, " : "") << (pdu_complete ? "PDU complete" : "PDU incomplete") << "received bytes: " << bytes_received);
1061 }
1062
1063 if (!pdu_complete && bytes_received>0 && bytes_received<common_header_length)
1064 {
1065 ostringstream hexdumpstr;
1066 netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
1067 Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"Message too short to be a valid protocol header - dumping received bytes:" << hexdumpstr.str());
1068 }
1069 // leave the outer loop
1070 /**********************/
1071 break;
1072 /**********************/
1073 } // end else
1074
1075 } // end while (thread not signalled for termination)
1076
1077 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self()
1078 << "> shutting down and closing socket " << receiver_thread_argp->peer_assoc->peer);
1079
1080 // shutdown socket
1081 if (shutdown(conn_socket, SHUT_RD))
1082 {
1083 if ( errno != ENOTCONN )
1084 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> shutdown (read) on socket failed, reason: " << strerror(errno));
1085 }
1086
1087 // close socket
1088 close(conn_socket);
1089
1090 receiver_thread_argp->terminated= true;
1091
1092 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> terminated");
1093
1094#ifdef _DEBUG
1095 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Signaling main loop for cleanup");
1096#endif
1097 // notify master thread for invocation of cleanup procedure
1098 TPoverTLS_TCPMsg* newmsg= new(nothrow)TPoverTLS_TCPMsg(receiver_thread_argp->peer_assoc);
1099 // send message to main loop thread
1100 newmsg->send_to(tpparam.source);
1101
1102
1103}
1104
1105
1106/** this signals a terminate to a thread and wait for the thread to stop
1107 * @note it is not safe to access any thread related data after this method returned,
1108 * because the receiver thread will initiate a cleanup_receiver_thread() method
1109 * which may erase all relevant thread data.
1110 */
1111void
1112TPoverTLS_TCP::stop_receiver_thread(AssocData* peer_assoc)
1113{
1114 // All operations on recv_thread_argmap and connmap require an already acquired lock
1115 // after this procedure peer_assoc may be invalid because it was erased
1116
1117 // start critical section
1118
1119 if (peer_assoc == 0)
1120 return;
1121
1122 pthread_t thread_id= peer_assoc->thread_ID;
1123
1124 // try to clean up receiver_thread_arg
1125 recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1126 receiver_thread_arg_t* recv_thread_argp=
1127 (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1128 if (recv_thread_argp)
1129 {
1130 if (!recv_thread_argp->terminated)
1131 {
1132 // thread signaled termination, but is not?
1133 Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> signaled for termination");
1134
1135 // signal thread for its termination
1136 recv_thread_argp->sig_terminate= true;
1137 // wait for thread to join after termination
1138 pthread_join(thread_id, 0);
1139 // the dying thread will signal main loop to call this method, but this time we should enter the else branch
1140 return;
1141 }
1142 }
1143 else
1144 Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> not found");
1145
1146}
1147
1148
1149/** cleans up left over structures (assoc,receiver_thread_arg) from already terminated receiver_thread
1150 * usually called by the master_thread after the receiver_thread terminated
1151 * @note clean_up_receiver_thread() should be only called when an outer lock ensures that peer_assoc
1152 * is still valid
1153 */
1154void
1155TPoverTLS_TCP::cleanup_receiver_thread(AssocData* peer_assoc)
1156{
1157 // All operations on recv_thread_argmap and connmap require an already acquired lock
1158 // after this procedure peer_assoc may be invalid because it was erased
1159
1160 // start critical section
1161
1162 if (peer_assoc == 0)
1163 return;
1164
1165 pthread_t thread_id= peer_assoc->thread_ID;
1166
1167 // try to clean up receiver_thread_arg
1168 recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1169 receiver_thread_arg_t* recv_thread_argp=
1170 (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1171 if (recv_thread_argp)
1172 {
1173 if (!recv_thread_argp->terminated)
1174 {
1175 // thread signaled termination, but is not?
1176 Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> not terminated yet?!");
1177 return;
1178 }
1179 else
1180 { // if thread is already terminated
1181 Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> is terminated");
1182
1183 // delete it from receiver map
1184 recv_thread_argmap.erase(recv_thread_arg_iter);
1185
1186 // then delete receiver arg structure
1187 delete recv_thread_argp;
1188 }
1189 }
1190
1191 // delete entry from connection map
1192
1193 // cleanup sender thread
1194 // no need to lock explicitly, because caller of cleanup_receiver_thread() must already locked
1195 terminate_sender_thread(peer_assoc);
1196
1197 // delete the AssocData structure from the connection map
1198 // also frees allocated AssocData structure
1199 connmap.erase(peer_assoc);
1200
1201 // end critical section
1202
1203 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Cleanup receiver thread <" << thread_id << ">. Done.");
1204}
1205
1206
1207/* sends a stop message to the sender thread that belongs to the peer address given in assoc
1208 * @note terminate_receiver_thread() should be only called when an outer lock ensures that assoc
1209 * is still valid, a lock is also required, because senderthread_queuemap is changed
1210 */
1211void
1212TPoverTLS_TCP::terminate_sender_thread(const AssocData* assoc)
1213{
1214 if (assoc == 0)
1215 {
1216 Log(ERROR_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - assoc data == NULL");
1217 return;
1218 }
1219
1220 sender_thread_queuemap_t::iterator it= senderthread_queuemap.find(assoc->peer);
1221
1222 if (it != senderthread_queuemap.end())
1223 { // we have a sender thread: send a stop message to it
1224 FastQueue* destqueue= it->second;
1225 if (destqueue)
1226 {
1227 TPoverTLS_TCPMsg* internalmsg= new TPoverTLS_TCPMsg(assoc,tpparam.source,TPoverTLS_TCPMsg::stop);
1228 if (internalmsg)
1229 {
1230 // send the internal message to the sender thread queue
1231 internalmsg->send(tpparam.source,destqueue);
1232 }
1233 }
1234 else
1235 {
1236 Log(WARNING_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - found entry for address, but no sender thread. addr:" << assoc->peer);
1237 }
1238 // erase entry from map
1239 senderthread_queuemap.erase(it);
1240 }
1241}
1242
1243/* terminate all active threads
1244 * note: locking should not be necessary here because this message is called as last method from
1245 * main_loop()
1246 */
1247void
1248TPoverTLS_TCP::terminate_all_threads()
1249{
1250 AssocData* assoc= 0;
1251 receiver_thread_arg_t* terminate_argp;
1252
1253 for (recv_thread_argmap_t::iterator terminate_iterator= recv_thread_argmap.begin();
1254 terminate_iterator != recv_thread_argmap.end();
1255 terminate_iterator++)
1256 {
1257 if ( (terminate_argp= terminate_iterator->second) != 0)
1258 {
1259 // we need a non const pointer to erase it later on
1260 assoc= const_cast<AssocData*>(terminate_argp->peer_assoc);
1261 // check whether thread is still alive
1262 if (terminate_argp->terminated == false)
1263 {
1264 terminate_argp->sig_terminate= true;
1265 // then wait for its termination
1266 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,
1267 "Signaled receiver thread <" << terminate_iterator->first << "> for termination");
1268
1269 pthread_join(terminate_iterator->first, 0);
1270
1271 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Thread <" << terminate_iterator->first << "> is terminated");
1272 }
1273 else
1274 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,
1275 "Receiver thread <" << terminate_iterator->first << "> already terminated");
1276
1277 // cleanup all remaining argument structures of terminated threads
1278 delete terminate_argp;
1279
1280 // terminate any related sender thread that is still running
1281 terminate_sender_thread(assoc);
1282
1283 connmap.erase(assoc);
1284 // delete assoc is not necessary, because connmap.erase() will do the job
1285 }
1286 } // end for
1287}
1288
1289
1290/**
1291 * sender thread starter:
1292 * just a static starter method to allow starting the
1293 * actual sender_thread() method.
1294 *
1295 * @param argp - pointer to the current TPoverTLS_TCP object instance and receiver_thread_arg_t struct
1296 */
1297void*
1298TPoverTLS_TCP::sender_thread_starter(void *argp)
1299{
1300 sender_thread_start_arg_t *sargp= static_cast<sender_thread_start_arg_t *>(argp);
1301
1302 // invoke sender thread method
1303 if (sargp != 0 && sargp->instance != 0)
1304 {
1305 // call receiver_thread member function on object instance
1306 sargp->instance->sender_thread(sargp->sender_thread_queue);
1307
1308 // no longer needed
1309 delete sargp;
1310 }
1311 else
1312 {
1313 Log(ERROR_LOG,LOG_CRIT,"sender_thread_starter","while starting sender_thread: 0 pointer to arg or object");
1314 }
1315 return 0;
1316}
1317
1318
1319
1320
1321/**
1322 * receiver thread starter:
1323 * just a static starter method to allow starting the
1324 * actual receiver_thread() method.
1325 *
1326 * @param argp - pointer to the current TPoverTLS_TCP object instance and receiver_thread_arg_t struct
1327 */
1328void*
1329TPoverTLS_TCP::receiver_thread_starter(void *argp)
1330{
1331 receiver_thread_start_arg_t *rargp= static_cast<receiver_thread_start_arg_t *>(argp);
1332 // invoke receiver thread method
1333 if (rargp != 0 && rargp->instance != 0)
1334 {
1335 // call receiver_thread member function on object instance
1336 rargp->instance->receiver_thread(rargp->rtargp);
1337
1338 // no longer needed
1339 delete rargp;
1340 }
1341 else
1342 {
1343 Log(ERROR_LOG,LOG_CRIT,"receiver_thread_starter","while starting receiver_thread: 0 pointer to arg or object");
1344 }
1345 return 0;
1346}
1347
1348
1349void
1350TPoverTLS_TCP::create_new_sender_thread(FastQueue* senderfqueue)
1351{
1352 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new sender thread...");
1353
1354 pthread_t senderthreadid;
1355 // create new thread; (arg == 0) is handled by thread, too
1356 int pthread_status= pthread_create(&senderthreadid,
1357 NULL, // NULL: default attributes: thread is joinable and has a
1358 // default, non-realtime scheduling policy
1359 TPoverTLS_TCP::sender_thread_starter,
1360 new sender_thread_start_arg_t(this,senderfqueue));
1361 if (pthread_status)
1362 {
1363 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " << strerror(pthread_status));
1364
1365 delete senderfqueue;
1366 }
1367}
1368
1369
1370void
1371TPoverTLS_TCP::create_new_receiver_thread(AssocData* peer_assoc)
1372{
1373 receiver_thread_arg_t* argp=
1374 new(nothrow) receiver_thread_arg(peer_assoc);
1375
1376 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new receiver thread...");
1377
1378 // create new thread; (arg == 0) is handled by thread, too
1379 int pthread_status= pthread_create(&peer_assoc->thread_ID,
1380 NULL, // NULL: default attributes: thread is joinable and has a
1381 // default, non-realtime scheduling policy
1382 receiver_thread_starter,
1383 new(nothrow) receiver_thread_start_arg_t(this,argp));
1384 if (pthread_status)
1385 {
1386 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " << strerror(pthread_status));
1387
1388 delete argp;
1389 }
1390 else
1391 {
1392 lock(); // install_cleanup_thread_lock(TPoverTLS_TCP, this);
1393
1394 // remember pointer to thread arg structure
1395 // thread arg structure should be destroyed after thread termination only
1396 pair<recv_thread_argmap_t::iterator, bool> tmpinsiterator=
1397 recv_thread_argmap.insert( pair<pthread_t,receiver_thread_arg_t*> (peer_assoc->thread_ID,argp) );
1398 if (tmpinsiterator.second == false)
1399 {
1400 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Thread argument could not be inserted into hashmap");
1401 }
1402 unlock(); // uninstall_cleanup(1);
1403 }
1404}
1405
1406
1407/**
1408 * master listener thread starter:
1409 * just a static starter method to allow starting the
1410 * actual master_listener_thread() method.
1411 *
1412 * @param argp - pointer to the current TPoverTLS_TCP object instance
1413 */
1414void*
1415TPoverTLS_TCP::master_listener_thread_starter(void *argp)
1416{
1417 // invoke listener thread method
1418 if (argp != 0)
1419 {
1420 (static_cast<TPoverTLS_TCP*>(argp))->master_listener_thread();
1421 }
1422 return 0;
1423}
1424
1425
1426int
1427verify_callback(int ok, X509_STORE_CTX * store)
1428{
1429 if (!ok) {
1430 ERRLog("TPoverTLS", color[red] << "Client cert check failed" << color[off]);
1431 } else {
1432 DLog("TPoverTLS", color[green] << "Client cert check OK" << color[off]);
1433 };
1434 return ok;
1435}
1436
1437
1438/**
1439 * master listener thread: waits for incoming connections at the well-known tcp port
1440 * when a connection request is received this thread spawns a receiver_thread for
1441 * receiving packets from the peer at the new socket.
1442 */
1443void
1444TPoverTLS_TCP::master_listener_thread()
1445{
1446 // create a new address-structure for the listening masterthread
1447 struct sockaddr_in6 own_address;
1448 own_address.sin6_family = AF_INET6;
1449 own_address.sin6_flowinfo= 0;
1450 own_address.sin6_port = htons(tpparam.port); // use port number in param structure
1451 // accept incoming connections on all interfaces
1452 own_address.sin6_addr = in6addr_any;
1453 own_address.sin6_scope_id= 0;
1454
1455 // create a listening socket
1456 int master_listener_socket= socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
1457 if (master_listener_socket == -1)
1458 {
1459 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Could not create a new socket, error: " << strerror(errno));
1460 return;
1461 }
1462
1463 // Disable Nagle Algorithm, set (TCP_NODELAY)
1464 int nodelayflag= 1;
1465 int status= setsockopt(master_listener_socket,
1466 IPPROTO_TCP,
1467 TCP_NODELAY,
1468 &nodelayflag,
1469 sizeof(nodelayflag));
1470 if (status)
1471 {
1472 Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
1473 }
1474
1475 // Reuse ports, even if they are busy
1476 int socketreuseflag= 1;
1477 status= setsockopt(master_listener_socket,
1478 SOL_SOCKET,
1479 SO_REUSEADDR,
1480 (const char *) &socketreuseflag,
1481 sizeof(socketreuseflag));
1482 if (status)
1483 {
1484 Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
1485 }
1486
1487
1488 // bind the newly created socket to a specific address
1489 int bind_status = bind(master_listener_socket,
1490 reinterpret_cast<struct sockaddr *>(&own_address),
1491 sizeof(own_address));
1492 if (bind_status)
1493 {
1494 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Binding to "
1495 << inet_ntop(AF_INET6, &own_address.sin6_addr, in6_addrstr_tls, INET6_ADDRSTRLEN)
1496 << " port " << tpparam.port << " failed, error: " << strerror(errno));
1497 return;
1498 }
1499
1500 // listen at the socket,
1501 // queuesize for pending connections= max_listen_queue_size
1502 int listen_status = listen(master_listener_socket, max_listen_queue_size);
1503 if (listen_status)
1504 {
1505 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Listen at socket " << master_listener_socket
1506 << " failed, error: " << strerror(errno));
1507 return;
1508 }
1509 else
1510 {
1511 Log(INFO_LOG,LOG_NORMAL, tpparam.name, color[green] << "Listening at port #" << tpparam.port << color[off]);
1512 }
1513
1514 // activate O_NON_BLOCK for accept (accept does not block)
1515 fcntl(master_listener_socket,F_SETFL, O_NONBLOCK);
1516
1517 // create a pollfd struct for use in the mainloop
1518 struct pollfd poll_fd;
1519 poll_fd.fd = master_listener_socket;
1520 poll_fd.events = POLLIN | POLLPRI;
1521 /*
1522 #define POLLIN 0x001 // There is data to read.
1523 #define POLLPRI 0x002 // There is urgent data to read.
1524 #define POLLOUT 0x004 // Writing now will not block.
1525 */
1526
1527 bool terminate = false;
1528 // check for thread terminate condition using get_state()
1529 state_t currstate= get_state();
1530 int poll_status= 0;
1531 const unsigned int number_poll_sockets= 1;
1532 struct sockaddr_in6 peer_address;
1533 socklen_t peer_address_len;
1534 int conn_socket;
1535
1536 // check whether this thread is signaled for termination
1537 while(! (terminate= (currstate==STATE_ABORT || currstate==STATE_STOP) ) )
1538 {
1539 // wait on number_poll_sockets (main drm socket)
1540 // for the events specified above for sleep_time (in ms)
1541 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
1542 if (poll_fd.revents & POLLERR) // Error condition
1543 {
1544 if (errno != EINTR)
1545 {
1546 Log(ERROR_LOG,LOG_CRIT, tpparam.name,
1547 "Poll caused error " << strerror(errno) << " - indicated by revents");
1548 }
1549 else
1550 {
1551 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "poll(): " << strerror(errno));
1552 }
1553
1554 }
1555 if (poll_fd.revents & POLLHUP) // Hung up
1556 {
1557 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll hung up");
1558 return;
1559 }
1560 if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
1561 {
1562 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll Invalid request: fd not open");
1563 return;
1564 }
1565
1566 switch (poll_status)
1567 {
1568 case -1:
1569 if (errno != EINTR)
1570 {
1571 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll status indicates error: " << strerror(errno));
1572 }
1573 else
1574 {
1575 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Poll status: " << strerror(errno));
1576 }
1577
1578 break;
1579
1580 case 0:
1581#ifdef DEBUG_HARD
1582 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,
1583 "Listen Thread - Poll timed out after " << tpparam.sleep_time << " ms.");
1584#endif
1585 currstate= get_state();
1586 continue;
1587 break;
1588
1589 default:
1590#ifdef DEBUG_HARD
1591 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
1592#endif
1593 break;
1594 } // end switch
1595
1596 // after a successful accept call,
1597 // accept stores the address information of the connecting party
1598 // in peer_address and the size of its address in addrlen
1599 peer_address_len= sizeof(peer_address);
1600
1601 conn_socket = accept (master_listener_socket,
1602 reinterpret_cast<struct sockaddr *>(&peer_address),
1603 &peer_address_len);
1604
1605
1606 if (conn_socket > -1) {
1607
1608 // initialise SSL handshake, save ssl pointer in sslmap
1609
1610 SSL_CTX *ssl_ctx_server = SSL_CTX_new(TLSv1_server_method());
1611
1612 // Load client certificate
1613 if (SSL_CTX_use_certificate_chain_file(ssl_ctx_server, "client_cert.pem") != 1) {
1614 ERRCLog(tpparam.name, "Unable to load client certificate");
1615 this->abort_processing();
1616
1617 }
1618
1619 // Load private key
1620 if (SSL_CTX_use_PrivateKey_file(ssl_ctx_server, "client_privkey.pem", SSL_FILETYPE_PEM) != 1) {
1621 ERRCLog(tpparam.name, "Unable to load private Key, reason:" << SSLerrmessage());
1622 this->abort_processing();
1623 }
1624
1625 // Verify private key
1626 if (SSL_CTX_load_verify_locations(ssl_ctx_server, "root_cert.pem", ".") != 1) {
1627 ERRCLog(tpparam.name, "Private Key failed verification against CA");
1628 this->abort_processing();
1629 }
1630
1631
1632
1633
1634 // We trust clients from this CA
1635 SSL_CTX_set_client_CA_list(ssl_ctx_server, SSL_load_client_CA_file("root_cert.pem"));
1636
1637 // Enable (optional) client authentication
1638 SSL_CTX_set_verify(ssl_ctx_server, SSL_VERIFY_PEER, verify_callback);
1639 SSL_CTX_set_verify_depth(ssl_ctx_server, 4);
1640
1641 SSL *ssl = SSL_new(ssl_ctx_server);
1642
1643 SSL_set_fd(ssl, conn_socket);
1644
1645 int result = SSL_accept(ssl);
1646
1647 if (result==-1) ERRCLog(tpparam.name, "SSL connect failed");
1648
1649 sslmap[conn_socket]=ssl;
1650
1651 DLog(tpparam.name, "SSL handle saved for future use with that socket");
1652
1653 }
1654
1655 if (conn_socket == -1)
1656 {
1657 if (errno != EWOULDBLOCK && errno != EAGAIN)
1658 {
1659 Log(ERROR_LOG,LOG_EMERG, tpparam.name, "Accept at socket " << master_listener_socket
1660 << " failed, error: " << strerror(errno));
1661 return;
1662 }
1663 }
1664 else
1665 {
1666 // create a new assocdata-object for the new thread
1667 AssocData* peer_assoc = NULL;
1668 appladdress addr(peer_address, IPPROTO_TCP);
1669
1670 addr.set_protocol(prot_tls_tcp);
1671
1672 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "<<--Received connect--<< request from " << addr.get_ip_str()
1673 << " port #" << addr.get_port());
1674
1675 struct sockaddr_in6 own_address;
1676 socklen_t own_address_len= sizeof(own_address);
1677 getsockname(conn_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
1678
1679 // AssocData will copy addr content into its own structure
1680 // allocated peer_assoc will be stored in connmap
1681 peer_assoc = new(nothrow) AssocData(conn_socket, addr, appladdress(own_address,IPPROTO_TCP));
1682
1683 bool insert_success= false;
1684 if (peer_assoc)
1685 {
1686 // start critical section
1687 lock(); // install_cleanup_thread_lock(TPoverTLS_TCP, this);
1688 insert_success= connmap.insert(peer_assoc);
1689
1690 // end critical section
1691 unlock(); // uninstall_cleanup(1);
1692 }
1693
1694
1695
1696
1697 if (insert_success == false) // not inserted into connmap
1698 {
1699 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot insert AssocData for socket " << conn_socket
1700 << ", " << addr.get_ip_str() << ", port #"
1701 << addr.get_port() << " into connection map, aborting connection...");
1702
1703 // abort connection, delete its AssocData
1704 close (conn_socket);
1705 if (peer_assoc)
1706 {
1707 delete peer_assoc;
1708 peer_assoc= 0;
1709 }
1710 return;
1711
1712 } //end __else(connmap.insert());__
1713
1714 // create a new thread for each new connection
1715 create_new_receiver_thread(peer_assoc);
1716 } // end __else (connsocket)__
1717
1718 // get new thread state
1719 currstate= get_state();
1720
1721 } // end while(!terminate)
1722 return;
1723} // end listen_for_connections()
1724
1725
1726TPoverTLS_TCP::~TPoverTLS_TCP()
1727{
1728 init= false;
1729
1730 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Destructor called");
1731
1732 QueueManager::instance()->unregister_queue(tpparam.source);
1733}
1734
1735/** TPoverTLS_TCP Thread main loop.
1736 * This loop checks for internal messages of either
1737 * a send() call to start a new receiver thread, or,
1738 * of a receiver_thread() that signals its own termination
1739 * for proper cleanup of control structures.
1740 * It also handles the following internal TPoverTLS_TCPMsg types:
1741 * - TPoverTLS_TCPMsg::stop - a particular receiver thread is terminated
1742 * - TPoverTLS_TCPMsg::start - a particular receiver thread is started
1743 * @param nr number of current thread instance
1744 */
1745void
1746TPoverTLS_TCP::main_loop(uint32 nr)
1747{
1748 SSL_library_init();
1749 OpenSSL_add_ssl_algorithms();
1750 SSL_load_error_strings();
1751
1752 ssl_ctx=SSL_CTX_new(TLSv1_client_method());
1753
1754 if (!ssl_ctx)
1755 {
1756 ERRCLog(tpparam.name, "could not create SSL context: "<< SSLerrmessage());
1757 }
1758
1759 // Load client certificate
1760 if (SSL_CTX_use_certificate_chain_file(ssl_ctx, tpparam.client_cert_filename) != 1) {
1761 ERRCLog(tpparam.name, "Unable to load client certificate: " << SSLerrmessage());
1762 }
1763 else
1764 {
1765 DLog(tpparam.name, color[green] << "Client certificate successfully loaded from " << tpparam.client_cert_filename << color[off]);
1766 }
1767
1768 // Load private key
1769 if (SSL_CTX_use_PrivateKey_file(ssl_ctx, tpparam.client_privkey_filename, SSL_FILETYPE_PEM) != 1) {
1770 ERRCLog(tpparam.name, "Unable to load private Key: " << SSLerrmessage());
1771 }
1772 else
1773 {
1774 DLog(tpparam.name, color[green] << "Private key successfully loaded from " << tpparam.client_privkey_filename << color[off]);
1775 }
1776
1777 // Verify private key
1778 if (SSL_CTX_load_verify_locations(ssl_ctx, tpparam.root_cert_filename, ".") != 1) {
1779 ERRCLog(tpparam.name, "Private Key failed verification against CA");
1780 }
1781 else
1782 {
1783 DLog(tpparam.name, color[green] << "Private key successfully verified as being certified by our CA " << tpparam.root_cert_filename << color[off]);
1784 }
1785
1786
1787 // get internal queue for messages from receiver_thread
1788 FastQueue* fq = get_fqueue();
1789 if (!fq)
1790 {
1791 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot find message queue");
1792 return;
1793 } // end if not fq
1794 // register queue for receiving internal messages from other modules
1795 QueueManager::instance()->register_queue(fq,tpparam.source);
1796
1797
1798
1799 // start master listener thread
1800 pthread_t master_listener_thread_ID;
1801 int pthread_status= pthread_create(&master_listener_thread_ID,
1802 NULL, // NULL: default attributes: thread is joinable and has a
1803 // default, non-realtime scheduling policy
1804 master_listener_thread_starter,
1805 this);
1806 if (pthread_status)
1807 {
1808 Log(ERROR_LOG,LOG_CRIT, tpparam.name,
1809 "New master listener thread could not be created: " << strerror(pthread_status));
1810 }
1811 else
1812 DLog(tpparam.name, "Master listener thread started");
1813
1814
1815 // define max latency for thread reaction on termination/stop signal
1816 timespec wait_interval= { 0, 250000000L }; // 250ms
1817 message* internal_thread_msg = NULL;
1818 state_t currstate= get_state();
1819
1820 // check whether this thread is signaled for termination
1821 while( currstate!=STATE_ABORT && currstate!=STATE_STOP )
1822 {
1823 // poll internal message queue (blocking)
1824 if ( (internal_thread_msg= fq->dequeue_timedwait(wait_interval)) != 0 )
1825 {
1826 TPoverTLS_TCPMsg* internalmsg= dynamic_cast<TPoverTLS_TCPMsg*>(internal_thread_msg);
1827 if (internalmsg)
1828 {
1829 if (internalmsg->get_msgtype() == TPoverTLS_TCPMsg::stop)
1830 {
1831 // a receiver thread terminated and signaled for cleanup by master thread
1832 AssocData* assocd= const_cast<AssocData*>(internalmsg->get_peer_assoc());
1833 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Got cleanup request for thread <" << assocd->thread_ID <<'>');
1834 lock();
1835 cleanup_receiver_thread( assocd );
1836 unlock();
1837 }
1838 else
1839 if (internalmsg->get_msgtype() == TPoverTLS_TCPMsg::start)
1840 {
1841 // start a new receiver thread
1842 create_new_receiver_thread( const_cast<AssocData*>(internalmsg->get_peer_assoc()) );
1843 }
1844 else
1845 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "unexpected internal message:" << internalmsg->get_msgtype());
1846
1847 delete internalmsg;
1848 }
1849 else
1850 {
1851 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Dynamic_cast failed - received unexpected and unknown internal message source "
1852 << internal_thread_msg->get_source());
1853 }
1854 } // endif
1855
1856 // get thread state
1857 currstate= get_state();
1858 } // end while
1859
1860 if (currstate==STATE_STOP)
1861 {
1862 // start abort actions
1863 Log(INFO_LOG,LOG_NORMAL, tpparam.name, "Asked to abort, stopping all receiver threads");
1864 } // end if stopped
1865
1866 // do not accept any more messages
1867 fq->shutdown();
1868 // terminate all receiver and sender threads that are still active
1869 terminate_all_threads();
1870}
1871
1872} // end namespace protlib
1873///@}
Note: See TracBrowser for help on using the repository browser.