An Overlay-based
Virtual Network Substrate
SpoVNet

source: trash/old-modules/transport/protlib/tp_over_tls_tcp.cpp @ 5641

Last change on this file since 5641 was 5641, checked in by Christoph Mayer, 14 years ago
File size: 57.2 KB
Line 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file tp_over_tls_tcp.cpp
3/// TCP/TLS-based transport module
4/// ----------------------------------------------------------
5/// $Id: tp_over_tls_tcp.cpp 2872 2008-02-18 10:58:03Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/src/tp_over_tls_tcp.cpp $
7// ===========================================================
8//                     
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//                     
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29 
30extern "C"
31{
32  //#define _SOCKADDR_LEN    /* use BSD 4.4 sockets */
33#include <unistd.h>      /* gethostname */
34#include <sys/types.h>   /* network socket interface */
35#include <netinet/in.h>  /* network socket interface */
36#include <netinet/tcp.h> /* for TCP Socket Option */
37#include <sys/socket.h>
38#include <arpa/inet.h>   /* inet_addr */
39
40#include <fcntl.h>
41#include <sys/poll.h>
42}
43
44#include <iostream>
45#include <errno.h>
46#include <string>
47#include <sstream>
48
49#include "tp_over_tls_tcp.h"
50#include "threadsafe_db.h"
51#include "cleanuphandler.h"
52#include "setuid.h"
53#include "queuemanager.h"
54#include "logfile.h"
55
56#include <set>
57
58#define TCP_SUCCESS 0
59#define TCP_SEND_FAILURE 1
60
61const unsigned int max_listen_queue_size= 10;
62
63namespace protlib {
64
65using namespace log;
66
67/** @defgroup tptcp TP over TCP
68 * @ingroup network
69 * @{
70 */
71
72char in6_addrstr_tls[INET6_ADDRSTRLEN+1];
73
74
75/******* class TPoverTLS_TCP *******/
76
77/*
78 * Obtain reason string for last SSL error
79 *
80 * Some caution is needed here since ERR_reason_error_string will
81 * return NULL if it doesn't recognize the error code.  We don't
82 * want to return NULL ever.
83 */
84const char *
85TPoverTLS_TCP::SSLerrmessage(void)
86{
87        unsigned long   errcode;
88        const char         *errreason;
89        static char             errbuf[32];
90
91        errcode = ERR_get_error();
92        if (errcode == 0)
93                return "No SSL error reported";
94        errreason = ERR_reason_error_string(errcode);
95        if (errreason != NULL)
96                return errreason;
97        snprintf(errbuf, sizeof(errbuf), "SSL error code %lu", errcode);
98        return errbuf;
99}
100
101
102
103/** get_connection_to() checks for already existing connections.
104 *  If a connection exists, it returns "AssocData"
105 *  and saves it in "connmap" for further use
106 *  If no connection exists, a new connection to "addr"
107 *  is created.
108 */
109AssocData* 
110TPoverTLS_TCP::get_connection_to(const appladdress& addr)
111{
112  // get timeout
113  struct timespec ts;
114  get_time_of_day(ts);
115  ts.tv_nsec+= tpparam.sleep_time * 1000000L;
116  if (ts.tv_nsec>=1000000000L)
117  {
118    ts.tv_sec += ts.tv_nsec / 1000000000L;
119    ts.tv_nsec= ts.tv_nsec % 1000000000L;
120  }
121
122  // create a new AssocData pointer, initialize it to NULL
123  AssocData* assoc= NULL;
124  int new_socket;
125  // loop until timeout is exceeded: TODO: check applicability of loop
126  do 
127  {
128    // check for existing connections to addr
129    // start critical section
130    lock(); // install_cleanup_thread_lock(TPoverTLS_TCP, this);
131    assoc= connmap.lookup(addr);
132    // end critical section
133    unlock(); // uninstall_cleanup(1);
134    if (assoc) 
135    {
136      // If not shut down then use it, else abort, wait and check again.
137      if (!assoc->shutdown) 
138      {
139        return assoc;
140      }
141      else
142      {
143        // TODO: connection is already in shutdown mode. What now?
144        Log(ERROR_LOG,LOG_CRIT,tpparam.name,"socket exists, but is already in mode shutdown");
145
146        return 0;
147      } 
148    } //end __if (assoc)__
149    else 
150    {
151      Log(DEBUG_LOG,LOG_UNIMP,tpparam.name,"No existing connection to " 
152          << addr.get_ip_str() << " port #" << addr.get_port() << " found, creating a new one.");
153    }
154
155    // no connection found, create a new one
156    new_socket = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
157   
158    if (new_socket == -1)
159    {
160      Log(ERROR_LOG,LOG_CRIT,tpparam.name,"Couldn't create a new socket: " << strerror(errno));
161
162      return 0;
163    }
164
165    // Disable Nagle Algorithm, set (TCP_NODELAY)
166    int nodelayflag= 1;
167    int status= setsockopt(new_socket,
168                           IPPROTO_TCP,
169                           TCP_NODELAY,
170                           &nodelayflag,
171                           sizeof(nodelayflag));
172    if (status)
173    {
174        Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
175    }
176   
177    // Reuse ports, even if they are busy
178    int socketreuseflag= 1;
179    status= setsockopt(new_socket,
180                           SOL_SOCKET,
181                           SO_REUSEADDR,
182                           (const char *) &socketreuseflag,
183                           sizeof(socketreuseflag));
184    if (status)
185    {
186        Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
187    }
188
189    struct sockaddr_in6 dest_address;
190    dest_address.sin6_flowinfo= 0;
191    dest_address.sin6_scope_id= 0;
192    addr.get_sockaddr(dest_address);
193
194   // connect the socket to the destination address
195    int connect_status = connect(new_socket,
196                                 reinterpret_cast<const struct sockaddr*>(&dest_address),
197                                 sizeof(dest_address));
198
199
200    // connects to the listening_port of the peer's masterthread   
201    if (connect_status != 0)
202    {
203      Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"Connect to " << addr.get_ip_str() << " port #" << addr.get_port() 
204          << " failed: [" << color[red] << strerror(errno) << color[off] << "]");
205      // FIXME: that doesn't work gracefully
206      //throw TPError(TPError::ERROR_UNREACHABLE);
207      return 0;
208    }
209   
210    struct sockaddr_in6 own_address;
211    socklen_t own_address_len= sizeof(own_address);
212    getsockname(new_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
213
214
215    Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,">>--Connect-->> to " << addr.get_ip_str() << " port #" << addr.get_port() 
216        << " from " << inet_ntop(AF_INET6,&own_address.sin6_addr,in6_addrstr_tls,INET6_ADDRSTRLEN) 
217        << " port #" << ntohs(own_address.sin6_port));
218   
219    // create new AssocData record (will copy addr)
220    assoc = new(nothrow) AssocData(new_socket, addr, appladdress(own_address,IPPROTO_TCP));
221
222    DLog(tpparam.name, "Before creation of a new SSL object");
223
224    SSL *ssl = NULL;
225
226    if (!ssl_ctx) ERRCLog(tpparam.name, "Could not create SSL context");
227
228    ssl = SSL_new(ssl_ctx);
229   
230    if (!ssl)
231    {
232      ERRCLog(tpparam.name,"could not create SSL context: " << SSLerrmessage());
233    }
234
235    if (!ssl) 
236      ERRCLog(tpparam.name, "Could not create SSL object");
237 
238    DLog(tpparam.name, "Before binding it to the socket");
239
240    SSL_set_fd(ssl, new_socket);
241   
242    DLog(tpparam.name, "Bound SSL object to the socket");
243
244
245    // Connect to the server
246    if ((SSL_connect(ssl)) < 1) {
247        ERRCLog(tpparam.name, "SSL connect failed in get_connection_to(): " << SSLerrmessage());
248    }
249
250    sslmap[new_socket]=ssl;
251
252    DLog(tpparam.name, "Inserted SSL object into sslmap");
253
254    // if assoc could be successfully created, insert it into ConnectionMap
255    if (assoc) 
256    {
257      bool insert_success= false;
258      // start critical section
259      lock(); // install_cleanup_thread_lock(TPoverTLS_TCP, this);
260      // insert AssocData into connection map
261      insert_success= connmap.insert(assoc);
262      // end critical section
263      unlock(); // uninstall_cleanup(1);
264
265      if (insert_success == true) 
266      {
267#ifdef _DEBUG
268        Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Connected to " << addr.get_ip_str() << ", port #" << addr.get_port()
269            << " via socket " << new_socket);
270
271               
272#endif
273
274        // notify master thread to start a receiver thread (i.e. send selfmessage)
275        TPoverTLS_TCPMsg* newmsg= new(nothrow)TPoverTLS_TCPMsg(assoc, tpparam.source, TPoverTLS_TCPMsg::start);
276        if (newmsg)
277        {
278          newmsg->send_to(tpparam.source);
279          return assoc;
280        }
281        else
282          Log(ERROR_LOG,LOG_CRIT,tpparam.name,"get_connection_to: could not get memory for internal msg");
283      } 
284      else 
285      {
286        // delete data and abort
287        Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot insert AssocData for socket " << new_socket << ", "<< addr.get_ip_str()
288            <<", port #" << addr.get_port() << " into connection map, aborting connection");
289               
290        // abort connection, delete its AssocData
291        close (new_socket);
292        if (assoc) 
293        { 
294          delete assoc; 
295          assoc= 0;
296        }
297        return assoc;
298      } // end else connmap.insert                     
299     
300    } // end "if (assoc)"
301  } 
302  while (wait_cond(ts)!=ETIMEDOUT);
303
304  return assoc;
305} //end get_connection_to
306
307
308/** terminates a signaling association/connection
309 *  - if no connection exists, generate a warning
310 *  - otherwise: generate internal msg to related receiver thread
311 */
312void
313TPoverTLS_TCP::terminate(const address& in_addr)
314{
315
316    appladdress* addr = NULL;
317    addr= dynamic_cast<appladdress*>(in_addr.copy());
318   
319    if (!addr) return;
320
321#ifndef _NO_LOGGING
322  const char *const thisproc="terminate()   - ";
323#endif
324
325  // Create a new AssocData-pointer
326  AssocData* assoc = NULL;
327
328  // check for existing connections to addr
329
330  // start critical section:
331  // please note if receiver_thread terminated already, the assoc data is not
332  // available anymore therefore we need a lock around cleanup_receiver_thread()
333
334  lock(); // install_cleanup_thread_lock(TPoverTLS_TCP, this);
335  assoc= connmap.lookup(*addr);
336  if (assoc) 
337  {
338    Log(EVENT_LOG,LOG_NORMAL,tpparam.name,thisproc<<"got request to shutdown connection for peer " << *addr);
339    // If not shut down then use it, else abort, wait and check again.
340    if (!assoc->shutdown) 
341    {
342      if (assoc->socketfd)
343      {
344        // disallow sending
345        if (shutdown(assoc->socketfd,SHUT_WR))
346        {
347          Log(ERROR_LOG,LOG_UNIMP,tpparam.name,thisproc<<"shutdown (write) on socket for peer " << *addr << " returned error:" << strerror(errno));
348        }
349        else
350        {
351          Log(EVENT_LOG,LOG_NORMAL,tpparam.name,thisproc<<"initiated closing of connection for peer " << *addr << ". Shutdown (write) on socket "<< assoc->socketfd );   
352        }
353      }
354      assoc->shutdown= true;
355    }
356    else
357      Log(EVENT_LOG,LOG_NORMAL,tpparam.name,thisproc<<"connection for peer " << *addr << " is already in mode shutdown");
358     
359  }
360  else
361    Log(WARNING_LOG,LOG_NORMAL,tpparam.name,thisproc<<"could not find a connection for peer " << *addr);
362
363  stop_receiver_thread(assoc);
364
365  // end critical section
366  unlock(); // uninstall_cleanup(1);
367
368
369  if (addr) delete addr;
370
371}
372
373
374/** generates and internal TPoverTLS_TCP message to send a NetMsg to the network
375 *
376 *  - it is necessary to let a thread do this, because the caller
377 *     may get blocked if the connect() or send() call hangs for a while
378 *  - the sending thread will call TPoverTLS_TCP::tcpsend()
379 *  - if no connection exists, creates a new one
380 *  @note the netmsg is deleted by the send() method when it is not used anymore
381 */
382void
383TPoverTLS_TCP::send(NetMsg* netmsg, const address& in_addr, bool use_existing_connection)
384{
385  if (netmsg == NULL) {
386    ERRCLog(tpparam.name,"send() - called without valid NetMsg buffer (NULL)");
387    return;
388  }
389
390  appladdress* addr = NULL;
391  addr = dynamic_cast<appladdress*>(in_addr.copy());
392
393  if (!addr)
394  {
395    ERRCLog(tpparam.name,"send() - given destination address is not of expected type (appladdress), has type: " << (int) in_addr.get_type());
396    return;
397  }
398
399  // lock due to sendermap access
400  lock();
401
402  // check for existing sender thread
403  sender_thread_queuemap_t::const_iterator it= senderthread_queuemap.find(*addr);
404
405  FastQueue* destqueue= 0;
406
407  if (it == senderthread_queuemap.end())
408  { // no sender thread so far
409
410    // if we already have an existing connection it is save to create a sender thread
411    // since get_connection_to() will not be invoked, so an existing connection will
412    // be used
413    const AssocData* assoc = connmap.lookup(*addr);
414
415    //DLog(tpparam.name,"send() - use_existing_connection:" << (use_existing_connection ? "true" : "false") << " assoc:" << assoc);
416
417    if (use_existing_connection==false || (assoc && assoc->shutdown==false && assoc->socketfd>0))
418    { // it is allowed to create a new connection for this thread
419      // create a new queue for sender thread
420      FastQueue* sender_thread_queue= new FastQueue;
421      create_new_sender_thread(sender_thread_queue);
422      // remember queue for later use
423     
424      //pair<sender_thread_queuemap_t::iterator, bool> tmpinsiterator=
425      senderthread_queuemap.insert( pair<appladdress,FastQueue*> (*addr,sender_thread_queue) );
426
427      destqueue= sender_thread_queue;
428    }
429  }
430  else
431  { // we have a sender thread, use stored queue from map
432    destqueue= it->second; 
433  }
434
435  unlock();
436   
437  // send a send_data message to it (if we have a destination queue)
438  if (destqueue)
439  {
440    // both parameters will be freed after message was sent!
441   
442    TPoverTLS_TCPMsg* internalmsg= new TPoverTLS_TCPMsg(netmsg,new appladdress(*addr));
443    if (internalmsg)
444    {
445      // send the internal message to the sender thread queue
446      internalmsg->send(tpparam.source,destqueue);
447    }
448  }
449  else
450  {
451    if (!use_existing_connection)
452      WLog(tpparam.name,"send() - found entry for address, but no active sender thread available for peer addr:" << *addr << " - dropping data");
453    else
454      DLog(tpparam.name,"no active sender thread found for peer " << *addr << " - but policy forbids to set up a new connection, will drop data");
455     
456    // cannot send data, so we must drop it
457    delete netmsg;
458
459  }
460
461  if (addr) delete addr;
462}
463
464/** sends a NetMsg to the network.
465 *
466 * @param netmsg message to send
467 * @param addr   transport endpoint address
468 *
469 * @note    if no connection exists, creates a new one
470 * @note    both parameters are deleted after the message was sent
471 */
472void
473TPoverTLS_TCP::tcptlssend(NetMsg* netmsg, appladdress* addr)
474{                       
475#ifndef _NO_LOGGING
476  const char *const thisproc="sender   - ";
477#endif
478
479  // set result initially to success, set it to failure
480  // in places where these failures occur
481  int result = TCP_SUCCESS;
482  int ret= 0;
483
484  // Create a new AssocData-pointer
485  AssocData* assoc = NULL;
486 
487  // tp.cpp checks for initialisation of tp and correctness of
488  // msgsize, protocol and ip,
489  // throws an error if something is not right
490  if (addr) {
491      addr->convert_to_ipv6();
492      check_send_args(*netmsg,*addr);
493 
494  } else {
495      Log(ERROR_LOG,LOG_CRIT, tpparam.name, thisproc << "address pointer is NULL");
496      result= TCP_SEND_FAILURE;   
497      throw TPErrorInternal();
498  }
499   
500
501
502  // check for existing connections,
503  // if a connection exists, return its AssocData
504  // and save it in assoc for further use
505  // if no connection exists, create a new one (in get_connection_to()).
506  // Connection is inserted into connection map that must be done
507  // with exclusive access
508  assoc= get_connection_to(*addr);
509
510  if (assoc==NULL || assoc->socketfd<=0)
511  {
512    Log(ERROR_LOG,LOG_CRIT, tpparam.name, thisproc << "no valid assoc/socket data");
513
514    delete netmsg;
515    delete addr;
516    return;
517  }
518  if (assoc->shutdown)
519  {
520    Log(WARNING_LOG, LOG_ALERT, tpparam.name, thisproc << "should send message although connection already half closed");
521    delete netmsg;
522    delete addr;
523
524    throw TPErrorSendFailed();
525  }
526
527  uint32 msgsize= netmsg->get_size();
528#ifdef DEBUG_HARD
529  cerr << thisproc << "message size=" << netmsg->get_size() << endl;
530#endif
531
532
533  // get SSL connection data
534  SSL* ssl= sslmap[assoc->socketfd];
535
536
537  // send all the data contained in netmsg to the socket
538  // which belongs to the address "addr"
539  for (uint32 bytes_sent= 0;
540       bytes_sent < msgsize;
541       bytes_sent+= ret)
542  {
543
544#ifdef _DEBUG_HARD
545    for (uint32 i=0;i<msgsize;i++)
546    {
547      cout << "send_buf: " << i << " : "; 
548      if ( isalnum(*(netmsg->get_buffer()+i)) )
549        cout << "'" << *(netmsg->get_buffer()+i) << "' (0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec << ")" ;
550      else
551        cout << "0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec;
552      cout << endl;
553    }
554
555    cout << endl;
556    cout << "bytes_sent: " << bytes_sent << endl;
557    cout << "Message size: " <<  msgsize << endl;
558    cout << "Send-Socket: " << assoc->socketfd << endl;
559    cout << "pointer-Offset. " << netmsg->get_pos() << endl;
560    cout << "vor send " << endl;
561#endif
562
563
564    // socket send
565    //ret= ::send(assoc->socketfd,
566//              netmsg->get_buffer() + bytes_sent, 
567//              msgsize - bytes_sent,
568//              MSG_NOSIGNAL);
569
570
571    ret = SSL_write(ssl, netmsg->get_buffer() + bytes_sent, msgsize - bytes_sent);
572
573    //int SSL_write(SSL *ssl, const void *buf, int num);
574
575    //send_buf + bytes_sent
576
577    if (debug_pdu)
578    {
579      ostringstream hexdump;
580      netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_sent);
581      Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"PDU debugging enabled - Sent:" << hexdump.str());
582    }
583
584    if (ret < 0) 
585    {
586      result= TCP_SEND_FAILURE;
587      break;
588    } // end if (ret < 0)
589  } // end for
590
591  // *** note: netmsg is deleted here ***
592  delete netmsg;
593 
594  // Throwing an exception within a critical section does not
595  // unlock the mutex.
596
597  if (result != TCP_SUCCESS)
598  {
599    Log(ERROR_LOG,LOG_NORMAL, tpparam.name, thisproc << "TLS/TCP error, returns " << ret << ", error : " << strerror(errno));
600    delete addr;
601
602    throw TPErrorSendFailed();
603   
604  }
605  else
606    Log(EVENT_LOG,LOG_NORMAL,tpparam.name, thisproc << ">>----Sent---->> message (" << msgsize << " bytes) using socket " << assoc->socketfd  << " to " << *addr);
607
608  if (!assoc) {
609    // no connection
610   
611    Log(ERROR_LOG,LOG_NORMAL, tpparam.name, thisproc << "cannot get connection to " << addr->get_ip_str() 
612        << ", port #" << addr->get_port());
613
614    delete addr;
615
616    throw TPErrorUnreachable(); // should be no assoc found
617  } // end "if (!assoc)"
618 
619  // *** delete address ***
620  delete addr;
621}
622 
623/* this thread waits for an internal message that either:
624 * - requests transmission of a packet
625 * - requests to stop this thread
626 * @param argp points to the thread queue for internal messages
627 */
628void 
629TPoverTLS_TCP::sender_thread(void *argp)
630{
631#ifndef _NO_LOGGING
632  const char *const methodname="senderthread - ";
633#endif
634
635  message* internal_thread_msg = NULL;
636
637  Log(EVENT_LOG,LOG_NORMAL, tpparam.name, methodname << "starting as thread <" << pthread_self() << ">");
638
639  FastQueue* fq= reinterpret_cast<FastQueue*>(argp);
640  if (!fq)
641  {
642    Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "thread <" << pthread_self() << "> no valid pointer to msg queue. Stop.");
643    return;
644  }
645
646  bool terminate= false;
647  TPoverTLS_TCPMsg* internalmsg= 0;
648  while (terminate==false && (internal_thread_msg= fq->dequeue()) != 0 )
649  {
650    internalmsg= dynamic_cast<TPoverTLS_TCPMsg*>(internal_thread_msg);
651   
652    if (internalmsg == 0)
653    {
654      Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "received not an TPoverTLS_TCPMsg but a" << internal_thread_msg->get_type_name());     
655    }
656    else
657    if (internalmsg->get_msgtype() == TPoverTLS_TCPMsg::send_data)
658    {
659      // create a connection if none exists and send the netmsg
660      if (internalmsg->get_netmsg() && internalmsg->get_appladdr())
661      {
662        tcptlssend(internalmsg->get_netmsg(),internalmsg->get_appladdr());
663      }
664      else
665      {
666        Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "problem with passed arguments references, they point to 0");
667      } 
668    }
669    else
670    if (internalmsg->get_msgtype() == TPoverTLS_TCPMsg::stop)
671    {
672      terminate= true;
673    } 
674  } // end while
675 
676  Log(EVENT_LOG,LOG_NORMAL, tpparam.name, methodname << "<" << pthread_self() << "> terminated connection." );
677}
678
679
680/** receiver thread listens at a TCP socket for incoming PDUs
681 *  and passes complete PDUs to the coordinator. Incomplete
682 *  PDUs due to aborted connections or buffer overflows are discarded.
683 *
684 *  @param argp - assoc data and flags sig_terminate and terminated
685 * 
686 *  @note this is a static member function, so you cannot use class variables
687 */
688void 
689TPoverTLS_TCP::receiver_thread(void *argp)
690{
691#ifndef _NO_LOGGING
692  const char *const methodname="receiver - ";
693#endif
694  receiver_thread_arg_t *receiver_thread_argp= static_cast<receiver_thread_arg_t *>(argp);
695  const appladdress* peer_addr = NULL;
696  const appladdress* own_addr = NULL;
697  uint32 bytes_received = 0;
698  TPMsg* tpmsg= NULL;
699
700  //uchar* sslbuffer = NULL;
701 
702 
703  // argument parsing - start
704  if (receiver_thread_argp == 0)
705  {
706    Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "No arguments given at start of receiver thread <" << pthread_self() << ">, exiting.");
707
708    return;
709  }
710  else
711  {
712    // change status to running, i.e., not terminated
713    receiver_thread_argp->terminated= false;
714
715#ifdef _DEBUG
716    Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "New receiver thread <" << pthread_self() << "> started. ");
717#endif
718  }
719
720  int conn_socket= 0;
721  if (receiver_thread_argp->peer_assoc)
722  {
723    // get socket descriptor from arg
724    conn_socket = receiver_thread_argp->peer_assoc->socketfd;
725    // get pointer to peer address of socket (source/sender address of peer) from arg
726    peer_addr= &receiver_thread_argp->peer_assoc->peer;
727    own_addr=  &receiver_thread_argp->peer_assoc->ownaddr;
728  }
729  else
730  {
731    Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "No peer assoc available - pointer is NULL");
732   
733    return;
734  }
735
736  if (peer_addr == 0)
737  {
738    Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "No peer address available for socket " << conn_socket << ", exiting.");
739   
740    return;
741  }
742  // argument parsing - end
743#ifdef _DEBUG
744  Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname <<
745      "Preparing to wait for data at socket " 
746      << conn_socket << " from " << receiver_thread_argp->peer_assoc->peer);
747#endif
748
749  int ret= 0;
750  uint32 msgcontentlength= 0;
751  bool msgcontentlength_known= false;
752  bool pdu_complete= false; // when to terminate inner loop
753
754  /* maybe use this to create a new pdu,
755    /// constructor
756    contextlistpdu(type_t t, subtype_t st, uint32 fc, uint32 numobj = 0);
757  */ 
758
759  // activate O_NON_BLOCK  for recv on socket conn_socket
760  fcntl(conn_socket,F_SETFL, O_NONBLOCK);
761   
762  // set options for polling
763  const unsigned int number_poll_sockets= 1; 
764  struct pollfd poll_fd;
765  // have to set structure before poll call
766  poll_fd.fd = conn_socket;
767  poll_fd.events = POLLIN | POLLPRI; 
768
769  int poll_status;
770  bool recv_error= false;
771
772  //get SSL handle
773  SSL *ssl=sslmap[conn_socket];
774
775  NetMsg* netmsg= 0;
776  NetMsg* remainbuf= 0;
777  uint32 buffer_bytes_left= 0;
778  size_t trailingbytes= 0;
779  bool skiprecv= false;
780  // loop until we receive a terminate signal (read-only var for this thread)
781  // or get an error from socket read
782  while( receiver_thread_argp->sig_terminate == false )
783  {
784    // Read next PDU from socket or process trailing bytes in remainbuf
785    ret= 0;
786    msgcontentlength= 0;
787    msgcontentlength_known= false;
788    pdu_complete= false;
789    netmsg= 0;
790
791    // there are trailing bytes left from the last receive call
792    if (remainbuf != 0)
793    {
794      netmsg= remainbuf;
795      remainbuf= 0;
796      buffer_bytes_left= netmsg->get_size()-trailingbytes;
797      bytes_received= trailingbytes;
798      trailingbytes= 0;
799      skiprecv= true;
800    }
801    else // no trailing bytes, create a new buffer
802    if ( (netmsg= new NetMsg(NetMsg::max_size)) != 0 )
803    {
804      buffer_bytes_left= netmsg->get_size();
805      bytes_received= 0;
806      skiprecv= false;
807    }
808    else
809    { // buffer allocation failed
810      bytes_received= 0;
811      buffer_bytes_left= 0;
812      recv_error= true;
813    }
814   
815    // loops until PDU is complete
816    // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
817    while (!pdu_complete && 
818           !recv_error && 
819           !receiver_thread_argp->sig_terminate)
820    {
821      if (!skiprecv)
822      {
823        // read from TCP socket or return after sleep_time
824        poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
825       
826        if (receiver_thread_argp->sig_terminate)
827        {
828          Log(EVENT_LOG,LOG_UNIMP,tpparam.name,methodname << "Thread <" << pthread_self() << "> found terminate signal after poll");
829          // disallow sending
830          AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
831          if (myassoc->shutdown == false)
832          {
833            myassoc->shutdown= true;
834            if (shutdown(myassoc->socketfd,SHUT_WR))
835            {
836              if ( errno != ENOTCONN )
837                Log(ERROR_LOG,LOG_UNIMP,tpparam.name,methodname <<"shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
838            }
839          }
840          // try to read do a last read from the TCP socket or return after sleep_time
841          if (poll_status == 0)
842          {
843            poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
844          }
845        }
846
847        if (poll_fd.revents & POLLERR) // Error condition
848        {
849          if (errno == 0 || errno == EINTR)
850          {
851            Log(EVENT_LOG,LOG_NORMAL, tpparam.name, methodname << "poll(): " << strerror(errno));
852          }
853          else
854          {
855            Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Poll indicates error: " << strerror(errno));
856            recv_error= true;
857          }
858        }
859       
860        if (poll_fd.revents & POLLHUP) // Hung up
861        {
862          Log(EVENT_LOG,LOG_CRIT, tpparam.name, methodname << "Poll hung up");
863          recv_error= true;
864        }
865       
866        if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
867        {
868          Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Poll Invalid request: fd not open");
869          recv_error= true;
870        }
871       
872        // check status (return value) of poll call
873        switch (poll_status)
874        {
875          case -1:
876            if (errno == 0 || errno == EINTR)
877            {
878              Log(EVENT_LOG,LOG_NORMAL, tpparam.name, methodname << "Poll status: " << strerror(errno));
879            }
880            else
881            {
882              Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Poll status indicates error: " << strerror(errno) << "- aborting");
883              recv_error= true;
884            }
885           
886            continue; // next while iteration
887            break;
888           
889          case 0:
890#ifdef DEBUG_HARD
891            Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll timed out after " << tpparam.sleep_time << " ms.");
892#endif
893            continue; // next while iteration
894            break;
895           
896          default:
897#ifdef DEBUG_HARD
898            Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
899#endif
900            break;
901        } // end switch
902
903        /// spawn a new buffer for SSL decoding
904        //sslbuffer = new(nothrow) uchar[NetMsg::max_size];
905
906        /// receive data from socket buffer (recv will not block)
907        //ret = recv(conn_socket,
908        //         netmsg->get_buffer() + bytes_received,
909        //         buffer_bytes_left,
910        //         0);
911       
912        /// try to read from SSL
913        ret = SSL_read(ssl, netmsg->get_buffer() + bytes_received, buffer_bytes_left);
914
915
916        /// memcpy decoded contents back into netmsg
917        //memcpy(netmsg->get_buffer() + bytes_received, sslbuffer, buffer_bytes_left);
918
919        /// delete sslbuffer
920        //delete sslbuffer;
921
922        if ( ret < 0 )
923        {
924          if (errno!=EAGAIN && errno!=EWOULDBLOCK)
925          {
926            Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Receive at socket " << conn_socket << " failed, error: " << strerror(errno));
927            recv_error= true;
928            continue;
929          }
930          else
931          { // errno==EAGAIN || errno==EWOULDBLOCK
932            // just nothing to read from socket, continue w/ next poll
933            continue;
934          }
935        }
936        else
937        {
938          if (ret == 0)
939          {
940            // this means that EOF is reached,
941            // other side has closed connection
942            Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Other side (" << *peer_addr << ") closed connection for socket " << conn_socket);
943            // disallow sending
944            AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
945            if (myassoc->shutdown == false)
946            {
947              myassoc->shutdown= true;
948              if (shutdown(myassoc->socketfd,SHUT_WR))
949              {
950                if ( errno != ENOTCONN )
951                  Log(ERROR_LOG,LOG_UNIMP,tpparam.name, methodname << "shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
952              }
953            }
954            // not a real error, but we must quit the receive loop
955            recv_error= true;
956          }
957          else
958          {
959            Log(EVENT_LOG,LOG_UNIMP, tpparam.name, methodname << "<<--Received--<< packet (" << ret << " bytes) at socket " << conn_socket << " from " << *peer_addr);
960            // track number of received bytes
961            bytes_received+= ret;
962            buffer_bytes_left-= ret;
963          }
964        }
965      } // end if do not skip recv() statement     
966
967      if (buffer_bytes_left < 0) ///< buffer space exhausted now
968      {
969        recv_error= true;
970        Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "during receive buffer space exhausted");
971      }
972
973      if (!msgcontentlength_known) ///< common header not parsed
974      {
975       
976        // enough bytes read to parse common header?
977        if (bytes_received >= common_header_length)
978        {
979          // get message content length in number of bytes
980          if (getmsglength(*netmsg, msgcontentlength))
981            msgcontentlength_known= true;
982          else
983          {
984              Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Not a valid protocol header - discarding received packet. received size " << msgcontentlength);
985
986            ostringstream hexdumpstr;
987            netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
988            Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"dumping received bytes:" << hexdumpstr.str());
989
990            // reset all counters
991            msgcontentlength= 0;
992            msgcontentlength_known= false;
993            bytes_received= 0;
994            pdu_complete= false;
995            continue;
996          }
997        }
998      } // endif common header not parsed
999
1000      // check whether we have read the whole Protocol PDU
1001      DLog(tpparam.name, "bytes_received - common_header_length: " << bytes_received-common_header_length << "msgcontentlength: " << msgcontentlength << ">=");
1002      if (msgcontentlength_known && bytes_received-common_header_length >= msgcontentlength )
1003      {
1004        pdu_complete= true;
1005
1006        // truncate buffer exactly at common_header_length+msgcontentlength==PDU size, trailing stuff is handled separately
1007        netmsg->truncate(common_header_length+msgcontentlength);
1008
1009        // trailing bytes are copied into new buffer
1010        if (bytes_received-common_header_length > msgcontentlength)
1011        {
1012          Log(WARNING_LOG,LOG_NORMAL,tpparam.name,"trailing bytes - received more bytes ("<<bytes_received<<") than expected for PDU (" << common_header_length+msgcontentlength << ")");
1013          remainbuf= new NetMsg(NetMsg::max_size);
1014          trailingbytes= (bytes_received-common_header_length) - msgcontentlength;
1015          bytes_received= common_header_length+msgcontentlength;
1016          memcpy(remainbuf->get_buffer(),netmsg->get_buffer()+common_header_length+msgcontentlength, trailingbytes);
1017        }
1018      }
1019    } // end while (!pdu_complete && !recv_error && !signalled for termination)
1020    // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
1021
1022    // if other side closed the connection, we should still be able to deliver the remaining data
1023    if (ret == 0)
1024    {
1025      recv_error= false;
1026    }
1027
1028    // deliver only complete PDUs to signaling module
1029    if (!recv_error && pdu_complete)
1030    {
1031      // create TPMsg and send it to the signaling thread
1032      tpmsg = new(nothrow) TPMsg(netmsg, peer_addr->copy(), own_addr->copy());
1033
1034      Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "receipt of PDU now complete, sending to module " <<  message::get_qaddr_name(tpparam.dest));
1035
1036      debug_pdu=false;
1037
1038      if (debug_pdu)
1039      {
1040        ostringstream hexdump;
1041        netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_received);
1042        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"PDU debugging enabled - Received:" << hexdump.str());
1043      }
1044
1045      // send the message if it was successfully created
1046      // bool message::send_to(qaddr_t dest, bool exp = false);
1047      if (!tpmsg
1048          || (!tpmsg->get_peeraddress())
1049          || (!tpmsg->send_to(tpparam.dest))) 
1050      {
1051        Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Cannot allocate/send TPMsg");
1052        if (tpmsg) delete tpmsg;
1053      } // end if tpmsg not allocated or not addr or not sent
1054     
1055    } // end if !recv_error
1056    else
1057    { // error during receive or PDU incomplete
1058      if (bytes_received>0)
1059      {
1060        Log(WARNING_LOG,LOG_NORMAL, tpparam.name, methodname << "Attention! " << (recv_error? "Receive error, " : "") << (pdu_complete ?  "PDU complete" : "PDU incomplete") << "received bytes: " << bytes_received);
1061      }
1062
1063      if (!pdu_complete && bytes_received>0 && bytes_received<common_header_length)
1064      {
1065        ostringstream hexdumpstr;
1066        netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
1067        Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"Message too short to be a valid protocol header - dumping received bytes:" << hexdumpstr.str()); 
1068      }
1069      // leave the outer loop
1070      /**********************/
1071      break;
1072      /**********************/
1073    } // end else
1074
1075  } // end while (thread not signalled for termination)
1076
1077  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() 
1078      << "> shutting down and closing socket " << receiver_thread_argp->peer_assoc->peer);
1079
1080  // shutdown socket
1081  if (shutdown(conn_socket, SHUT_RD))
1082  {
1083    if ( errno != ENOTCONN )
1084      Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> shutdown (read) on socket failed, reason: " << strerror(errno));
1085  }
1086
1087  // close socket
1088  close(conn_socket);
1089
1090  receiver_thread_argp->terminated= true;
1091
1092  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> terminated");
1093
1094#ifdef _DEBUG
1095  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Signaling main loop for cleanup");
1096#endif
1097  // notify master thread for invocation of cleanup procedure
1098  TPoverTLS_TCPMsg* newmsg= new(nothrow)TPoverTLS_TCPMsg(receiver_thread_argp->peer_assoc);
1099  // send message to main loop thread
1100  newmsg->send_to(tpparam.source);
1101
1102
1103} 
1104
1105
1106/** this signals a terminate to a thread and wait for the thread to stop
1107 *  @note it is not safe to access any thread related data after this method returned,
1108 *        because the receiver thread will initiate a cleanup_receiver_thread() method
1109 *        which may erase all relevant thread data.
1110 */
1111void 
1112TPoverTLS_TCP::stop_receiver_thread(AssocData* peer_assoc)
1113{
1114  // All operations on  recv_thread_argmap and connmap require an already acquired lock
1115  // after this procedure peer_assoc may be invalid because it was erased
1116
1117  // start critical section
1118
1119  if (peer_assoc == 0)
1120    return;
1121
1122  pthread_t thread_id=  peer_assoc->thread_ID;
1123 
1124  // try to clean up receiver_thread_arg
1125  recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1126  receiver_thread_arg_t* recv_thread_argp= 
1127    (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1128  if (recv_thread_argp)
1129  {
1130    if (!recv_thread_argp->terminated)
1131    {
1132      // thread signaled termination, but is not?
1133      Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> signaled for termination");
1134
1135      // signal thread for its termination
1136      recv_thread_argp->sig_terminate= true;
1137      // wait for thread to join after termination
1138      pthread_join(thread_id, 0);
1139      // the dying thread will signal main loop to call this method, but this time we should enter the else branch
1140      return;
1141    }
1142  }
1143  else
1144    Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> not found");
1145
1146}
1147
1148
1149/** cleans up left over structures (assoc,receiver_thread_arg) from already terminated receiver_thread
1150 *  usually called by the master_thread after the receiver_thread terminated
1151 * @note clean_up_receiver_thread() should be only called when an outer lock ensures that peer_assoc
1152 *       is still valid
1153 */
1154void 
1155TPoverTLS_TCP::cleanup_receiver_thread(AssocData* peer_assoc)
1156{
1157  // All operations on  recv_thread_argmap and connmap require an already acquired lock
1158  // after this procedure peer_assoc may be invalid because it was erased
1159
1160  // start critical section
1161
1162  if (peer_assoc == 0)
1163    return;
1164
1165  pthread_t thread_id=  peer_assoc->thread_ID;
1166 
1167  // try to clean up receiver_thread_arg
1168  recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1169  receiver_thread_arg_t* recv_thread_argp= 
1170    (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1171  if (recv_thread_argp)
1172  {
1173    if (!recv_thread_argp->terminated)
1174    {
1175      // thread signaled termination, but is not?
1176      Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> not terminated yet?!");
1177      return;
1178    }
1179    else
1180    { // if thread is already terminated
1181      Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> is terminated");
1182
1183      // delete it from receiver map
1184      recv_thread_argmap.erase(recv_thread_arg_iter);
1185
1186      // then delete receiver arg structure
1187      delete recv_thread_argp;
1188    }
1189  }
1190
1191  // delete entry from connection map
1192
1193  // cleanup sender thread
1194  // no need to lock explicitly, because caller of cleanup_receiver_thread() must already locked
1195  terminate_sender_thread(peer_assoc);
1196
1197  // delete the AssocData structure from the connection map
1198  // also frees allocated AssocData structure
1199  connmap.erase(peer_assoc);
1200
1201  // end critical section
1202
1203  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Cleanup receiver thread <" << thread_id << ">. Done.");
1204}
1205
1206
1207/* sends a stop message to the sender thread that belongs to the peer address given in assoc
1208 * @note terminate_receiver_thread() should be only called when an outer lock ensures that assoc
1209 *       is still valid, a lock is also required, because senderthread_queuemap is changed
1210 */
1211void 
1212TPoverTLS_TCP::terminate_sender_thread(const AssocData* assoc)
1213{
1214  if (assoc == 0)
1215  {
1216    Log(ERROR_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - assoc data == NULL");
1217    return;
1218  }
1219
1220  sender_thread_queuemap_t::iterator it= senderthread_queuemap.find(assoc->peer);
1221
1222  if (it != senderthread_queuemap.end())
1223  { // we have a sender thread: send a stop message to it
1224    FastQueue* destqueue= it->second; 
1225    if (destqueue)
1226    {
1227      TPoverTLS_TCPMsg* internalmsg= new TPoverTLS_TCPMsg(assoc,tpparam.source,TPoverTLS_TCPMsg::stop);
1228      if (internalmsg)
1229      {
1230        // send the internal message to the sender thread queue
1231        internalmsg->send(tpparam.source,destqueue);
1232      }
1233    }
1234    else
1235    {
1236      Log(WARNING_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - found entry for address, but no sender thread. addr:" << assoc->peer);
1237    }
1238    // erase entry from map
1239    senderthread_queuemap.erase(it);
1240  }
1241}
1242
1243/* terminate all active threads
1244 * note: locking should not be necessary here because this message is called as last method from
1245 * main_loop()
1246 */
1247void 
1248TPoverTLS_TCP::terminate_all_threads()
1249{
1250  AssocData* assoc= 0;
1251  receiver_thread_arg_t* terminate_argp;
1252
1253  for (recv_thread_argmap_t::iterator terminate_iterator=  recv_thread_argmap.begin();
1254       terminate_iterator !=  recv_thread_argmap.end();
1255       terminate_iterator++)
1256  {
1257    if ( (terminate_argp= terminate_iterator->second) != 0)
1258    {
1259      // we need a non const pointer to erase it later on
1260      assoc= const_cast<AssocData*>(terminate_argp->peer_assoc);
1261      // check whether thread is still alive
1262      if (terminate_argp->terminated == false)
1263      {
1264        terminate_argp->sig_terminate= true;
1265        // then wait for its termination
1266        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, 
1267            "Signaled receiver thread <" << terminate_iterator->first << "> for termination");
1268       
1269        pthread_join(terminate_iterator->first, 0);
1270       
1271        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Thread <" << terminate_iterator->first  << "> is terminated");
1272      }
1273      else
1274        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, 
1275            "Receiver thread <" << terminate_iterator->first << "> already terminated");
1276       
1277      // cleanup all remaining argument structures of terminated threads
1278      delete terminate_argp;
1279
1280      // terminate any related sender thread that is still running
1281      terminate_sender_thread(assoc);
1282     
1283      connmap.erase(assoc);
1284      // delete assoc is not necessary, because connmap.erase() will do the job
1285    }
1286  } // end for
1287}
1288
1289
1290/**
1291 * sender thread starter:
1292 * just a static starter method to allow starting the
1293 * actual sender_thread() method.
1294 *
1295 * @param argp - pointer to the current TPoverTLS_TCP object instance and receiver_thread_arg_t struct
1296 */
1297void*
1298TPoverTLS_TCP::sender_thread_starter(void *argp)
1299{
1300  sender_thread_start_arg_t *sargp= static_cast<sender_thread_start_arg_t *>(argp);
1301 
1302  // invoke sender thread method
1303  if (sargp != 0 && sargp->instance != 0)
1304  {
1305    // call receiver_thread member function on object instance
1306    sargp->instance->sender_thread(sargp->sender_thread_queue);
1307
1308    // no longer needed
1309    delete sargp;
1310  }
1311  else
1312  {
1313    Log(ERROR_LOG,LOG_CRIT,"sender_thread_starter","while starting sender_thread: 0 pointer to arg or object");
1314  }
1315  return 0;
1316}
1317
1318
1319
1320
1321/**
1322 * receiver thread starter:
1323 * just a static starter method to allow starting the
1324 * actual receiver_thread() method.
1325 *
1326 * @param argp - pointer to the current TPoverTLS_TCP object instance and receiver_thread_arg_t struct
1327 */
1328void*
1329TPoverTLS_TCP::receiver_thread_starter(void *argp)
1330{
1331  receiver_thread_start_arg_t *rargp= static_cast<receiver_thread_start_arg_t *>(argp);
1332  // invoke receiver thread method
1333  if (rargp != 0 && rargp->instance != 0)
1334  {
1335    // call receiver_thread member function on object instance
1336    rargp->instance->receiver_thread(rargp->rtargp);
1337
1338    // no longer needed
1339    delete rargp;
1340  }
1341  else
1342  {
1343    Log(ERROR_LOG,LOG_CRIT,"receiver_thread_starter","while starting receiver_thread: 0 pointer to arg or object");
1344  }
1345  return 0;
1346}
1347
1348
1349void
1350TPoverTLS_TCP::create_new_sender_thread(FastQueue* senderfqueue)
1351{
1352  Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new sender thread...");
1353
1354  pthread_t senderthreadid;
1355  // create new thread; (arg == 0) is handled by thread, too
1356  int pthread_status= pthread_create(&senderthreadid, 
1357                                     NULL, // NULL: default attributes: thread is joinable and has a
1358                                     //       default, non-realtime scheduling policy
1359                                     TPoverTLS_TCP::sender_thread_starter,
1360                                     new sender_thread_start_arg_t(this,senderfqueue));
1361  if (pthread_status)
1362  {
1363    Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " <<  strerror(pthread_status));
1364   
1365    delete senderfqueue;
1366  }
1367}
1368
1369
1370void
1371TPoverTLS_TCP::create_new_receiver_thread(AssocData* peer_assoc)
1372{
1373  receiver_thread_arg_t* argp= 
1374    new(nothrow) receiver_thread_arg(peer_assoc);
1375 
1376  Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new receiver thread...");
1377
1378  // create new thread; (arg == 0) is handled by thread, too
1379  int pthread_status= pthread_create(&peer_assoc->thread_ID, 
1380                                     NULL, // NULL: default attributes: thread is joinable and has a
1381                                     //       default, non-realtime scheduling policy
1382                                     receiver_thread_starter,
1383                                     new(nothrow) receiver_thread_start_arg_t(this,argp));
1384  if (pthread_status)
1385  {
1386    Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " <<  strerror(pthread_status));
1387   
1388    delete argp;
1389  }
1390  else
1391  {
1392    lock(); // install_cleanup_thread_lock(TPoverTLS_TCP, this);
1393
1394    // remember pointer to thread arg structure
1395    // thread arg structure should be destroyed after thread termination only
1396    pair<recv_thread_argmap_t::iterator, bool> tmpinsiterator=
1397      recv_thread_argmap.insert( pair<pthread_t,receiver_thread_arg_t*> (peer_assoc->thread_ID,argp) );
1398    if (tmpinsiterator.second == false)
1399    {
1400      Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Thread argument could not be inserted into hashmap");
1401    }
1402    unlock(); // uninstall_cleanup(1);
1403  } 
1404}
1405
1406
1407/**
1408 * master listener thread starter:
1409 * just a static starter method to allow starting the
1410 * actual master_listener_thread() method.
1411 *
1412 * @param argp - pointer to the current TPoverTLS_TCP object instance
1413 */
1414void*
1415TPoverTLS_TCP::master_listener_thread_starter(void *argp)
1416{
1417  // invoke listener thread method
1418  if (argp != 0)
1419  {
1420    (static_cast<TPoverTLS_TCP*>(argp))->master_listener_thread();
1421  }
1422  return 0;
1423}
1424
1425
1426int 
1427verify_callback(int ok, X509_STORE_CTX * store)
1428{
1429  if (!ok) {
1430    ERRLog("TPoverTLS", color[red] << "Client cert check failed" << color[off]);
1431  } else {
1432    DLog("TPoverTLS", color[green] << "Client cert check OK" << color[off]);
1433  };
1434  return ok;
1435}
1436
1437
1438/**
1439 * master listener thread: waits for incoming connections at the well-known tcp port
1440 * when a connection request is received this thread spawns a receiver_thread for
1441 * receiving packets from the peer at the new socket.
1442 */
1443void
1444TPoverTLS_TCP::master_listener_thread()
1445{
1446  // create a new address-structure for the listening masterthread
1447  struct sockaddr_in6 own_address;
1448  own_address.sin6_family = AF_INET6;
1449  own_address.sin6_flowinfo= 0;
1450  own_address.sin6_port = htons(tpparam.port); // use port number in param structure
1451  // accept incoming connections on all interfaces
1452  own_address.sin6_addr = in6addr_any;
1453  own_address.sin6_scope_id= 0;
1454 
1455  // create a listening socket
1456  int master_listener_socket= socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
1457  if (master_listener_socket == -1)
1458  {
1459    Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Could not create a new socket, error: " << strerror(errno));
1460    return;
1461  }
1462 
1463  // Disable Nagle Algorithm, set (TCP_NODELAY)
1464  int nodelayflag= 1;
1465  int status= setsockopt(master_listener_socket,
1466                         IPPROTO_TCP,
1467                         TCP_NODELAY,
1468                         &nodelayflag,
1469                         sizeof(nodelayflag));
1470  if (status)
1471  {
1472    Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
1473  }
1474 
1475  // Reuse ports, even if they are busy
1476  int socketreuseflag= 1;
1477  status= setsockopt(master_listener_socket,
1478                           SOL_SOCKET,
1479                           SO_REUSEADDR,
1480                           (const char *) &socketreuseflag,
1481                           sizeof(socketreuseflag));
1482  if (status)
1483  {
1484       Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
1485  }
1486 
1487 
1488  // bind the newly created socket to a specific address
1489  int bind_status = bind(master_listener_socket,
1490                         reinterpret_cast<struct sockaddr *>(&own_address),
1491                         sizeof(own_address));
1492  if (bind_status)
1493    { 
1494      Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Binding to " 
1495          << inet_ntop(AF_INET6, &own_address.sin6_addr, in6_addrstr_tls, INET6_ADDRSTRLEN)
1496          << " port " << tpparam.port << " failed, error: " << strerror(errno));
1497      return;
1498    }
1499
1500    // listen at the socket,
1501    // queuesize for pending connections= max_listen_queue_size
1502    int listen_status = listen(master_listener_socket, max_listen_queue_size);
1503    if (listen_status)
1504    {
1505      Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Listen at socket " << master_listener_socket
1506          << " failed, error: " << strerror(errno));
1507      return;
1508    }
1509    else
1510    {
1511      Log(INFO_LOG,LOG_NORMAL, tpparam.name, color[green] << "Listening at port #" << tpparam.port << color[off]);
1512    }
1513
1514    // activate O_NON_BLOCK for accept (accept does not block)
1515    fcntl(master_listener_socket,F_SETFL, O_NONBLOCK);
1516
1517    // create a pollfd struct for use in the mainloop
1518    struct pollfd poll_fd;
1519    poll_fd.fd = master_listener_socket;
1520    poll_fd.events = POLLIN | POLLPRI; 
1521    /*
1522      #define POLLIN    0x001   // There is data to read.
1523      #define POLLPRI   0x002   // There is urgent data to read. 
1524      #define POLLOUT   0x004   // Writing now will not block. 
1525    */
1526   
1527    bool terminate = false;
1528    // check for thread terminate condition using get_state()
1529    state_t currstate= get_state();
1530    int poll_status= 0;
1531    const unsigned int number_poll_sockets= 1; 
1532    struct sockaddr_in6 peer_address;
1533    socklen_t peer_address_len;
1534    int conn_socket;
1535
1536    // check whether this thread is signaled for termination
1537    while(! (terminate= (currstate==STATE_ABORT || currstate==STATE_STOP) ) )
1538    {
1539      // wait on number_poll_sockets (main drm socket)
1540      // for the events specified above for sleep_time (in ms)
1541      poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
1542      if (poll_fd.revents & POLLERR) // Error condition
1543      {
1544        if (errno != EINTR) 
1545        {
1546          Log(ERROR_LOG,LOG_CRIT, tpparam.name, 
1547              "Poll caused error " << strerror(errno) << " - indicated by revents");
1548        }
1549        else
1550        {
1551          Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "poll(): " << strerror(errno));
1552        }
1553
1554      }
1555      if (poll_fd.revents & POLLHUP) // Hung up
1556      {
1557        Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll hung up");
1558        return;
1559      }
1560      if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
1561      {
1562        Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll Invalid request: fd not open");
1563        return;
1564      }
1565     
1566      switch (poll_status)
1567      {
1568        case -1:
1569          if (errno != EINTR)
1570          {
1571            Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll status indicates error: " << strerror(errno));
1572          }
1573          else
1574          {
1575            Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Poll status: " << strerror(errno));
1576          }
1577           
1578          break;
1579
1580        case 0:
1581#ifdef DEBUG_HARD
1582          Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, 
1583              "Listen Thread - Poll timed out after " << tpparam.sleep_time << " ms.");
1584#endif
1585          currstate= get_state();
1586          continue;
1587          break;
1588
1589        default:
1590#ifdef DEBUG_HARD
1591          Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
1592#endif
1593          break;
1594      } // end switch
1595
1596      // after a successful accept call,
1597      // accept stores the address information of the connecting party
1598      // in peer_address and the size of its address in addrlen
1599      peer_address_len= sizeof(peer_address);
1600
1601      conn_socket = accept (master_listener_socket,
1602                            reinterpret_cast<struct sockaddr *>(&peer_address),
1603                            &peer_address_len);
1604
1605
1606      if (conn_socket > -1) {
1607
1608          // initialise SSL handshake, save ssl pointer in sslmap
1609         
1610          SSL_CTX *ssl_ctx_server = SSL_CTX_new(TLSv1_server_method());
1611         
1612          // Load client certificate
1613          if (SSL_CTX_use_certificate_chain_file(ssl_ctx_server, "client_cert.pem") != 1) {
1614              ERRCLog(tpparam.name, "Unable to load client certificate");
1615              this->abort_processing();
1616             
1617          }
1618         
1619          // Load private key
1620          if (SSL_CTX_use_PrivateKey_file(ssl_ctx_server, "client_privkey.pem", SSL_FILETYPE_PEM) != 1) {
1621            ERRCLog(tpparam.name, "Unable to load private Key, reason:" << SSLerrmessage());
1622              this->abort_processing();
1623          }
1624         
1625          // Verify private key
1626          if (SSL_CTX_load_verify_locations(ssl_ctx_server, "root_cert.pem", ".") != 1) {
1627              ERRCLog(tpparam.name, "Private Key failed verification against CA");
1628              this->abort_processing();
1629          }
1630         
1631         
1632         
1633         
1634          // We trust clients from this CA
1635          SSL_CTX_set_client_CA_list(ssl_ctx_server, SSL_load_client_CA_file("root_cert.pem"));
1636         
1637          // Enable (optional) client authentication
1638          SSL_CTX_set_verify(ssl_ctx_server, SSL_VERIFY_PEER, verify_callback);
1639          SSL_CTX_set_verify_depth(ssl_ctx_server, 4);
1640         
1641          SSL *ssl = SSL_new(ssl_ctx_server);
1642         
1643          SSL_set_fd(ssl, conn_socket);
1644         
1645          int result = SSL_accept(ssl);
1646         
1647          if (result==-1) ERRCLog(tpparam.name, "SSL connect failed");
1648         
1649          sslmap[conn_socket]=ssl;
1650         
1651          DLog(tpparam.name, "SSL handle saved for future use with that socket");
1652         
1653      }
1654
1655      if (conn_socket == -1)
1656      {
1657        if (errno != EWOULDBLOCK && errno != EAGAIN)
1658        {
1659          Log(ERROR_LOG,LOG_EMERG, tpparam.name, "Accept at socket " << master_listener_socket
1660              << " failed, error: " << strerror(errno));
1661          return;
1662        }
1663      }
1664      else
1665      {
1666        // create a new assocdata-object for the new thread
1667        AssocData* peer_assoc = NULL;
1668        appladdress addr(peer_address, IPPROTO_TCP);
1669
1670        addr.set_protocol(prot_tls_tcp);
1671
1672        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "<<--Received connect--<< request from " << addr.get_ip_str() 
1673            << " port #" << addr.get_port());
1674
1675        struct sockaddr_in6 own_address;
1676        socklen_t own_address_len= sizeof(own_address);
1677        getsockname(conn_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
1678
1679        // AssocData will copy addr content into its own structure
1680        // allocated peer_assoc will be stored in connmap
1681        peer_assoc = new(nothrow) AssocData(conn_socket, addr, appladdress(own_address,IPPROTO_TCP));
1682
1683        bool insert_success= false;
1684        if (peer_assoc)
1685        {
1686          // start critical section
1687          lock(); // install_cleanup_thread_lock(TPoverTLS_TCP, this);
1688          insert_success= connmap.insert(peer_assoc);
1689
1690          // end critical section
1691          unlock(); // uninstall_cleanup(1);
1692        }
1693       
1694
1695
1696       
1697        if (insert_success == false) // not inserted into connmap
1698        {
1699          Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot insert AssocData for socket " << conn_socket
1700              << ", " << addr.get_ip_str() << ", port #" 
1701              << addr.get_port() << " into connection map, aborting connection...");
1702
1703          // abort connection, delete its AssocData
1704          close (conn_socket);
1705          if (peer_assoc) 
1706          { 
1707            delete peer_assoc;
1708            peer_assoc= 0;
1709          }
1710          return;
1711               
1712        } //end __else(connmap.insert());__
1713       
1714        // create a new thread for each new connection
1715        create_new_receiver_thread(peer_assoc);
1716      } // end __else (connsocket)__
1717     
1718      // get new thread state
1719      currstate= get_state();
1720
1721    } // end while(!terminate)
1722    return;
1723} // end listen_for_connections()   
1724
1725
1726TPoverTLS_TCP::~TPoverTLS_TCP()
1727{
1728  init= false;
1729
1730  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,  "Destructor called");
1731
1732  QueueManager::instance()->unregister_queue(tpparam.source);
1733}
1734
1735/** TPoverTLS_TCP Thread main loop.
1736 * This loop checks for internal messages of either
1737 * a send() call to start a new receiver thread, or,
1738 * of a receiver_thread() that signals its own termination
1739 * for proper cleanup of control structures.
1740 * It also handles the following internal TPoverTLS_TCPMsg types:
1741 * - TPoverTLS_TCPMsg::stop - a particular receiver thread is terminated
1742 * - TPoverTLS_TCPMsg::start - a particular receiver thread is started
1743 * @param nr number of current thread instance
1744 */
1745void 
1746TPoverTLS_TCP::main_loop(uint32 nr)
1747{
1748    SSL_library_init();
1749    OpenSSL_add_ssl_algorithms();
1750    SSL_load_error_strings();
1751
1752    ssl_ctx=SSL_CTX_new(TLSv1_client_method());
1753
1754    if (!ssl_ctx)
1755    {
1756      ERRCLog(tpparam.name, "could not create SSL context: "<< SSLerrmessage());
1757    }
1758
1759    // Load client certificate
1760    if (SSL_CTX_use_certificate_chain_file(ssl_ctx, tpparam.client_cert_filename) != 1) {
1761        ERRCLog(tpparam.name, "Unable to load client certificate: " << SSLerrmessage());
1762    }
1763    else
1764    {
1765      DLog(tpparam.name, color[green] << "Client certificate successfully loaded from " <<  tpparam.client_cert_filename << color[off]); 
1766    }
1767   
1768    // Load private key
1769    if (SSL_CTX_use_PrivateKey_file(ssl_ctx, tpparam.client_privkey_filename, SSL_FILETYPE_PEM) != 1) {
1770        ERRCLog(tpparam.name, "Unable to load private Key: " << SSLerrmessage());
1771    }
1772    else
1773    {
1774      DLog(tpparam.name, color[green] << "Private key successfully loaded from " <<  tpparam.client_privkey_filename << color[off]); 
1775    }
1776   
1777    // Verify private key
1778    if (SSL_CTX_load_verify_locations(ssl_ctx, tpparam.root_cert_filename, ".") != 1) {
1779        ERRCLog(tpparam.name, "Private Key failed verification against CA");
1780    }
1781    else
1782    {
1783      DLog(tpparam.name, color[green] << "Private key successfully verified as being certified by our CA " <<  tpparam.root_cert_filename << color[off]); 
1784    }
1785
1786
1787  // get internal queue for messages from receiver_thread
1788  FastQueue* fq = get_fqueue();
1789  if (!fq) 
1790  {
1791    Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot find message queue");
1792    return;
1793  } // end if not fq
1794  // register queue for receiving internal messages from other modules
1795  QueueManager::instance()->register_queue(fq,tpparam.source);
1796
1797
1798
1799  // start master listener thread
1800  pthread_t master_listener_thread_ID;
1801  int pthread_status= pthread_create(&master_listener_thread_ID, 
1802                                     NULL, // NULL: default attributes: thread is joinable and has a
1803                                     //       default, non-realtime scheduling policy
1804                                     master_listener_thread_starter,
1805                                     this);
1806  if (pthread_status)
1807  {
1808    Log(ERROR_LOG,LOG_CRIT, tpparam.name, 
1809        "New master listener thread could not be created: " <<  strerror(pthread_status));
1810  }
1811  else
1812    DLog(tpparam.name, "Master listener thread started");
1813
1814
1815  // define max latency for thread reaction on termination/stop signal
1816  timespec wait_interval= { 0, 250000000L }; // 250ms
1817  message* internal_thread_msg = NULL;
1818  state_t currstate= get_state();
1819
1820  // check whether this thread is signaled for termination
1821  while( currstate!=STATE_ABORT && currstate!=STATE_STOP ) 
1822  {
1823    // poll internal message queue (blocking)
1824    if ( (internal_thread_msg= fq->dequeue_timedwait(wait_interval)) != 0 )
1825    {
1826      TPoverTLS_TCPMsg* internalmsg= dynamic_cast<TPoverTLS_TCPMsg*>(internal_thread_msg);
1827      if (internalmsg)
1828      {
1829        if (internalmsg->get_msgtype() == TPoverTLS_TCPMsg::stop)
1830        {
1831          // a receiver thread terminated and signaled for cleanup by master thread
1832          AssocData* assocd= const_cast<AssocData*>(internalmsg->get_peer_assoc());
1833          Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Got cleanup request for thread <" << assocd->thread_ID <<'>');
1834          lock();
1835          cleanup_receiver_thread( assocd );
1836          unlock();
1837        }
1838        else
1839        if (internalmsg->get_msgtype() == TPoverTLS_TCPMsg::start)
1840        {
1841          // start a new receiver thread
1842          create_new_receiver_thread( const_cast<AssocData*>(internalmsg->get_peer_assoc()) );
1843        }
1844        else
1845          Log(ERROR_LOG,LOG_CRIT, tpparam.name, "unexpected internal message:" << internalmsg->get_msgtype());
1846         
1847        delete internalmsg;
1848      }
1849      else
1850      {
1851        Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Dynamic_cast failed - received unexpected and unknown internal message source "
1852            << internal_thread_msg->get_source());
1853      }
1854    } // endif
1855
1856    // get thread state
1857    currstate= get_state();
1858  } // end while
1859
1860  if (currstate==STATE_STOP)
1861  {
1862    // start abort actions
1863    Log(INFO_LOG,LOG_NORMAL, tpparam.name, "Asked to abort, stopping all receiver threads");
1864  } // end if stopped
1865
1866  // do not accept any more messages
1867  fq->shutdown();
1868  // terminate all receiver and sender threads that are still active
1869  terminate_all_threads();
1870}
1871
1872} // end namespace protlib
1873///@}
Note: See TracBrowser for help on using the repository browser.