An Overlay-based
Virtual Network Substrate
SpoVNet

source: source/ariba/utility/transport/tcpip/protlib/tp_over_tcp.cpp @ 7038

Last change on this file since 7038 was 7038, checked in by mies, 9 years ago

updated protlib to work with v4 only

File size: 55.4 KB
Line 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file tp_over_tcp.cpp
3/// TCP-based transport module (includes framing support)
4/// ----------------------------------------------------------
5/// $Id: tp_over_tcp.cpp 2872 2008-02-18 10:58:03Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/src/tp_over_tcp.cpp $
7// ===========================================================
8//                     
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//                     
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29 
30extern "C"
31{
32  //#define _SOCKADDR_LEN    /* use BSD 4.4 sockets */
33#include <unistd.h>      /* gethostname */
34#include <sys/types.h>   /* network socket interface */
35#include <netinet/in.h>  /* network socket interface */
36#include <netinet/tcp.h> /* for TCP Socket Option */
37#include <sys/socket.h>
38#include <arpa/inet.h>   /* inet_addr */
39
40#include <fcntl.h>
41#include <sys/poll.h>
42}
43
44#include <iostream>
45#include <errno.h>
46#include <string>
47#include <sstream>
48
49#include "tp_over_tcp.h"
50#include "threadsafe_db.h"
51#include "cleanuphandler.h"
52#include "setuid.h"
53#include "queuemanager.h"
54#include "logfile.h"
55
56#include <set>
57
58#define TCP_SUCCESS 0
59#define TCP_SEND_FAILURE 1
60
61const unsigned int max_listen_queue_size= 10;
62
63#define IPV6_ADDR_INT32_SMP 0x0000ffff
64
65namespace protlib {
66
67void v6_to_v4(struct sockaddr_in *sin, struct sockaddr_in6 *sin6)       {
68        bzero(sin, sizeof(*sin));
69        sin->sin_family = AF_INET;
70        sin->sin_port = sin6->sin6_port;
71        memcpy(&sin->sin_addr, &sin6->sin6_addr.s6_addr[12], sizeof(struct in_addr));
72}
73
74/* Convert sockaddr_in to sockaddr_in6 in v4 mapped addr format. */
75void v4_to_v6(struct sockaddr_in *sin, struct sockaddr_in6 *sin6) {
76        bzero(sin6, sizeof(*sin6));
77        sin6->sin6_family = AF_INET6;
78        sin6->sin6_port = sin->sin_port;
79        *(uint32_t *)&sin6->sin6_addr.s6_addr[0] = 0;
80        *(uint32_t *)&sin6->sin6_addr.s6_addr[4] = 0;
81        *(uint32_t *)&sin6->sin6_addr.s6_addr[8] = IPV6_ADDR_INT32_SMP;
82        *(uint32_t *)&sin6->sin6_addr.s6_addr[12] = sin->sin_addr.s_addr;
83}
84
85using namespace log;
86
87/** @defgroup tptcp TP over TCP
88 * @ingroup network
89 * @{
90 */
91
92char in6_addrstr[INET6_ADDRSTRLEN+1];
93
94
95/******* class TPoverTCP *******/
96
97
98/** get_connection_to() checks for already existing connections.
99 *  If a connection exists, it returns "AssocData"
100 *  and saves it in "connmap" for further use
101 *  If no connection exists, a new connection to "addr"
102 *  is created.
103 */
104AssocData* 
105TPoverTCP::get_connection_to(const appladdress& addr)
106{
107  // get timeout
108  struct timespec ts;
109  get_time_of_day(ts);
110  ts.tv_nsec+= tpparam.sleep_time * 1000000L;
111  if (ts.tv_nsec>=1000000000L)
112  {
113    ts.tv_sec += ts.tv_nsec / 1000000000L;
114    ts.tv_nsec= ts.tv_nsec % 1000000000L;
115  }
116
117  // create a new AssocData pointer, initialize it to NULL
118  AssocData* assoc= NULL;
119  int new_socket;
120  // loop until timeout is exceeded: TODO: check applicability of loop
121  do 
122  {
123    // check for existing connections to addr
124    // start critical section
125    lock(); // install_cleanup_thread_lock(TPoverTCP, this);
126    assoc= connmap.lookup(addr);
127    // end critical section
128    unlock(); // uninstall_cleanup(1);
129    if (assoc) 
130    {
131      // If not shut down then use it, else abort, wait and check again.
132      if (!assoc->shutdown) 
133      {
134        return assoc;
135      }
136      else
137      {
138        // TODO: connection is already in shutdown mode. What now?
139        ERRCLog(tpparam.name,"socket exists, but is already in mode shutdown");
140
141        return 0;
142      } 
143    } //end __if (assoc)__
144    else 
145    {
146      Log(DEBUG_LOG,LOG_UNIMP,tpparam.name,"No existing connection to " 
147          << addr.get_ip_str() << " port #" << addr.get_port() << " found, creating a new one.");
148    }
149
150    // no connection found, create a new one
151    new_socket = socket( v4_mode ? AF_INET : AF_INET6, SOCK_STREAM, IPPROTO_TCP);
152    if (new_socket == -1)
153    {
154      ERRCLog(tpparam.name,"Couldn't create a new socket: " << strerror(errno));
155
156      return 0;
157    }
158
159    // Disable Nagle Algorithm, set (TCP_NODELAY)
160    int nodelayflag= 1;
161    int status= setsockopt(new_socket,
162                           IPPROTO_TCP,
163                           TCP_NODELAY,
164                           &nodelayflag,
165                           sizeof(nodelayflag));
166    if (status)
167    {
168        ERRLog(tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
169    }
170   
171    // Reuse ports, even if they are busy
172    int socketreuseflag= 1;
173    status= setsockopt(new_socket,
174                           SOL_SOCKET,
175                           SO_REUSEADDR,
176                           (const char *) &socketreuseflag,
177                           sizeof(socketreuseflag));
178    if (status)
179    {
180        ERRLog(tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
181    }
182
183        struct sockaddr_in6 dest_address;
184        dest_address.sin6_flowinfo= 0;
185        dest_address.sin6_scope_id= 0;
186        addr.get_sockaddr(dest_address);
187
188        // connect the socket to the destination address
189        int connect_status = 0;
190        if (v4_mode) {
191                struct sockaddr_in dest_address_v4;
192                v6_to_v4( &dest_address_v4, &dest_address );
193                connect_status = connect(new_socket,
194                                         reinterpret_cast<const struct sockaddr*>(&dest_address_v4),
195                                         sizeof(dest_address));
196        } else {
197                connect_status = connect(new_socket,
198                                         reinterpret_cast<const struct sockaddr*>(&dest_address),
199                                         sizeof(dest_address));
200        }
201     
202    // connects to the listening_port of the peer's masterthread   
203    if (connect_status != 0)
204    {
205      ERRLog(tpparam.name,"Connect to " << addr.get_ip_str() << " port #" << addr.get_port() 
206          << " failed: [" << color[red] << strerror(errno) << color[off] << "]");
207     
208      return 0; // error: couldn't connect
209    }
210
211
212    struct sockaddr_in6 own_address;
213    socklen_t own_address_len= sizeof(own_address);
214        getsockname(new_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
215
216    Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,">>--Connect-->> to " << addr.get_ip_str() << " port #" << addr.get_port() 
217        << " from " << inet_ntop(AF_INET6,&own_address.sin6_addr,in6_addrstr,INET6_ADDRSTRLEN) 
218        << " port #" << ntohs(own_address.sin6_port));
219
220   
221    // create new AssocData record (will copy addr)
222    assoc = new(nothrow) AssocData(new_socket, addr, appladdress(own_address,IPPROTO_TCP));
223
224    // if assoc could be successfully created, insert it into ConnectionMap
225    if (assoc) 
226    {
227      bool insert_success= false;
228      // start critical section
229      lock(); // install_cleanup_thread_lock(TPoverTCP, this);
230      // insert AssocData into connection map
231      insert_success= connmap.insert(assoc);
232      // end critical section
233      unlock(); // uninstall_cleanup(1);
234
235      if (insert_success == true) 
236      {
237#ifdef _DEBUG
238        Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Connected to " << addr.get_ip_str() << ", port #" << addr.get_port()
239            << " via socket " << new_socket);
240
241               
242#endif
243
244        // notify master thread to start a receiver thread (i.e. send selfmessage)
245        TPoverTCPMsg* newmsg= new(nothrow)TPoverTCPMsg(assoc, tpparam.source, TPoverTCPMsg::start);
246        if (newmsg)
247        {
248          newmsg->send_to(tpparam.source);
249          return assoc;
250        }
251        else
252          ERRCLog(tpparam.name,"get_connection_to: could not get memory for internal msg");
253      } 
254      else 
255      {
256        // delete data and abort
257        ERRCLog(tpparam.name, "Cannot insert AssocData for socket " << new_socket << ", "<< addr.get_ip_str()
258            <<", port #" << addr.get_port() << " into connection map, aborting connection");
259               
260        // abort connection, delete its AssocData
261        close (new_socket);
262        if (assoc) 
263        { 
264          delete assoc; 
265          assoc= 0;
266        }
267        return assoc;
268      } // end else connmap.insert                     
269     
270    } // end "if (assoc)"
271  } 
272  while (wait_cond(ts)!=ETIMEDOUT);
273
274  return assoc;
275} //end get_connection_to
276
277
278/** terminates a signaling association/connection
279 *  - if no connection exists, generate a warning
280 *  - otherwise: generate internal msg to related receiver thread
281 */
282void
283TPoverTCP::terminate(const address& in_addr)
284{
285#ifndef _NO_LOGGING
286 const char *const thisproc="terminate()   - ";
287#endif
288
289 appladdress* addr = NULL;
290 addr = dynamic_cast<appladdress*>(in_addr.copy());
291
292 if (!addr) return;
293
294  // Create a new AssocData-pointer
295  AssocData* assoc = NULL;
296
297  // check for existing connections to addr
298
299  // start critical section:
300  // please note if receiver_thread terminated already, the assoc data is not
301  // available anymore therefore we need a lock around cleanup_receiver_thread()
302
303  lock(); // install_cleanup_thread_lock(TPoverTCP, this);
304  assoc= connmap.lookup(*addr);
305  if (assoc) 
306  {
307    EVLog(tpparam.name,thisproc<<"got request to shutdown connection for peer " << addr);
308    // If not shut down then use it, else abort, wait and check again.
309    if (!assoc->shutdown) 
310    {
311      if (assoc->socketfd)
312      {
313        // disallow sending
314        if (shutdown(assoc->socketfd,SHUT_WR))
315        {
316          ERRLog(tpparam.name,thisproc<<"shutdown (write) on socket for peer " << addr << " returned error:" << strerror(errno));
317        }
318        else
319        {
320          EVLog(tpparam.name,thisproc<<"initiated closing of connection for peer " << addr << ". Shutdown (write) on socket "<< assoc->socketfd );       
321        }
322      }
323      assoc->shutdown= true;
324    }
325    else
326      EVLog(tpparam.name,thisproc<<"connection for peer " << addr << " is already in mode shutdown");
327     
328  }
329  else
330    WLog(tpparam.name,thisproc<<"could not find a connection for peer " << *addr);
331
332  stop_receiver_thread(assoc);
333
334  // end critical section
335  unlock(); // uninstall_cleanup(1);
336
337  if (addr) delete addr;
338}
339
340
341/** generates and internal TPoverTCP message to send a NetMsg to the network
342 *  - it is necessary to let a thread do this, because the caller
343 *     may get blocked if the connect() or send() call hangs for a while
344 *  - the sending thread will call TPoverTCP::tcpsend()
345 *  - if no connection exists, creates a new one (unless use_existing_connection is true)
346 *  @note the netmsg is deleted by the send() method when it is not used anymore
347 */
348void
349TPoverTCP::send(NetMsg* netmsg, const address& in_addr, bool use_existing_connection)
350{
351  if (netmsg == NULL) {
352    ERRCLog(tpparam.name,"send() - called without valid NetMsg buffer (NULL)");
353    return;
354  }
355
356  appladdress* addr = NULL;
357  addr= dynamic_cast<appladdress*>(in_addr.copy());
358   
359  if (!addr)
360  {
361    ERRCLog(tpparam.name,"send() - given destination address is not of expected type (appladdress), has type " << (int) in_addr.get_type());
362    return;
363  } 
364
365  // lock due to sendermap access
366  lock();
367   
368  // check for existing sender thread
369  sender_thread_queuemap_t::const_iterator it= senderthread_queuemap.find(*addr);
370   
371  FastQueue* destqueue= 0;
372
373  if (it == senderthread_queuemap.end())
374  { // no sender thread found so far
375
376    // if we already have an existing connection it is save to create a sender thread
377    // since get_connection_to() will not be invoked, so an existing connection will
378    // be used
379    const AssocData* assoc = connmap.lookup(*addr);
380
381    //DLog(tpparam.name,"send() - use_existing_connection:" << (use_existing_connection ? "true" : "false") << " assoc:" << assoc);
382
383    if (use_existing_connection==false || (assoc && assoc->shutdown==false && assoc->socketfd>0))
384    { // it is allowed to create a new connection for this thread
385      // create a new queue for sender thread
386      FastQueue* sender_thread_queue= new FastQueue;
387      create_new_sender_thread(sender_thread_queue);
388      // remember queue for later use
389     
390      //pair<sender_thread_queuemap_t::iterator, bool> tmpinsiterator=
391      senderthread_queuemap.insert( pair<appladdress,FastQueue*> (*addr,sender_thread_queue) );
392     
393      destqueue= sender_thread_queue;
394    }
395  }
396  else
397  { // we have a sender thread, use stored queue from map
398    destqueue= it->second; 
399  }
400   
401  unlock();
402   
403  // send a send_data message to it (if we have a destination queue)
404  if (destqueue)
405  {
406    // both parameters will be freed after message was sent!
407   
408    TPoverTCPMsg* internalmsg= new TPoverTCPMsg(netmsg,new appladdress(*addr));
409    if (internalmsg)
410    {
411      // send the internal message to the sender thread queue
412      internalmsg->send(tpparam.source,destqueue);
413    }
414  }
415  else
416  {
417    if (!use_existing_connection)
418      WLog(tpparam.name,"send() - found entry for address, but no active sender thread available for peer addr:" << *addr << " - dropping data");
419    else
420    {
421      DLog(tpparam.name,"no active sender thread found for peer " << *addr << " - but policy forbids to set up a new connection, will drop data");
422
423    }
424    // cannot send data, so we must drop it
425    delete netmsg;
426  }
427 
428  if (addr) delete addr;
429}
430
431/** sends a NetMsg to the network.
432 *
433 * @param netmsg message to send
434 * @param addr   transport endpoint address
435 *
436 * @note   if no connection exists, creates a new one
437 * @note   both parameters are deleted after the message was sent
438 */
439void
440TPoverTCP::tcpsend(NetMsg* netmsg, appladdress* addr)
441{                       
442#ifndef _NO_LOGGING
443  const char *const thisproc="sender   - ";
444#endif
445
446  // set result initially to success, set it to failure
447  // in places where these failures occur
448  int result = TCP_SUCCESS;
449  int saved_errno= 0;
450  int ret= 0;
451
452  // Create a new AssocData-pointer
453  AssocData* assoc = NULL;
454 
455  // tp.cpp checks for initialisation of tp and correctness of
456  // msgsize, protocol and ip,
457  // throws an error if something is not right
458  if (addr) {
459    addr->convert_to_ipv6();
460    check_send_args(*netmsg,*addr);
461  } 
462  else 
463  {
464    ERRCLog(tpparam.name, thisproc << "address pointer is NULL");
465    result= TCP_SEND_FAILURE;
466
467    delete netmsg;
468    delete addr;
469
470    throw TPErrorInternal();
471  }
472
473
474  // check for existing connections,
475  // if a connection exists, return its AssocData
476  // and save it in assoc for further use
477  // if no connection exists, create a new one (in get_connection_to()).
478  // Connection is inserted into connection map that must be done
479  // with exclusive access
480  assoc= get_connection_to(*addr);
481
482  if (assoc==NULL || assoc->socketfd<=0)
483  {
484    ERRCLog(tpparam.name, color[red] << thisproc << "no valid assoc/socket data - dropping packet");
485
486    delete netmsg;
487    delete addr;
488    return;
489  }
490
491  if (assoc->shutdown)
492  {
493    Log(WARNING_LOG, LOG_ALERT, tpparam.name, thisproc << "should send message although connection already half closed");
494    delete netmsg;
495    delete addr;
496
497    throw TPErrorSendFailed();
498  }
499
500  uint32 msgsize= netmsg->get_size();
501#ifdef DEBUG_HARD
502  cerr << thisproc << "message size=" << netmsg->get_size() << endl;
503#endif
504
505  const uint32 retry_send_max = 3;
506  uint32 retry_count = 0;
507  // send all the data contained in netmsg to the socket
508  // which belongs to the address "addr"
509  for (uint32 bytes_sent= 0;
510       bytes_sent < msgsize;
511       bytes_sent+= ret)
512  {
513
514#ifdef _DEBUG_HARD
515    for (uint32 i=0;i<msgsize;i++)
516    {
517      cout << "send_buf: " << i << " : "; 
518      if ( isalnum(*(netmsg->get_buffer()+i)) )
519        cout << "'" << *(netmsg->get_buffer()+i) << "' (0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec << ")" ;
520      else
521        cout << "0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec;
522      cout << endl;
523    }
524
525    cout << endl;
526    cout << "bytes_sent: " << bytes_sent << endl;
527    cout << "Message size: " <<  msgsize << endl;
528    cout << "Send-Socket: " << assoc->socketfd << endl;
529    cout << "pointer-Offset. " << netmsg->get_pos() << endl;
530    cout << "vor send " << endl;
531#endif
532
533    retry_count= 0;
534    do 
535    {
536      // socket send
537      ret= ::send(assoc->socketfd, 
538                  netmsg->get_buffer() + bytes_sent, 
539                  msgsize - bytes_sent, 
540                  MSG_NOSIGNAL);
541     
542      // send_buf + bytes_sent
543
544      // Deal with temporary internal errors like EAGAIN, resource temporary unavailable etc.
545      // retry sending
546      if (ret < 0)
547      { // internal error during send occured
548        saved_errno= errno;
549        switch(saved_errno)
550        {
551          case EAGAIN:  // same as EWOULDBLOCK
552          case EINTR:   // man page says: A signal occurred before any data was transmitted.
553          case ENOBUFS: //  The output queue for a network interface was full.
554            retry_count++;
555            ERRLog(tpparam.name,"Temporary failure while calling send(): " << strerror(saved_errno) << ", errno: " << saved_errno
556                   << " - retry sending, retry #" << retry_count);
557            // pace down a little bit, sleep for a while
558            sleep(1);
559            break;
560
561            // everything else should not lead to repetition
562          default:
563            retry_count= retry_send_max;
564            break;
565        } // end switch
566      } // endif
567      else // leave while
568        break;
569    } 
570    while(retry_count < retry_send_max);
571
572    if (ret < 0) 
573    { // unrecoverable error occured
574     
575      result= TCP_SEND_FAILURE;
576      break;
577    } // end if (ret < 0)
578    else
579    {
580      if (debug_pdu)
581      {
582        ostringstream hexdump;
583        netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_sent);
584        DLog(tpparam.name,"PDU debugging enabled - Sent:" << hexdump.str());
585      }
586    }
587
588    // continue with send next data block in next for() iteration
589  } // end for
590
591  // *** note: netmsg is deleted here ***
592  delete netmsg;
593 
594  // Throwing an exception within a critical section does not
595  // unlock the mutex.
596
597  if (result != TCP_SUCCESS)
598  {
599    ERRLog(tpparam.name, thisproc << "TCP error, returns " << ret << ", error : " << strerror(errno));
600    delete addr;
601
602    throw TPErrorSendFailed(saved_errno);
603   
604  }
605  else
606    EVLog(tpparam.name, thisproc << ">>----Sent---->> message (" << msgsize << " bytes) using socket " << assoc->socketfd  << " to " << *addr);
607
608  if (!assoc) {
609    // no connection
610   
611    ERRLog(tpparam.name, thisproc << "cannot get connection to " << addr->get_ip_str() 
612        << ", port #" << addr->get_port());
613
614    delete addr;
615
616    throw TPErrorUnreachable(); // should be no assoc found
617  } // end "if (!assoc)"
618 
619  // *** delete address ***
620  delete addr;
621}
622 
623/* this thread waits for an internal message that either:
624 * - requests transmission of a packet
625 * - requests to stop this thread
626 * @param argp points to the thread queue for internal messages
627 */
628void 
629TPoverTCP::sender_thread(void *argp)
630{
631#ifndef _NO_LOGGING
632  const char *const methodname="senderthread - ";
633#endif
634
635  message* internal_thread_msg = NULL;
636
637  EVLog(tpparam.name, methodname << "starting as thread <" << pthread_self() << ">");
638
639  FastQueue* fq= reinterpret_cast<FastQueue*>(argp);
640  if (!fq)
641  {
642    ERRLog(tpparam.name, methodname << "thread <" << pthread_self() << "> no valid pointer to msg queue. Stop.");
643    return;
644  }
645
646  bool terminate= false;
647  TPoverTCPMsg* internalmsg= 0;
648  while (terminate==false && (internal_thread_msg= fq->dequeue()) != 0 )
649  {
650    internalmsg= dynamic_cast<TPoverTCPMsg*>(internal_thread_msg);
651   
652    if (internalmsg == 0)
653    {
654      ERRLog(tpparam.name, methodname << "received not an TPoverTCPMsg but a" << internal_thread_msg->get_type_name());     
655    }
656    else
657    if (internalmsg->get_msgtype() == TPoverTCPMsg::send_data)
658    {
659      // create a connection if none exists and send the netmsg
660      if (internalmsg->get_netmsg() && internalmsg->get_appladdr())
661      {
662        try 
663        {
664          tcpsend(internalmsg->get_netmsg(),internalmsg->get_appladdr());
665        } // end try
666        catch(TPErrorSendFailed& err)
667        {
668          ERRLog(tpparam.name, methodname << "TCP send call failed - " << err.what() 
669                 << " cause: (" << err.get_reason() << ") " << strerror(err.get_reason()) );
670        } // end catch
671        catch(TPError& err)
672        {
673          ERRLog(tpparam.name, methodname << "TCP send call failed - reason: " << err.what());
674        } // end catch
675        catch(...)
676        {
677          ERRLog(tpparam.name, methodname << "TCP send call failed - unknown exception");
678        }
679      }
680      else
681      {
682        ERRLog(tpparam.name, methodname << "problem with passed arguments references, they point to 0");
683      } 
684    }
685    else
686    if (internalmsg->get_msgtype() == TPoverTCPMsg::stop)
687    {
688      terminate= true;
689    } 
690  } // end while
691 
692  EVLog(tpparam.name, methodname << "<" << pthread_self() << "> terminated connection.");
693}
694
695
696/** receiver thread listens at a TCP socket for incoming PDUs
697 *  and passes complete PDUs to the coordinator. Incomplete
698 *  PDUs due to aborted connections or buffer overflows are discarded.
699 *  @param argp - assoc data and flags sig_terminate and terminated
700 * 
701 *  @note this is a static member function, so you cannot use class variables
702 */
703void 
704TPoverTCP::receiver_thread(void *argp)
705{
706#ifndef _NO_LOGGING
707  const char *const methodname="receiver - ";
708#endif
709
710  receiver_thread_arg_t *receiver_thread_argp= static_cast<receiver_thread_arg_t *>(argp);
711  const appladdress* peer_addr = NULL;
712  const appladdress* own_addr = NULL;
713  uint32 bytes_received = 0;
714  TPMsg* tpmsg= NULL;
715 
716  // argument parsing - start
717  if (receiver_thread_argp == 0)
718  {
719    ERRCLog(tpparam.name, methodname << "No arguments given at start of receiver thread <" << pthread_self() << ">, exiting.");
720
721    return;
722  }
723  else
724  {
725    // change status to running, i.e., not terminated
726    receiver_thread_argp->terminated= false;
727
728#ifdef _DEBUG
729    DLog(tpparam.name, methodname << "New receiver thread <" << pthread_self() << "> started. ");
730#endif
731  }
732
733  int conn_socket= 0;
734  if (receiver_thread_argp->peer_assoc)
735  {
736    // get socket descriptor from arg
737    conn_socket = receiver_thread_argp->peer_assoc->socketfd;
738    // get pointer to peer address of socket (source/sender address of peer) from arg
739    peer_addr= &receiver_thread_argp->peer_assoc->peer;
740    own_addr= &receiver_thread_argp->peer_assoc->ownaddr;
741  }
742  else
743  {
744    ERRCLog(tpparam.name, methodname << "No peer assoc available - pointer is NULL");
745   
746    return;
747  }
748
749  if (peer_addr == 0)
750  {
751    ERRCLog(tpparam.name, methodname << "No peer address available for socket " << conn_socket << ", exiting.");
752   
753    return;
754  }
755  // argument parsing - end
756#ifdef _DEBUG
757  Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname <<
758      "Preparing to wait for data at socket " 
759      << conn_socket << " from " << receiver_thread_argp->peer_assoc->peer);
760#endif
761
762  int ret= 0;
763  uint32 msgcontentlength= 0;
764  bool msgcontentlength_known= false;
765  bool pdu_complete= false; // when to terminate inner loop
766
767  /* maybe use this to create a new pdu,
768    /// constructor
769    contextlistpdu(type_t t, subtype_t st, uint32 fc, uint32 numobj = 0);
770  */ 
771
772  // activate O_NON_BLOCK  for recv on socket conn_socket
773  // CAVEAT: this also implies non-blocking send()!
774  //fcntl(conn_socket,F_SETFL, O_NONBLOCK);
775   
776  // set options for polling
777  const unsigned int number_poll_sockets= 1; 
778  struct pollfd poll_fd;
779  // have to set structure before poll call
780  poll_fd.fd = conn_socket;
781  poll_fd.events = POLLIN | POLLPRI; 
782  poll_fd.revents = 0;
783
784  int poll_status;
785  bool recv_error= false;
786
787  NetMsg* netmsg= 0;
788  NetMsg* remainbuf= 0;
789  size_t buffer_bytes_left= 0;
790  size_t trailingbytes= 0;
791  bool skiprecv= false;
792  // loop until we receive a terminate signal (read-only var for this thread)
793  // or get an error from socket read
794  while( receiver_thread_argp->sig_terminate == false )
795  {
796    // Read next PDU from socket or process trailing bytes in remainbuf
797    ret= 0;
798    msgcontentlength= 0;
799    msgcontentlength_known= false;
800    pdu_complete= false;
801    netmsg= 0;
802
803    // there are trailing bytes left from the last receive call
804    if (remainbuf != 0)
805    {
806      netmsg= remainbuf;
807      remainbuf= 0;
808      buffer_bytes_left= netmsg->get_size()-trailingbytes;
809      bytes_received= trailingbytes;
810      trailingbytes= 0;
811      skiprecv= true;
812    }
813    else // no trailing bytes, create a new buffer
814    if ( (netmsg= new NetMsg(NetMsg::max_size)) != 0 )
815    {
816      buffer_bytes_left= netmsg->get_size();
817      bytes_received= 0;
818      skiprecv= false;
819    }
820    else
821    { // buffer allocation failed
822      bytes_received= 0;
823      buffer_bytes_left= 0;
824      recv_error= true;
825    }
826   
827    // loops until PDU is complete
828    // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
829    while (!pdu_complete && 
830           !recv_error && 
831           !receiver_thread_argp->sig_terminate)
832    {
833      if (!skiprecv)
834      {
835        // read from TCP socket or return after sleep_time
836        poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
837       
838        if (receiver_thread_argp->sig_terminate)
839        {
840          Log(EVENT_LOG,LOG_UNIMP,tpparam.name,methodname << "Thread <" << pthread_self() << "> found terminate signal after poll");
841          // disallow sending
842          AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
843          if (myassoc->shutdown == false)
844          {
845            myassoc->shutdown= true;
846            if (shutdown(myassoc->socketfd,SHUT_WR))
847            {
848              if ( errno != ENOTCONN )
849                Log(ERROR_LOG,LOG_UNIMP,tpparam.name,methodname <<"shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
850            }
851          }
852          // try to read do a last read from the TCP socket or return after sleep_time
853          if (poll_status == 0)
854          {
855            poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
856          }
857        }
858
859        if (poll_fd.revents & POLLERR) // Error condition
860        {
861          if (errno == 0 || errno == EINTR)
862          {
863            EVLog(tpparam.name, methodname << "poll(): " << strerror(errno));
864          }
865          else
866          {
867            ERRCLog(tpparam.name, methodname << "Poll indicates error: " << strerror(errno));
868            recv_error= true;
869          }
870        }
871       
872        if (poll_fd.revents & POLLHUP) // Hung up
873        {
874          Log(EVENT_LOG,LOG_CRIT, tpparam.name, methodname << "Poll hung up");
875          recv_error= true;
876        }
877       
878        if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
879        {
880          EVLog(tpparam.name, methodname << "Poll Invalid request: fd not open");
881          recv_error= true;
882        }
883       
884        // check status (return value) of poll call
885        switch (poll_status)
886        {
887          case -1:
888            if (errno == 0 || errno == EINTR)
889            {
890              EVLog(tpparam.name, methodname << "Poll status: " << strerror(errno));
891            }
892            else
893            {
894              ERRCLog(tpparam.name, methodname << "Poll status indicates error: " << strerror(errno) << "- aborting");
895              recv_error= true;
896            }
897           
898            continue; // next while iteration
899            break;
900           
901          case 0:
902#ifdef DEBUG_HARD
903            Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll timed out after " << tpparam.sleep_time << " ms.");
904#endif
905            continue; // next while iteration
906            break;
907           
908          default:
909#ifdef DEBUG_HARD
910            Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
911#endif
912            break;
913        } // end switch
914
915
916        /// receive data from socket buffer (recv will not block)
917        ret = recv(conn_socket, 
918                   netmsg->get_buffer() + bytes_received, 
919                   buffer_bytes_left, 
920                   MSG_DONTWAIT);
921
922        if ( ret < 0 )
923        {
924          if (errno!=EAGAIN && errno!=EWOULDBLOCK)
925          {
926            ERRCLog(tpparam.name, methodname << "Receive at socket " << conn_socket << " failed, error: " << strerror(errno));
927            recv_error= true;
928            continue;
929          }
930          else
931          { // errno==EAGAIN || errno==EWOULDBLOCK
932            // just nothing to read from socket, continue w/ next poll
933            continue;
934          }
935        }
936        else
937        {
938          if (ret == 0)
939          {
940            // this means that EOF is reached,
941            // other side has closed connection
942            Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Other side (" << *peer_addr << ") closed connection for socket " << conn_socket);
943            // disallow sending
944            AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
945            if (myassoc->shutdown == false)
946            {
947              myassoc->shutdown= true;
948              if (shutdown(myassoc->socketfd,SHUT_WR))
949              {
950                if ( errno != ENOTCONN )
951                  Log(ERROR_LOG,LOG_UNIMP,tpparam.name, methodname << "shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
952              }
953            }
954            // not a real error, but we must quit the receive loop
955            recv_error= true;
956          }
957          else
958          {
959
960            Log(EVENT_LOG,LOG_UNIMP, tpparam.name, methodname << "<<--Received--<< packet (" << ret << " bytes) at socket " << conn_socket << " from " << *peer_addr);
961            // track number of received bytes
962            bytes_received+= ret;
963            buffer_bytes_left-= ret;
964          }
965        }
966      } // end if do not skip recv() statement     
967
968      if (buffer_bytes_left < 0) ///< buffer space exhausted now
969      {
970        recv_error= true;
971        Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "during receive buffer space exhausted");
972      }
973
974      if (!msgcontentlength_known) ///< common header not parsed
975      {
976        // enough bytes read to parse common header?
977        if (bytes_received >= common_header_length)
978        {
979          // get message content length in number of bytes
980          if (getmsglength(*netmsg, msgcontentlength))
981            msgcontentlength_known= true;
982          else
983          {
984            ERRCLog(tpparam.name, methodname << "Not a valid protocol header - discarding received packet. received size " << msgcontentlength);
985
986            ostringstream hexdumpstr;
987            netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
988            DLog(tpparam.name,"dumping received bytes:" << hexdumpstr.str());
989
990            // reset all counters
991            msgcontentlength= 0;
992            msgcontentlength_known= false;
993            bytes_received= 0;
994            pdu_complete= false;
995            continue;
996          }
997        }
998      } // endif common header not parsed
999
1000      // check whether we have read the whole Protocol PDU
1001      DLog(tpparam.name, "bytes_received-common_header_length=" << bytes_received-common_header_length << " msgcontentlength: " << msgcontentlength);
1002      if (msgcontentlength_known)
1003      {
1004        if (bytes_received-common_header_length >= msgcontentlength )
1005        {
1006          pdu_complete= true;
1007          // truncate buffer exactly at common_header_length+msgcontentlength==PDU size, trailing stuff is handled separately
1008          netmsg->truncate(common_header_length+msgcontentlength);
1009
1010          // trailing bytes are copied into new buffer
1011          if (bytes_received-common_header_length > msgcontentlength)
1012          {
1013            WLog(tpparam.name,"trailing bytes - received more bytes ("<<bytes_received<<") than expected for PDU (" << common_header_length+msgcontentlength << ")");
1014            remainbuf= new NetMsg(NetMsg::max_size);
1015            trailingbytes= (bytes_received-common_header_length) - msgcontentlength;
1016            bytes_received= common_header_length+msgcontentlength;
1017            memcpy(remainbuf->get_buffer(),netmsg->get_buffer()+common_header_length+msgcontentlength, trailingbytes);
1018          }
1019        }
1020        else
1021        { // not enough bytes in the buffer must continue to read from network
1022          skiprecv= false;
1023        }
1024      } // endif msgcontentlength_known
1025    } // end while (!pdu_complete && !recv_error && !signalled for termination)
1026    // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
1027
1028    // if other side closed the connection, we should still be able to deliver the remaining data
1029    if (ret == 0)
1030    {
1031      recv_error= false;
1032    }
1033
1034    // deliver only complete PDUs to signaling module
1035    if (!recv_error && pdu_complete)
1036    {
1037      // create TPMsg and send it to the signaling thread
1038      tpmsg = new(nothrow) TPMsg(netmsg, peer_addr->copy(), own_addr->copy());
1039      if (tpmsg)
1040      {
1041        DLog(tpparam.name, methodname << "receipt of PDU now complete, sending msg#" << tpmsg->get_id()
1042             << " to module " <<  message::get_qaddr_name(tpparam.dest));
1043      }
1044
1045      debug_pdu=false;
1046
1047      if (debug_pdu)
1048      {
1049        ostringstream hexdump;
1050        netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_received);
1051        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"PDU debugging enabled - Received:" << hexdump.str());
1052      }
1053
1054      // send the message if it was successfully created
1055      // bool message::send_to(qaddr_t dest, bool exp = false);
1056      if (!tpmsg
1057          || (!tpmsg->get_peeraddress())
1058          || (!tpmsg->send(message::qaddr_tp_over_tcp, tpparam.dest))) 
1059      {
1060        Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Cannot allocate/send TPMsg");
1061        if (tpmsg) delete tpmsg;
1062      } // end if tpmsg not allocated or not addr or not sent
1063     
1064
1065    } // end if !recv_error
1066    else
1067    { // error during receive or PDU incomplete
1068      if (bytes_received>0)
1069      {
1070        Log(WARNING_LOG,LOG_NORMAL, tpparam.name, methodname << "Attention! " << (recv_error? "Receive error, " : "") << (pdu_complete ?  "PDU complete" : "PDU incomplete") << "received bytes: " << bytes_received);
1071      }
1072
1073      if (!pdu_complete && bytes_received>0 && bytes_received<common_header_length)
1074      {
1075        ostringstream hexdumpstr;
1076        netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
1077        Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"Message too short to be a valid protocol header - dumping received bytes:" << hexdumpstr.str()); 
1078      }
1079      // leave the outer loop
1080      /**********************/
1081      break;
1082      /**********************/
1083    } // end else
1084
1085  } // end while (thread not signalled for termination)
1086
1087  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() 
1088      << "> shutting down and closing socket " << receiver_thread_argp->peer_assoc->peer);
1089
1090  // shutdown socket
1091  if (shutdown(conn_socket, SHUT_RD))
1092  {
1093    if ( errno != ENOTCONN )
1094      Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> shutdown (read) on socket failed, reason: " << strerror(errno));
1095  }
1096
1097  // close socket
1098  close(conn_socket);
1099
1100  receiver_thread_argp->terminated= true;
1101
1102  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> terminated");
1103
1104#ifdef _DEBUG
1105  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Signaling main loop for cleanup");
1106#endif
1107  // notify master thread for invocation of cleanup procedure
1108  TPoverTCPMsg* newmsg= new(nothrow)TPoverTCPMsg(receiver_thread_argp->peer_assoc);
1109  // send message to main loop thread
1110  newmsg->send_to(tpparam.source);
1111
1112} 
1113
1114
1115/** this signals a terminate to a thread and wait for the thread to stop
1116 *  @note it is not safe to access any thread related data after this method returned,
1117 *        because the receiver thread will initiate a cleanup_receiver_thread() method
1118 *        which may erase all relevant thread data.
1119 */
1120void 
1121TPoverTCP::stop_receiver_thread(AssocData* peer_assoc)
1122{
1123  // All operations on  recv_thread_argmap and connmap require an already acquired lock
1124  // after this procedure peer_assoc may be invalid because it was erased
1125
1126  // start critical section
1127
1128  if (peer_assoc == 0)
1129    return;
1130
1131  pthread_t thread_id=  peer_assoc->thread_ID;
1132 
1133  // try to clean up receiver_thread_arg
1134  recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1135  receiver_thread_arg_t* recv_thread_argp= 
1136    (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1137  if (recv_thread_argp)
1138  {
1139    if (!recv_thread_argp->terminated)
1140    {
1141      // thread signaled termination, but is not?
1142      Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> signaled for termination");
1143
1144      // signal thread for its termination
1145      recv_thread_argp->sig_terminate= true;
1146      // wait for thread to join after termination
1147      pthread_join(thread_id, 0);
1148      // the dying thread will signal main loop to call this method, but this time we should enter the else branch
1149      return;
1150    }
1151  }
1152  else
1153    Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> not found");
1154
1155}
1156
1157
1158/** cleans up left over structures (assoc,receiver_thread_arg) from already terminated receiver_thread
1159 *  usually called by the master_thread after the receiver_thread terminated
1160 * @note clean_up_receiver_thread() should be only called when an outer lock ensures that peer_assoc
1161 *       is still valid
1162 */
1163void 
1164TPoverTCP::cleanup_receiver_thread(AssocData* peer_assoc)
1165{
1166  // All operations on  recv_thread_argmap and connmap require an already acquired lock
1167  // after this procedure peer_assoc may be invalid because it was erased
1168
1169  // start critical section
1170
1171  if (peer_assoc == 0)
1172    return;
1173
1174  pthread_t thread_id=  peer_assoc->thread_ID;
1175 
1176  // try to clean up receiver_thread_arg
1177  recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1178  receiver_thread_arg_t* recv_thread_argp= 
1179    (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1180  if (recv_thread_argp)
1181  {
1182    if (!recv_thread_argp->terminated)
1183    {
1184      // thread signaled termination, but is not?
1185      Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> not terminated yet?!");
1186      return;
1187    }
1188    else
1189    { // if thread is already terminated
1190      Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> is terminated");
1191
1192      // delete it from receiver map
1193      recv_thread_argmap.erase(recv_thread_arg_iter);
1194
1195      // then delete receiver arg structure
1196      delete recv_thread_argp;
1197    }
1198  }
1199
1200  // delete entry from connection map
1201
1202  // cleanup sender thread
1203  // no need to lock explicitly, because caller of cleanup_receiver_thread() must already locked
1204  terminate_sender_thread(peer_assoc);
1205
1206  // delete the AssocData structure from the connection map
1207  // also frees allocated AssocData structure
1208  connmap.erase(peer_assoc);
1209
1210  // end critical section
1211
1212  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Cleanup receiver thread <" << thread_id << ">. Done.");
1213}
1214
1215
1216/* sends a stop message to the sender thread that belongs to the peer address given in assoc
1217 * @note terminate_receiver_thread() should be only called when an outer lock ensures that assoc
1218 *       is still valid, a lock is also required, because senderthread_queuemap is changed
1219 */
1220void 
1221TPoverTCP::terminate_sender_thread(const AssocData* assoc)
1222{
1223  if (assoc == 0)
1224  {
1225    Log(ERROR_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - assoc data == NULL");
1226    return;
1227  }
1228
1229  sender_thread_queuemap_t::iterator it= senderthread_queuemap.find(assoc->peer);
1230
1231  if (it != senderthread_queuemap.end())
1232  { // we have a sender thread: send a stop message to it
1233    FastQueue* destqueue= it->second; 
1234    if (destqueue)
1235    {
1236      TPoverTCPMsg* internalmsg= new TPoverTCPMsg(assoc,tpparam.source,TPoverTCPMsg::stop);
1237      if (internalmsg)
1238      {
1239        // send the internal message to the sender thread queue
1240        internalmsg->send(tpparam.source,destqueue);
1241      }
1242    }
1243    else
1244    {
1245      Log(WARNING_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - found entry for address, but no sender thread. addr:" << assoc->peer);
1246    }
1247    // erase entry from map
1248    senderthread_queuemap.erase(it);
1249  }
1250}
1251
1252/* terminate all active threads
1253 * note: locking should not be necessary here because this message is called as last method from
1254 * main_loop()
1255 */
1256void 
1257TPoverTCP::terminate_all_threads()
1258{
1259  AssocData* assoc= 0;
1260  receiver_thread_arg_t* terminate_argp;
1261
1262  for (recv_thread_argmap_t::iterator terminate_iterator=  recv_thread_argmap.begin();
1263       terminate_iterator !=  recv_thread_argmap.end();
1264       terminate_iterator++)
1265  {
1266    if ( (terminate_argp= terminate_iterator->second) != 0)
1267    {
1268      // we need a non const pointer to erase it later on
1269      assoc= const_cast<AssocData*>(terminate_argp->peer_assoc);
1270      // check whether thread is still alive
1271      if (terminate_argp->terminated == false)
1272      {
1273        terminate_argp->sig_terminate= true;
1274        // then wait for its termination
1275        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, 
1276            "Signaled receiver thread <" << terminate_iterator->first << "> for termination");
1277       
1278        pthread_join(terminate_iterator->first, 0);
1279       
1280        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Thread <" << terminate_iterator->first  << "> is terminated");
1281      }
1282      else
1283        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, 
1284            "Receiver thread <" << terminate_iterator->first << "> already terminated");
1285       
1286      // cleanup all remaining argument structures of terminated threads
1287      delete terminate_argp;
1288
1289      // terminate any related sender thread that is still running
1290      terminate_sender_thread(assoc);
1291     
1292      connmap.erase(assoc);
1293      // delete assoc is not necessary, because connmap.erase() will do the job
1294    }
1295  } // end for
1296}
1297
1298
1299/**
1300 * sender thread starter:
1301 * just a static starter method to allow starting the
1302 * actual sender_thread() method.
1303 *
1304 * @param argp - pointer to the current TPoverTCP object instance and receiver_thread_arg_t struct
1305 */
1306void*
1307TPoverTCP::sender_thread_starter(void *argp)
1308{
1309  sender_thread_start_arg_t *sargp= static_cast<sender_thread_start_arg_t *>(argp);
1310 
1311  //cout << "invoked sender_thread_Starter" << endl;
1312
1313  // invoke sender thread method
1314  if (sargp != 0 && sargp->instance != 0)
1315  {
1316    // call receiver_thread member function on object instance
1317    sargp->instance->sender_thread(sargp->sender_thread_queue);
1318
1319    //cout << "Before deletion of sarg" << endl;
1320
1321    // no longer needed
1322    delete sargp;
1323  }
1324  else
1325  {
1326    Log(ERROR_LOG,LOG_CRIT,"sender_thread_starter","while starting sender_thread: 0 pointer to arg or object");
1327  }
1328  return 0;
1329}
1330
1331
1332
1333
1334/**
1335 * receiver thread starter:
1336 * just a static starter method to allow starting the
1337 * actual receiver_thread() method.
1338 *
1339 * @param argp - pointer to the current TPoverTCP object instance and receiver_thread_arg_t struct
1340 */
1341void*
1342TPoverTCP::receiver_thread_starter(void *argp)
1343{
1344  receiver_thread_start_arg_t *rargp= static_cast<receiver_thread_start_arg_t *>(argp);
1345  // invoke receiver thread method
1346  if (rargp != 0 && rargp->instance != 0)
1347  {
1348    // call receiver_thread member function on object instance
1349    rargp->instance->receiver_thread(rargp->rtargp);
1350
1351    // no longer needed
1352    delete rargp;
1353  }
1354  else
1355  {
1356    Log(ERROR_LOG,LOG_CRIT,"receiver_thread_starter","while starting receiver_thread: 0 pointer to arg or object");
1357  }
1358  return 0;
1359}
1360
1361
1362void
1363TPoverTCP::create_new_sender_thread(FastQueue* senderfqueue)
1364{
1365  Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new sender thread...");
1366
1367  pthread_t senderthreadid;
1368  // create new thread; (arg == 0) is handled by thread, too
1369  int pthread_status= pthread_create(&senderthreadid, 
1370                                     NULL, // NULL: default attributes: thread is joinable and has a
1371                                     //       default, non-realtime scheduling policy
1372                                     TPoverTCP::sender_thread_starter,
1373                                     new sender_thread_start_arg_t(this,senderfqueue));
1374  if (pthread_status)
1375  {
1376    Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " <<  strerror(pthread_status));
1377   
1378    delete senderfqueue;
1379  }
1380}
1381
1382
1383void
1384TPoverTCP::create_new_receiver_thread(AssocData* peer_assoc)
1385{
1386  receiver_thread_arg_t* argp= 
1387    new(nothrow) receiver_thread_arg(peer_assoc);
1388 
1389  Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new receiver thread...");
1390
1391  // create new thread; (arg == 0) is handled by thread, too
1392  int pthread_status= pthread_create(&peer_assoc->thread_ID, 
1393                                     NULL, // NULL: default attributes: thread is joinable and has a
1394                                     //       default, non-realtime scheduling policy
1395                                     receiver_thread_starter,
1396                                     new(nothrow) receiver_thread_start_arg_t(this,argp));
1397  if (pthread_status)
1398  {
1399    Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " <<  strerror(pthread_status));
1400   
1401    delete argp;
1402  }
1403  else
1404  {
1405    lock(); // install_cleanup_thread_lock(TPoverTCP, this);
1406
1407    // remember pointer to thread arg structure
1408    // thread arg structure should be destroyed after thread termination only
1409    pair<recv_thread_argmap_t::iterator, bool> tmpinsiterator=
1410      recv_thread_argmap.insert( pair<pthread_t,receiver_thread_arg_t*> (peer_assoc->thread_ID,argp) );
1411    if (tmpinsiterator.second == false)
1412    {
1413      Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Thread argument could not be inserted into hashmap");
1414    }
1415    unlock(); // uninstall_cleanup(1);
1416  } 
1417}
1418
1419
1420/**
1421 * master listener thread starter:
1422 * just a static starter method to allow starting the
1423 * actual master_listener_thread() method.
1424 *
1425 * @param argp - pointer to the current TPoverTCP object instance
1426 */
1427void*
1428TPoverTCP::master_listener_thread_starter(void *argp)
1429{
1430  // invoke listener thread method
1431  if (argp != 0)
1432  {
1433    (static_cast<TPoverTCP*>(argp))->master_listener_thread();
1434  }
1435  return 0;
1436}
1437
1438
1439
1440/**
1441 * master listener thread: waits for incoming connections at the well-known tcp port
1442 * when a connection request is received this thread spawns a receiver_thread for
1443 * receiving packets from the peer at the new socket.
1444 */
1445void
1446TPoverTCP::master_listener_thread()
1447{
1448  // create a new address-structure for the listening masterthread
1449  struct sockaddr_in6 own_address_v6;
1450  own_address_v6.sin6_family = AF_INET6;
1451  own_address_v6.sin6_flowinfo= 0;
1452  own_address_v6.sin6_port = htons(tpparam.port); // use port number in param structure
1453  // accept incoming connections on all interfaces
1454  own_address_v6.sin6_addr = in6addr_any;
1455  own_address_v6.sin6_scope_id= 0;
1456
1457  // create a new address-structure for the listening masterthread
1458  struct sockaddr_in own_address_v4;
1459  own_address_v4.sin_family = AF_INET;
1460  own_address_v4.sin_port = htons(tpparam.port); // use port number in param structure
1461  // accept incoming connections on all interfaces
1462  own_address_v4.sin_addr.s_addr = INADDR_ANY;
1463 
1464  // create a listening socket
1465  int master_listener_socket= socket( AF_INET6, SOCK_STREAM, IPPROTO_TCP);
1466  if (master_listener_socket!=-1) v4_mode = false;
1467  if (master_listener_socket == -1) {
1468          master_listener_socket= socket( AF_INET, SOCK_STREAM, IPPROTO_TCP);
1469          if (master_listener_socket!=-1) v4_mode = true;
1470  }
1471  if (master_listener_socket == -1)
1472  {
1473    Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Could not create a new socket, error: " << strerror(errno));
1474    return;
1475  }
1476 
1477  // Disable Nagle Algorithm, set (TCP_NODELAY)
1478  int nodelayflag= 1;
1479  int status= setsockopt(master_listener_socket,
1480                         IPPROTO_TCP,
1481                         TCP_NODELAY,
1482                         &nodelayflag,
1483                         sizeof(nodelayflag));
1484  if (status)
1485  {
1486    Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
1487  }
1488 
1489  // Reuse ports, even if they are busy
1490  int socketreuseflag= 1;
1491  status= setsockopt(master_listener_socket,
1492                           SOL_SOCKET,
1493                           SO_REUSEADDR,
1494                           (const char *) &socketreuseflag,
1495                           sizeof(socketreuseflag));
1496  if (status)
1497  {
1498       Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
1499  }
1500 
1501 
1502  // bind the newly created socket to a specific address
1503  int bind_status = bind(master_listener_socket, v4_mode ?
1504                         reinterpret_cast<struct sockaddr *>(&own_address_v4) :
1505                         reinterpret_cast<struct sockaddr *>(&own_address_v6),
1506                                         v4_mode ? sizeof(own_address_v4) : sizeof(own_address_v6));
1507  if (bind_status)
1508    { 
1509      Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Binding to " 
1510          << (v4_mode ? inet_ntop(AF_INET, &own_address_v4.sin_addr, in_addrstr, INET_ADDRSTRLEN) :
1511                          inet_ntop(AF_INET6, &own_address_v6.sin6_addr, in6_addrstr, INET6_ADDRSTRLEN))
1512          << " port " << tpparam.port << " failed, error: " << strerror(errno));
1513      return;
1514    }
1515
1516    // listen at the socket,
1517    // queuesize for pending connections= max_listen_queue_size
1518    int listen_status = listen(master_listener_socket, max_listen_queue_size);
1519    if (listen_status)
1520    {
1521      Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Listen at socket " << master_listener_socket
1522          << " failed, error: " << strerror(errno));
1523      return;
1524    }
1525    else
1526    {
1527      Log(INFO_LOG,LOG_NORMAL, tpparam.name, color[green] << "Listening at port #" << tpparam.port << color[off]);
1528    }
1529
1530    // activate O_NON_BLOCK for accept (accept does not block)
1531    fcntl(master_listener_socket,F_SETFL, O_NONBLOCK);
1532
1533    // create a pollfd struct for use in the mainloop
1534    struct pollfd poll_fd;
1535    poll_fd.fd = master_listener_socket;
1536    poll_fd.events = POLLIN | POLLPRI; 
1537    poll_fd.revents = 0;
1538    /*
1539      #define POLLIN    0x001   // There is data to read.
1540      #define POLLPRI   0x002   // There is urgent data to read. 
1541      #define POLLOUT   0x004   // Writing now will not block. 
1542    */
1543   
1544    bool terminate = false;
1545    // check for thread terminate condition using get_state()
1546    state_t currstate= get_state();
1547    int poll_status= 0;
1548    const unsigned int number_poll_sockets= 1; 
1549    struct sockaddr_in6 peer_address;
1550    socklen_t peer_address_len;
1551    int conn_socket;
1552
1553    // check whether this thread is signaled for termination
1554    while(! (terminate= (currstate==STATE_ABORT || currstate==STATE_STOP) ) )
1555    {
1556      // wait on number_poll_sockets (main drm socket)
1557      // for the events specified above for sleep_time (in ms)
1558      poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
1559      if (poll_fd.revents & POLLERR) // Error condition
1560      {
1561        if (errno != EINTR) 
1562        {
1563          Log(ERROR_LOG,LOG_CRIT, tpparam.name, 
1564              "Poll caused error " << strerror(errno) << " - indicated by revents");
1565        }
1566        else
1567        {
1568          Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "poll(): " << strerror(errno));
1569        }
1570
1571      }
1572      if (poll_fd.revents & POLLHUP) // Hung up
1573      {
1574        Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll hung up");
1575        return;
1576      }
1577      if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
1578      {
1579        Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll Invalid request: fd not open");
1580        return;
1581      }
1582     
1583      switch (poll_status)
1584      {
1585        case -1:
1586          if (errno != EINTR)
1587          {
1588            Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll status indicates error: " << strerror(errno));
1589          }
1590          else
1591          {
1592            Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Poll status: " << strerror(errno));
1593          }
1594           
1595          break;
1596
1597        case 0:
1598#ifdef DEBUG_HARD
1599          Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, 
1600              "Listen Thread - Poll timed out after " << tpparam.sleep_time << " ms.");
1601#endif
1602          currstate= get_state();
1603          continue;
1604          break;
1605
1606        default:
1607#ifdef DEBUG_HARD
1608          Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
1609#endif
1610          break;
1611      } // end switch
1612
1613      // after a successful accept call,
1614      // accept stores the address information of the connecting party
1615      // in peer_address and the size of its address in addrlen
1616      peer_address_len= sizeof(peer_address);
1617      conn_socket = accept (master_listener_socket,
1618                            reinterpret_cast<struct sockaddr *>(&peer_address),
1619                            &peer_address_len);
1620      if (conn_socket == -1)
1621      {
1622        if (errno != EWOULDBLOCK && errno != EAGAIN)
1623        {
1624          Log(ERROR_LOG,LOG_EMERG, tpparam.name, "Accept at socket " << master_listener_socket
1625              << " failed, error: " << strerror(errno));
1626          return;
1627        }
1628      }
1629      else
1630      {
1631        // create a new assocdata-object for the new thread
1632        AssocData* peer_assoc = NULL;
1633        appladdress addr(peer_address, IPPROTO_TCP);
1634
1635        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "<<--Received connect--<< request from " << addr.get_ip_str() 
1636            << " port #" << addr.get_port());
1637
1638        struct sockaddr_in6 own_address;
1639        if (v4_mode) {
1640                struct sockaddr_in own_address_v4;
1641                socklen_t own_address_len_v4 = sizeof(own_address_v4);
1642                getsockname(conn_socket, reinterpret_cast<struct sockaddr*>(&own_address_v4), &own_address_len_v4);
1643                v4_to_v6(&own_address_v4, &own_address);
1644        } else {
1645                socklen_t own_address_len= sizeof(own_address);
1646                getsockname(conn_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
1647        }
1648
1649        // AssocData will copy addr content into its own structure
1650        // allocated peer_assoc will be stored in connmap
1651        peer_assoc = new(nothrow) AssocData(conn_socket, addr, appladdress(own_address,IPPROTO_TCP));
1652
1653        bool insert_success= false;
1654        if (peer_assoc)
1655        {
1656          // start critical section
1657          lock(); // install_cleanup_thread_lock(TPoverTCP, this);
1658          insert_success= connmap.insert(peer_assoc);
1659          // end critical section
1660          unlock(); // uninstall_cleanup(1);
1661        }
1662       
1663       
1664        if (insert_success == false) // not inserted into connmap
1665        {
1666          Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot insert AssocData for socket " << conn_socket
1667              << ", " << addr.get_ip_str() << ", port #" 
1668              << addr.get_port() << " into connection map, aborting connection...");
1669
1670          // abort connection, delete its AssocData
1671          close (conn_socket);
1672          if (peer_assoc) 
1673          { 
1674            delete peer_assoc;
1675            peer_assoc= 0;
1676          }
1677          return;
1678               
1679        } //end __else(connmap.insert());__
1680       
1681        // create a new thread for each new connection
1682        create_new_receiver_thread(peer_assoc);
1683      } // end __else (connsocket)__
1684     
1685      // get new thread state
1686      currstate= get_state();
1687
1688    } // end while(!terminate)
1689    return;
1690} // end listen_for_connections()   
1691
1692
1693TPoverTCP::~TPoverTCP()
1694{
1695  init= false;
1696
1697  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,  "Destructor called");
1698
1699  QueueManager::instance()->unregister_queue(tpparam.source);
1700}
1701
1702/** TPoverTCP Thread main loop.
1703 * This loop checks for internal messages of either
1704 * a send() call to start a new receiver thread, or,
1705 * of a receiver_thread() that signals its own termination
1706 * for proper cleanup of control structures.
1707 * It also handles the following internal TPoverTCPMsg types:
1708 * - TPoverTCPMsg::stop - a particular receiver thread is terminated
1709 * - TPoverTCPMsg::start - a particular receiver thread is started
1710 * @param nr number of current thread instance
1711 */
1712void 
1713TPoverTCP::main_loop(uint32 nr)
1714{
1715
1716  // get internal queue for messages from receiver_thread
1717  FastQueue* fq = get_fqueue();
1718  if (!fq) 
1719  {
1720    Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot find message queue");
1721    return;
1722  } // end if not fq
1723  // register queue for receiving internal messages from other modules
1724  QueueManager::instance()->register_queue(fq,tpparam.source);
1725
1726  // start master listener thread
1727  pthread_t master_listener_thread_ID;
1728  int pthread_status= pthread_create(&master_listener_thread_ID, 
1729                                     NULL, // NULL: default attributes: thread is joinable and has a
1730                                     //       default, non-realtime scheduling policy
1731                                     master_listener_thread_starter,
1732                                     this);
1733  if (pthread_status)
1734  {
1735    Log(ERROR_LOG,LOG_CRIT, tpparam.name, 
1736        "New master listener thread could not be created: " <<  strerror(pthread_status));
1737  }
1738  else
1739    Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Master listener thread started");
1740
1741
1742  // define max latency for thread reaction on termination/stop signal
1743  timespec wait_interval= { 0, 250000000L }; // 250ms
1744  message* internal_thread_msg = NULL;
1745  state_t currstate= get_state();
1746
1747  // check whether this thread is signaled for termination
1748  while( currstate!=STATE_ABORT && currstate!=STATE_STOP ) 
1749  {
1750    // poll internal message queue (blocking)
1751    if ( (internal_thread_msg= fq->dequeue_timedwait(wait_interval)) != 0 )
1752    {
1753      TPoverTCPMsg* internalmsg= dynamic_cast<TPoverTCPMsg*>(internal_thread_msg);
1754      if (internalmsg)
1755      {
1756        if (internalmsg->get_msgtype() == TPoverTCPMsg::stop)
1757        {
1758          // a receiver thread terminated and signaled for cleanup by master thread
1759          AssocData* assocd= const_cast<AssocData*>(internalmsg->get_peer_assoc());
1760          Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Got cleanup request for thread <" << assocd->thread_ID <<'>');
1761          lock();
1762          cleanup_receiver_thread( assocd );
1763          unlock();
1764        }
1765        else
1766        if (internalmsg->get_msgtype() == TPoverTCPMsg::start)
1767        {
1768          // start a new receiver thread
1769          create_new_receiver_thread( const_cast<AssocData*>(internalmsg->get_peer_assoc()) );
1770        }
1771        else
1772          Log(ERROR_LOG,LOG_CRIT, tpparam.name, "unexpected internal message:" << internalmsg->get_msgtype());
1773         
1774        delete internalmsg;
1775      }
1776      else
1777      {
1778        Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Dynamic_cast failed - received unexpected and unknown internal message source "
1779            << internal_thread_msg->get_source());
1780      }
1781    } // endif
1782
1783    // get thread state
1784    currstate= get_state();
1785  } // end while
1786
1787  if (currstate==STATE_STOP)
1788  {
1789    // start abort actions
1790    Log(INFO_LOG,LOG_NORMAL, tpparam.name, "Asked to abort, stopping all receiver threads");
1791  } // end if stopped
1792
1793  // do not accept any more messages
1794  fq->shutdown();
1795  // terminate all receiver and sender threads that are still active
1796  terminate_all_threads();
1797}
1798
1799} // end namespace protlib
1800///@}
Note: See TracBrowser for help on using the repository browser.