An Overlay-based
Virtual Network Substrate
SpoVNet

source: trash/old-modules/transport/protlib/tp_over_tcp.cpp @ 5641

Last change on this file since 5641 was 5641, checked in by Christoph Mayer, 14 years ago
File size: 53.4 KB
Line 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file tp_over_tcp.cpp
3/// TCP-based transport module (includes framing support)
4/// ----------------------------------------------------------
5/// $Id: tp_over_tcp.cpp 2872 2008-02-18 10:58:03Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/src/tp_over_tcp.cpp $
7// ===========================================================
8//                     
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//                     
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29 
30extern "C"
31{
32  //#define _SOCKADDR_LEN    /* use BSD 4.4 sockets */
33#include <unistd.h>      /* gethostname */
34#include <sys/types.h>   /* network socket interface */
35#include <netinet/in.h>  /* network socket interface */
36#include <netinet/tcp.h> /* for TCP Socket Option */
37#include <sys/socket.h>
38#include <arpa/inet.h>   /* inet_addr */
39
40#include <fcntl.h>
41#include <sys/poll.h>
42}
43
44#include <iostream>
45#include <errno.h>
46#include <string>
47#include <sstream>
48
49#include "tp_over_tcp.h"
50#include "threadsafe_db.h"
51#include "cleanuphandler.h"
52#include "setuid.h"
53#include "queuemanager.h"
54#include "logfile.h"
55
56#include <set>
57
58#define TCP_SUCCESS 0
59#define TCP_SEND_FAILURE 1
60
61const unsigned int max_listen_queue_size= 10;
62
63namespace protlib {
64
65using namespace log;
66
67/** @defgroup tptcp TP over TCP
68 * @ingroup network
69 * @{
70 */
71
72char in6_addrstr[INET6_ADDRSTRLEN+1];
73
74
75/******* class TPoverTCP *******/
76
77
78/** get_connection_to() checks for already existing connections.
79 *  If a connection exists, it returns "AssocData"
80 *  and saves it in "connmap" for further use
81 *  If no connection exists, a new connection to "addr"
82 *  is created.
83 */
84AssocData* 
85TPoverTCP::get_connection_to(const appladdress& addr)
86{
87  // get timeout
88  struct timespec ts;
89  get_time_of_day(ts);
90  ts.tv_nsec+= tpparam.sleep_time * 1000000L;
91  if (ts.tv_nsec>=1000000000L)
92  {
93    ts.tv_sec += ts.tv_nsec / 1000000000L;
94    ts.tv_nsec= ts.tv_nsec % 1000000000L;
95  }
96
97  // create a new AssocData pointer, initialize it to NULL
98  AssocData* assoc= NULL;
99  int new_socket;
100  // loop until timeout is exceeded: TODO: check applicability of loop
101  do 
102  {
103    // check for existing connections to addr
104    // start critical section
105    lock(); // install_cleanup_thread_lock(TPoverTCP, this);
106    assoc= connmap.lookup(addr);
107    // end critical section
108    unlock(); // uninstall_cleanup(1);
109    if (assoc) 
110    {
111      // If not shut down then use it, else abort, wait and check again.
112      if (!assoc->shutdown) 
113      {
114        return assoc;
115      }
116      else
117      {
118        // TODO: connection is already in shutdown mode. What now?
119        ERRCLog(tpparam.name,"socket exists, but is already in mode shutdown");
120
121        return 0;
122      } 
123    } //end __if (assoc)__
124    else 
125    {
126      Log(DEBUG_LOG,LOG_UNIMP,tpparam.name,"No existing connection to " 
127          << addr.get_ip_str() << " port #" << addr.get_port() << " found, creating a new one.");
128    }
129
130    // no connection found, create a new one
131    new_socket = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
132    if (new_socket == -1)
133    {
134      ERRCLog(tpparam.name,"Couldn't create a new socket: " << strerror(errno));
135
136      return 0;
137    }
138
139    // Disable Nagle Algorithm, set (TCP_NODELAY)
140    int nodelayflag= 1;
141    int status= setsockopt(new_socket,
142                           IPPROTO_TCP,
143                           TCP_NODELAY,
144                           &nodelayflag,
145                           sizeof(nodelayflag));
146    if (status)
147    {
148        ERRLog(tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
149    }
150   
151    // Reuse ports, even if they are busy
152    int socketreuseflag= 1;
153    status= setsockopt(new_socket,
154                           SOL_SOCKET,
155                           SO_REUSEADDR,
156                           (const char *) &socketreuseflag,
157                           sizeof(socketreuseflag));
158    if (status)
159    {
160        ERRLog(tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
161    }
162
163    struct sockaddr_in6 dest_address;
164    dest_address.sin6_flowinfo= 0;
165    dest_address.sin6_scope_id= 0;
166    addr.get_sockaddr(dest_address);
167     
168    // connect the socket to the destination address
169    int connect_status = connect(new_socket,
170                                 reinterpret_cast<const struct sockaddr*>(&dest_address),
171                                 sizeof(dest_address));
172
173    // connects to the listening_port of the peer's masterthread   
174    if (connect_status != 0)
175    {
176      ERRLog(tpparam.name,"Connect to " << addr.get_ip_str() << " port #" << addr.get_port() 
177          << " failed: [" << color[red] << strerror(errno) << color[off] << "]");
178     
179      return 0; // error: couldn't connect
180    }
181
182   
183    struct sockaddr_in6 own_address;
184    socklen_t own_address_len= sizeof(own_address);
185    getsockname(new_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
186
187   
188
189    Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,">>--Connect-->> to " << addr.get_ip_str() << " port #" << addr.get_port() 
190        << " from " << inet_ntop(AF_INET6,&own_address.sin6_addr,in6_addrstr,INET6_ADDRSTRLEN) 
191        << " port #" << ntohs(own_address.sin6_port));
192   
193   
194
195   
196    // create new AssocData record (will copy addr)
197    assoc = new(nothrow) AssocData(new_socket, addr, appladdress(own_address,IPPROTO_TCP));
198
199    // if assoc could be successfully created, insert it into ConnectionMap
200    if (assoc) 
201    {
202      bool insert_success= false;
203      // start critical section
204      lock(); // install_cleanup_thread_lock(TPoverTCP, this);
205      // insert AssocData into connection map
206      insert_success= connmap.insert(assoc);
207      // end critical section
208      unlock(); // uninstall_cleanup(1);
209
210      if (insert_success == true) 
211      {
212#ifdef _DEBUG
213        Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Connected to " << addr.get_ip_str() << ", port #" << addr.get_port()
214            << " via socket " << new_socket);
215
216               
217#endif
218
219        // notify master thread to start a receiver thread (i.e. send selfmessage)
220        TPoverTCPMsg* newmsg= new(nothrow)TPoverTCPMsg(assoc, tpparam.source, TPoverTCPMsg::start);
221        if (newmsg)
222        {
223          newmsg->send_to(tpparam.source);
224          return assoc;
225        }
226        else
227          ERRCLog(tpparam.name,"get_connection_to: could not get memory for internal msg");
228      } 
229      else 
230      {
231        // delete data and abort
232        ERRCLog(tpparam.name, "Cannot insert AssocData for socket " << new_socket << ", "<< addr.get_ip_str()
233            <<", port #" << addr.get_port() << " into connection map, aborting connection");
234               
235        // abort connection, delete its AssocData
236        close (new_socket);
237        if (assoc) 
238        { 
239          delete assoc; 
240          assoc= 0;
241        }
242        return assoc;
243      } // end else connmap.insert                     
244     
245    } // end "if (assoc)"
246  } 
247  while (wait_cond(ts)!=ETIMEDOUT);
248
249  return assoc;
250} //end get_connection_to
251
252
253/** terminates a signaling association/connection
254 *  - if no connection exists, generate a warning
255 *  - otherwise: generate internal msg to related receiver thread
256 */
257void
258TPoverTCP::terminate(const address& in_addr)
259{
260#ifndef _NO_LOGGING
261 const char *const thisproc="terminate()   - ";
262#endif
263
264 appladdress* addr = NULL;
265 addr = dynamic_cast<appladdress*>(in_addr.copy());
266
267 if (!addr) return;
268
269  // Create a new AssocData-pointer
270  AssocData* assoc = NULL;
271
272  // check for existing connections to addr
273
274  // start critical section:
275  // please note if receiver_thread terminated already, the assoc data is not
276  // available anymore therefore we need a lock around cleanup_receiver_thread()
277
278  lock(); // install_cleanup_thread_lock(TPoverTCP, this);
279  assoc= connmap.lookup(*addr);
280  if (assoc) 
281  {
282    EVLog(tpparam.name,thisproc<<"got request to shutdown connection for peer " << addr);
283    // If not shut down then use it, else abort, wait and check again.
284    if (!assoc->shutdown) 
285    {
286      if (assoc->socketfd)
287      {
288        // disallow sending
289        if (shutdown(assoc->socketfd,SHUT_WR))
290        {
291          ERRLog(tpparam.name,thisproc<<"shutdown (write) on socket for peer " << addr << " returned error:" << strerror(errno));
292        }
293        else
294        {
295          EVLog(tpparam.name,thisproc<<"initiated closing of connection for peer " << addr << ". Shutdown (write) on socket "<< assoc->socketfd );       
296        }
297      }
298      assoc->shutdown= true;
299    }
300    else
301      EVLog(tpparam.name,thisproc<<"connection for peer " << addr << " is already in mode shutdown");
302     
303  }
304  else
305    WLog(tpparam.name,thisproc<<"could not find a connection for peer " << *addr);
306
307  stop_receiver_thread(assoc);
308
309  // end critical section
310  unlock(); // uninstall_cleanup(1);
311
312  if (addr) delete addr;
313}
314
315
316/** generates and internal TPoverTCP message to send a NetMsg to the network
317 *  - it is necessary to let a thread do this, because the caller
318 *     may get blocked if the connect() or send() call hangs for a while
319 *  - the sending thread will call TPoverTCP::tcpsend()
320 *  - if no connection exists, creates a new one (unless use_existing_connection is true)
321 *  @note the netmsg is deleted by the send() method when it is not used anymore
322 */
323void
324TPoverTCP::send(NetMsg* netmsg, const address& in_addr, bool use_existing_connection)
325{
326  if (netmsg == NULL) {
327    ERRCLog(tpparam.name,"send() - called without valid NetMsg buffer (NULL)");
328    return;
329  }
330
331  appladdress* addr = NULL;
332  addr= dynamic_cast<appladdress*>(in_addr.copy());
333   
334  if (!addr)
335  {
336    ERRCLog(tpparam.name,"send() - given destination address is not of expected type (appladdress), has type " << (int) in_addr.get_type());
337    return;
338  } 
339
340  // lock due to sendermap access
341  lock();
342   
343  // check for existing sender thread
344  sender_thread_queuemap_t::const_iterator it= senderthread_queuemap.find(*addr);
345   
346  FastQueue* destqueue= 0;
347
348  if (it == senderthread_queuemap.end())
349  { // no sender thread found so far
350
351    // if we already have an existing connection it is save to create a sender thread
352    // since get_connection_to() will not be invoked, so an existing connection will
353    // be used
354    const AssocData* assoc = connmap.lookup(*addr);
355
356    //DLog(tpparam.name,"send() - use_existing_connection:" << (use_existing_connection ? "true" : "false") << " assoc:" << assoc);
357
358    if (use_existing_connection==false || (assoc && assoc->shutdown==false && assoc->socketfd>0))
359    { // it is allowed to create a new connection for this thread
360      // create a new queue for sender thread
361      FastQueue* sender_thread_queue= new FastQueue;
362      create_new_sender_thread(sender_thread_queue);
363      // remember queue for later use
364     
365      //pair<sender_thread_queuemap_t::iterator, bool> tmpinsiterator=
366      senderthread_queuemap.insert( pair<appladdress,FastQueue*> (*addr,sender_thread_queue) );
367     
368      destqueue= sender_thread_queue;
369    }
370  }
371  else
372  { // we have a sender thread, use stored queue from map
373    destqueue= it->second; 
374  }
375   
376  unlock();
377   
378  // send a send_data message to it (if we have a destination queue)
379  if (destqueue)
380  {
381    // both parameters will be freed after message was sent!
382   
383    TPoverTCPMsg* internalmsg= new TPoverTCPMsg(netmsg,new appladdress(*addr));
384    if (internalmsg)
385    {
386      // send the internal message to the sender thread queue
387      internalmsg->send(tpparam.source,destqueue);
388    }
389  }
390  else
391  {
392    if (!use_existing_connection)
393      WLog(tpparam.name,"send() - found entry for address, but no active sender thread available for peer addr:" << *addr << " - dropping data");
394    else
395    {
396      DLog(tpparam.name,"no active sender thread found for peer " << *addr << " - but policy forbids to set up a new connection, will drop data");
397
398    }
399    // cannot send data, so we must drop it
400    delete netmsg;
401  }
402 
403  if (addr) delete addr;
404}
405
406/** sends a NetMsg to the network.
407 *
408 * @param netmsg message to send
409 * @param addr   transport endpoint address
410 *
411 * @note   if no connection exists, creates a new one
412 * @note   both parameters are deleted after the message was sent
413 */
414void
415TPoverTCP::tcpsend(NetMsg* netmsg, appladdress* addr)
416{                       
417#ifndef _NO_LOGGING
418  const char *const thisproc="sender   - ";
419#endif
420
421  // set result initially to success, set it to failure
422  // in places where these failures occur
423  int result = TCP_SUCCESS;
424  int saved_errno= 0;
425  int ret= 0;
426
427  // Create a new AssocData-pointer
428  AssocData* assoc = NULL;
429 
430  // tp.cpp checks for initialisation of tp and correctness of
431  // msgsize, protocol and ip,
432  // throws an error if something is not right
433  if (addr) {
434    addr->convert_to_ipv6();
435    check_send_args(*netmsg,*addr);
436  } 
437  else 
438  {
439    ERRCLog(tpparam.name, thisproc << "address pointer is NULL");
440    result= TCP_SEND_FAILURE;
441
442    delete netmsg;
443    delete addr;
444
445    throw TPErrorInternal();
446  }
447
448
449  // check for existing connections,
450  // if a connection exists, return its AssocData
451  // and save it in assoc for further use
452  // if no connection exists, create a new one (in get_connection_to()).
453  // Connection is inserted into connection map that must be done
454  // with exclusive access
455  assoc= get_connection_to(*addr);
456
457  if (assoc==NULL || assoc->socketfd<=0)
458  {
459    ERRCLog(tpparam.name, color[red] << thisproc << "no valid assoc/socket data - dropping packet");
460
461    delete netmsg;
462    delete addr;
463    return;
464  }
465
466  if (assoc->shutdown)
467  {
468    Log(WARNING_LOG, LOG_ALERT, tpparam.name, thisproc << "should send message although connection already half closed");
469    delete netmsg;
470    delete addr;
471
472    throw TPErrorSendFailed();
473  }
474
475  uint32 msgsize= netmsg->get_size();
476#ifdef DEBUG_HARD
477  cerr << thisproc << "message size=" << netmsg->get_size() << endl;
478#endif
479
480  const uint32 retry_send_max = 3;
481  uint32 retry_count = 0;
482  // send all the data contained in netmsg to the socket
483  // which belongs to the address "addr"
484  for (uint32 bytes_sent= 0;
485       bytes_sent < msgsize;
486       bytes_sent+= ret)
487  {
488
489#ifdef _DEBUG_HARD
490    for (uint32 i=0;i<msgsize;i++)
491    {
492      cout << "send_buf: " << i << " : "; 
493      if ( isalnum(*(netmsg->get_buffer()+i)) )
494        cout << "'" << *(netmsg->get_buffer()+i) << "' (0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec << ")" ;
495      else
496        cout << "0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec;
497      cout << endl;
498    }
499
500    cout << endl;
501    cout << "bytes_sent: " << bytes_sent << endl;
502    cout << "Message size: " <<  msgsize << endl;
503    cout << "Send-Socket: " << assoc->socketfd << endl;
504    cout << "pointer-Offset. " << netmsg->get_pos() << endl;
505    cout << "vor send " << endl;
506#endif
507
508    retry_count= 0;
509    do 
510    {
511      // socket send
512      ret= ::send(assoc->socketfd, 
513                  netmsg->get_buffer() + bytes_sent, 
514                  msgsize - bytes_sent, 
515                  MSG_NOSIGNAL);
516     
517      // send_buf + bytes_sent
518
519      // Deal with temporary internal errors like EAGAIN, resource temporary unavailable etc.
520      // retry sending
521      if (ret < 0)
522      { // internal error during send occured
523        saved_errno= errno;
524        switch(saved_errno)
525        {
526          case EAGAIN:  // same as EWOULDBLOCK
527          case EINTR:   // man page says: A signal occurred before any data was transmitted.
528          case ENOBUFS: //  The output queue for a network interface was full.
529            retry_count++;
530            ERRLog(tpparam.name,"Temporary failure while calling send(): " << strerror(saved_errno) << ", errno: " << saved_errno
531                   << " - retry sending, retry #" << retry_count);
532            // pace down a little bit, sleep for a while
533            sleep(1);
534            break;
535
536            // everything else should not lead to repetition
537          default:
538            retry_count= retry_send_max;
539            break;
540        } // end switch
541      } // endif
542      else // leave while
543        break;
544    } 
545    while(retry_count < retry_send_max);
546
547    if (ret < 0) 
548    { // unrecoverable error occured
549     
550      result= TCP_SEND_FAILURE;
551      break;
552    } // end if (ret < 0)
553    else
554    {
555      if (debug_pdu)
556      {
557        ostringstream hexdump;
558        netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_sent);
559        DLog(tpparam.name,"PDU debugging enabled - Sent:" << hexdump.str());
560      }
561    }
562
563    // continue with send next data block in next for() iteration
564  } // end for
565
566  // *** note: netmsg is deleted here ***
567  delete netmsg;
568 
569  // Throwing an exception within a critical section does not
570  // unlock the mutex.
571
572  if (result != TCP_SUCCESS)
573  {
574    ERRLog(tpparam.name, thisproc << "TCP error, returns " << ret << ", error : " << strerror(errno));
575    delete addr;
576
577    throw TPErrorSendFailed(saved_errno);
578   
579  }
580  else
581    EVLog(tpparam.name, thisproc << ">>----Sent---->> message (" << msgsize << " bytes) using socket " << assoc->socketfd  << " to " << *addr);
582
583  if (!assoc) {
584    // no connection
585   
586    ERRLog(tpparam.name, thisproc << "cannot get connection to " << addr->get_ip_str() 
587        << ", port #" << addr->get_port());
588
589    delete addr;
590
591    throw TPErrorUnreachable(); // should be no assoc found
592  } // end "if (!assoc)"
593 
594  // *** delete address ***
595  delete addr;
596}
597 
598/* this thread waits for an internal message that either:
599 * - requests transmission of a packet
600 * - requests to stop this thread
601 * @param argp points to the thread queue for internal messages
602 */
603void 
604TPoverTCP::sender_thread(void *argp)
605{
606#ifndef _NO_LOGGING
607  const char *const methodname="senderthread - ";
608#endif
609
610  message* internal_thread_msg = NULL;
611
612  EVLog(tpparam.name, methodname << "starting as thread <" << pthread_self() << ">");
613
614  FastQueue* fq= reinterpret_cast<FastQueue*>(argp);
615  if (!fq)
616  {
617    ERRLog(tpparam.name, methodname << "thread <" << pthread_self() << "> no valid pointer to msg queue. Stop.");
618    return;
619  }
620
621  bool terminate= false;
622  TPoverTCPMsg* internalmsg= 0;
623  while (terminate==false && (internal_thread_msg= fq->dequeue()) != 0 )
624  {
625    internalmsg= dynamic_cast<TPoverTCPMsg*>(internal_thread_msg);
626   
627    if (internalmsg == 0)
628    {
629      ERRLog(tpparam.name, methodname << "received not an TPoverTCPMsg but a" << internal_thread_msg->get_type_name());     
630    }
631    else
632    if (internalmsg->get_msgtype() == TPoverTCPMsg::send_data)
633    {
634      // create a connection if none exists and send the netmsg
635      if (internalmsg->get_netmsg() && internalmsg->get_appladdr())
636      {
637        try 
638        {
639          tcpsend(internalmsg->get_netmsg(),internalmsg->get_appladdr());
640        } // end try
641        catch(TPErrorSendFailed& err)
642        {
643          ERRLog(tpparam.name, methodname << "TCP send call failed - " << err.what() 
644                 << " cause: (" << err.get_reason() << ") " << strerror(err.get_reason()) );
645        } // end catch
646        catch(TPError& err)
647        {
648          ERRLog(tpparam.name, methodname << "TCP send call failed - reason: " << err.what());
649        } // end catch
650        catch(...)
651        {
652          ERRLog(tpparam.name, methodname << "TCP send call failed - unknown exception");
653        }
654      }
655      else
656      {
657        ERRLog(tpparam.name, methodname << "problem with passed arguments references, they point to 0");
658      } 
659    }
660    else
661    if (internalmsg->get_msgtype() == TPoverTCPMsg::stop)
662    {
663      terminate= true;
664    } 
665  } // end while
666 
667  EVLog(tpparam.name, methodname << "<" << pthread_self() << "> terminated connection.");
668}
669
670
671/** receiver thread listens at a TCP socket for incoming PDUs
672 *  and passes complete PDUs to the coordinator. Incomplete
673 *  PDUs due to aborted connections or buffer overflows are discarded.
674 *  @param argp - assoc data and flags sig_terminate and terminated
675 * 
676 *  @note this is a static member function, so you cannot use class variables
677 */
678void 
679TPoverTCP::receiver_thread(void *argp)
680{
681#ifndef _NO_LOGGING
682  const char *const methodname="receiver - ";
683#endif
684
685  receiver_thread_arg_t *receiver_thread_argp= static_cast<receiver_thread_arg_t *>(argp);
686  const appladdress* peer_addr = NULL;
687  const appladdress* own_addr = NULL;
688  uint32 bytes_received = 0;
689  TPMsg* tpmsg= NULL;
690 
691  // argument parsing - start
692  if (receiver_thread_argp == 0)
693  {
694    ERRCLog(tpparam.name, methodname << "No arguments given at start of receiver thread <" << pthread_self() << ">, exiting.");
695
696    return;
697  }
698  else
699  {
700    // change status to running, i.e., not terminated
701    receiver_thread_argp->terminated= false;
702
703#ifdef _DEBUG
704    DLog(tpparam.name, methodname << "New receiver thread <" << pthread_self() << "> started. ");
705#endif
706  }
707
708  int conn_socket= 0;
709  if (receiver_thread_argp->peer_assoc)
710  {
711    // get socket descriptor from arg
712    conn_socket = receiver_thread_argp->peer_assoc->socketfd;
713    // get pointer to peer address of socket (source/sender address of peer) from arg
714    peer_addr= &receiver_thread_argp->peer_assoc->peer;
715    own_addr= &receiver_thread_argp->peer_assoc->ownaddr;
716  }
717  else
718  {
719    ERRCLog(tpparam.name, methodname << "No peer assoc available - pointer is NULL");
720   
721    return;
722  }
723
724  if (peer_addr == 0)
725  {
726    ERRCLog(tpparam.name, methodname << "No peer address available for socket " << conn_socket << ", exiting.");
727   
728    return;
729  }
730  // argument parsing - end
731#ifdef _DEBUG
732  Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname <<
733      "Preparing to wait for data at socket " 
734      << conn_socket << " from " << receiver_thread_argp->peer_assoc->peer);
735#endif
736
737  int ret= 0;
738  uint32 msgcontentlength= 0;
739  bool msgcontentlength_known= false;
740  bool pdu_complete= false; // when to terminate inner loop
741
742  /* maybe use this to create a new pdu,
743    /// constructor
744    contextlistpdu(type_t t, subtype_t st, uint32 fc, uint32 numobj = 0);
745  */ 
746
747  // activate O_NON_BLOCK  for recv on socket conn_socket
748  // CAVEAT: this also implies non-blocking send()!
749  //fcntl(conn_socket,F_SETFL, O_NONBLOCK);
750   
751  // set options for polling
752  const unsigned int number_poll_sockets= 1; 
753  struct pollfd poll_fd;
754  // have to set structure before poll call
755  poll_fd.fd = conn_socket;
756  poll_fd.events = POLLIN | POLLPRI; 
757  poll_fd.revents = 0;
758
759  int poll_status;
760  bool recv_error= false;
761
762  NetMsg* netmsg= 0;
763  NetMsg* remainbuf= 0;
764  size_t buffer_bytes_left= 0;
765  size_t trailingbytes= 0;
766  bool skiprecv= false;
767  // loop until we receive a terminate signal (read-only var for this thread)
768  // or get an error from socket read
769  while( receiver_thread_argp->sig_terminate == false )
770  {
771    // Read next PDU from socket or process trailing bytes in remainbuf
772    ret= 0;
773    msgcontentlength= 0;
774    msgcontentlength_known= false;
775    pdu_complete= false;
776    netmsg= 0;
777
778    // there are trailing bytes left from the last receive call
779    if (remainbuf != 0)
780    {
781      netmsg= remainbuf;
782      remainbuf= 0;
783      buffer_bytes_left= netmsg->get_size()-trailingbytes;
784      bytes_received= trailingbytes;
785      trailingbytes= 0;
786      skiprecv= true;
787    }
788    else // no trailing bytes, create a new buffer
789    if ( (netmsg= new NetMsg(NetMsg::max_size)) != 0 )
790    {
791      buffer_bytes_left= netmsg->get_size();
792      bytes_received= 0;
793      skiprecv= false;
794    }
795    else
796    { // buffer allocation failed
797      bytes_received= 0;
798      buffer_bytes_left= 0;
799      recv_error= true;
800    }
801   
802    // loops until PDU is complete
803    // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
804    while (!pdu_complete && 
805           !recv_error && 
806           !receiver_thread_argp->sig_terminate)
807    {
808      if (!skiprecv)
809      {
810        // read from TCP socket or return after sleep_time
811        poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
812       
813        if (receiver_thread_argp->sig_terminate)
814        {
815          Log(EVENT_LOG,LOG_UNIMP,tpparam.name,methodname << "Thread <" << pthread_self() << "> found terminate signal after poll");
816          // disallow sending
817          AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
818          if (myassoc->shutdown == false)
819          {
820            myassoc->shutdown= true;
821            if (shutdown(myassoc->socketfd,SHUT_WR))
822            {
823              if ( errno != ENOTCONN )
824                Log(ERROR_LOG,LOG_UNIMP,tpparam.name,methodname <<"shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
825            }
826          }
827          // try to read do a last read from the TCP socket or return after sleep_time
828          if (poll_status == 0)
829          {
830            poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
831          }
832        }
833
834        if (poll_fd.revents & POLLERR) // Error condition
835        {
836          if (errno == 0 || errno == EINTR)
837          {
838            EVLog(tpparam.name, methodname << "poll(): " << strerror(errno));
839          }
840          else
841          {
842            ERRCLog(tpparam.name, methodname << "Poll indicates error: " << strerror(errno));
843            recv_error= true;
844          }
845        }
846       
847        if (poll_fd.revents & POLLHUP) // Hung up
848        {
849          Log(EVENT_LOG,LOG_CRIT, tpparam.name, methodname << "Poll hung up");
850          recv_error= true;
851        }
852       
853        if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
854        {
855          EVLog(tpparam.name, methodname << "Poll Invalid request: fd not open");
856          recv_error= true;
857        }
858       
859        // check status (return value) of poll call
860        switch (poll_status)
861        {
862          case -1:
863            if (errno == 0 || errno == EINTR)
864            {
865              EVLog(tpparam.name, methodname << "Poll status: " << strerror(errno));
866            }
867            else
868            {
869              ERRCLog(tpparam.name, methodname << "Poll status indicates error: " << strerror(errno) << "- aborting");
870              recv_error= true;
871            }
872           
873            continue; // next while iteration
874            break;
875           
876          case 0:
877#ifdef DEBUG_HARD
878            Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll timed out after " << tpparam.sleep_time << " ms.");
879#endif
880            continue; // next while iteration
881            break;
882           
883          default:
884#ifdef DEBUG_HARD
885            Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
886#endif
887            break;
888        } // end switch
889
890
891        /// receive data from socket buffer (recv will not block)
892        ret = recv(conn_socket, 
893                   netmsg->get_buffer() + bytes_received, 
894                   buffer_bytes_left, 
895                   MSG_DONTWAIT);
896
897        if ( ret < 0 )
898        {
899          if (errno!=EAGAIN && errno!=EWOULDBLOCK)
900          {
901            ERRCLog(tpparam.name, methodname << "Receive at socket " << conn_socket << " failed, error: " << strerror(errno));
902            recv_error= true;
903            continue;
904          }
905          else
906          { // errno==EAGAIN || errno==EWOULDBLOCK
907            // just nothing to read from socket, continue w/ next poll
908            continue;
909          }
910        }
911        else
912        {
913          if (ret == 0)
914          {
915            // this means that EOF is reached,
916            // other side has closed connection
917            Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Other side (" << *peer_addr << ") closed connection for socket " << conn_socket);
918            // disallow sending
919            AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
920            if (myassoc->shutdown == false)
921            {
922              myassoc->shutdown= true;
923              if (shutdown(myassoc->socketfd,SHUT_WR))
924              {
925                if ( errno != ENOTCONN )
926                  Log(ERROR_LOG,LOG_UNIMP,tpparam.name, methodname << "shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
927              }
928            }
929            // not a real error, but we must quit the receive loop
930            recv_error= true;
931          }
932          else
933          {
934
935            Log(EVENT_LOG,LOG_UNIMP, tpparam.name, methodname << "<<--Received--<< packet (" << ret << " bytes) at socket " << conn_socket << " from " << *peer_addr);
936            // track number of received bytes
937            bytes_received+= ret;
938            buffer_bytes_left-= ret;
939          }
940        }
941      } // end if do not skip recv() statement     
942
943      if (buffer_bytes_left < 0) ///< buffer space exhausted now
944      {
945        recv_error= true;
946        Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "during receive buffer space exhausted");
947      }
948
949      if (!msgcontentlength_known) ///< common header not parsed
950      {
951        // enough bytes read to parse common header?
952        if (bytes_received >= common_header_length)
953        {
954          // get message content length in number of bytes
955          if (getmsglength(*netmsg, msgcontentlength))
956            msgcontentlength_known= true;
957          else
958          {
959            ERRCLog(tpparam.name, methodname << "Not a valid protocol header - discarding received packet. received size " << msgcontentlength);
960
961            ostringstream hexdumpstr;
962            netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
963            DLog(tpparam.name,"dumping received bytes:" << hexdumpstr.str());
964
965            // reset all counters
966            msgcontentlength= 0;
967            msgcontentlength_known= false;
968            bytes_received= 0;
969            pdu_complete= false;
970            continue;
971          }
972        }
973      } // endif common header not parsed
974
975      // check whether we have read the whole Protocol PDU
976      DLog(tpparam.name, "bytes_received-common_header_length=" << bytes_received-common_header_length << " msgcontentlength: " << msgcontentlength);
977      if (msgcontentlength_known)
978      {
979        if (bytes_received-common_header_length >= msgcontentlength )
980        {
981          pdu_complete= true;
982          // truncate buffer exactly at common_header_length+msgcontentlength==PDU size, trailing stuff is handled separately
983          netmsg->truncate(common_header_length+msgcontentlength);
984
985          // trailing bytes are copied into new buffer
986          if (bytes_received-common_header_length > msgcontentlength)
987          {
988            WLog(tpparam.name,"trailing bytes - received more bytes ("<<bytes_received<<") than expected for PDU (" << common_header_length+msgcontentlength << ")");
989            remainbuf= new NetMsg(NetMsg::max_size);
990            trailingbytes= (bytes_received-common_header_length) - msgcontentlength;
991            bytes_received= common_header_length+msgcontentlength;
992            memcpy(remainbuf->get_buffer(),netmsg->get_buffer()+common_header_length+msgcontentlength, trailingbytes);
993          }
994        }
995        else
996        { // not enough bytes in the buffer must continue to read from network
997          skiprecv= false;
998        }
999      } // endif msgcontentlength_known
1000    } // end while (!pdu_complete && !recv_error && !signalled for termination)
1001    // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
1002
1003    // if other side closed the connection, we should still be able to deliver the remaining data
1004    if (ret == 0)
1005    {
1006      recv_error= false;
1007    }
1008
1009    // deliver only complete PDUs to signaling module
1010    if (!recv_error && pdu_complete)
1011    {
1012      // create TPMsg and send it to the signaling thread
1013      tpmsg = new(nothrow) TPMsg(netmsg, peer_addr->copy(), own_addr->copy());
1014      if (tpmsg)
1015      {
1016        DLog(tpparam.name, methodname << "receipt of PDU now complete, sending msg#" << tpmsg->get_id()
1017             << " to module " <<  message::get_qaddr_name(tpparam.dest));
1018      }
1019
1020      debug_pdu=false;
1021
1022      if (debug_pdu)
1023      {
1024        ostringstream hexdump;
1025        netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_received);
1026        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"PDU debugging enabled - Received:" << hexdump.str());
1027      }
1028
1029      // send the message if it was successfully created
1030      // bool message::send_to(qaddr_t dest, bool exp = false);
1031      if (!tpmsg
1032          || (!tpmsg->get_peeraddress())
1033          || (!tpmsg->send(message::qaddr_tp_over_tcp, tpparam.dest))) 
1034      {
1035        Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Cannot allocate/send TPMsg");
1036        if (tpmsg) delete tpmsg;
1037      } // end if tpmsg not allocated or not addr or not sent
1038     
1039
1040    } // end if !recv_error
1041    else
1042    { // error during receive or PDU incomplete
1043      if (bytes_received>0)
1044      {
1045        Log(WARNING_LOG,LOG_NORMAL, tpparam.name, methodname << "Attention! " << (recv_error? "Receive error, " : "") << (pdu_complete ?  "PDU complete" : "PDU incomplete") << "received bytes: " << bytes_received);
1046      }
1047
1048      if (!pdu_complete && bytes_received>0 && bytes_received<common_header_length)
1049      {
1050        ostringstream hexdumpstr;
1051        netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
1052        Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"Message too short to be a valid protocol header - dumping received bytes:" << hexdumpstr.str()); 
1053      }
1054      // leave the outer loop
1055      /**********************/
1056      break;
1057      /**********************/
1058    } // end else
1059
1060  } // end while (thread not signalled for termination)
1061
1062  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() 
1063      << "> shutting down and closing socket " << receiver_thread_argp->peer_assoc->peer);
1064
1065  // shutdown socket
1066  if (shutdown(conn_socket, SHUT_RD))
1067  {
1068    if ( errno != ENOTCONN )
1069      Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> shutdown (read) on socket failed, reason: " << strerror(errno));
1070  }
1071
1072  // close socket
1073  close(conn_socket);
1074
1075  receiver_thread_argp->terminated= true;
1076
1077  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> terminated");
1078
1079#ifdef _DEBUG
1080  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Signaling main loop for cleanup");
1081#endif
1082  // notify master thread for invocation of cleanup procedure
1083  TPoverTCPMsg* newmsg= new(nothrow)TPoverTCPMsg(receiver_thread_argp->peer_assoc);
1084  // send message to main loop thread
1085  newmsg->send_to(tpparam.source);
1086
1087} 
1088
1089
1090/** this signals a terminate to a thread and wait for the thread to stop
1091 *  @note it is not safe to access any thread related data after this method returned,
1092 *        because the receiver thread will initiate a cleanup_receiver_thread() method
1093 *        which may erase all relevant thread data.
1094 */
1095void 
1096TPoverTCP::stop_receiver_thread(AssocData* peer_assoc)
1097{
1098  // All operations on  recv_thread_argmap and connmap require an already acquired lock
1099  // after this procedure peer_assoc may be invalid because it was erased
1100
1101  // start critical section
1102
1103  if (peer_assoc == 0)
1104    return;
1105
1106  pthread_t thread_id=  peer_assoc->thread_ID;
1107 
1108  // try to clean up receiver_thread_arg
1109  recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1110  receiver_thread_arg_t* recv_thread_argp= 
1111    (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1112  if (recv_thread_argp)
1113  {
1114    if (!recv_thread_argp->terminated)
1115    {
1116      // thread signaled termination, but is not?
1117      Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> signaled for termination");
1118
1119      // signal thread for its termination
1120      recv_thread_argp->sig_terminate= true;
1121      // wait for thread to join after termination
1122      pthread_join(thread_id, 0);
1123      // the dying thread will signal main loop to call this method, but this time we should enter the else branch
1124      return;
1125    }
1126  }
1127  else
1128    Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> not found");
1129
1130}
1131
1132
1133/** cleans up left over structures (assoc,receiver_thread_arg) from already terminated receiver_thread
1134 *  usually called by the master_thread after the receiver_thread terminated
1135 * @note clean_up_receiver_thread() should be only called when an outer lock ensures that peer_assoc
1136 *       is still valid
1137 */
1138void 
1139TPoverTCP::cleanup_receiver_thread(AssocData* peer_assoc)
1140{
1141  // All operations on  recv_thread_argmap and connmap require an already acquired lock
1142  // after this procedure peer_assoc may be invalid because it was erased
1143
1144  // start critical section
1145
1146  if (peer_assoc == 0)
1147    return;
1148
1149  pthread_t thread_id=  peer_assoc->thread_ID;
1150 
1151  // try to clean up receiver_thread_arg
1152  recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1153  receiver_thread_arg_t* recv_thread_argp= 
1154    (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1155  if (recv_thread_argp)
1156  {
1157    if (!recv_thread_argp->terminated)
1158    {
1159      // thread signaled termination, but is not?
1160      Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> not terminated yet?!");
1161      return;
1162    }
1163    else
1164    { // if thread is already terminated
1165      Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> is terminated");
1166
1167      // delete it from receiver map
1168      recv_thread_argmap.erase(recv_thread_arg_iter);
1169
1170      // then delete receiver arg structure
1171      delete recv_thread_argp;
1172    }
1173  }
1174
1175  // delete entry from connection map
1176
1177  // cleanup sender thread
1178  // no need to lock explicitly, because caller of cleanup_receiver_thread() must already locked
1179  terminate_sender_thread(peer_assoc);
1180
1181  // delete the AssocData structure from the connection map
1182  // also frees allocated AssocData structure
1183  connmap.erase(peer_assoc);
1184
1185  // end critical section
1186
1187  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Cleanup receiver thread <" << thread_id << ">. Done.");
1188}
1189
1190
1191/* sends a stop message to the sender thread that belongs to the peer address given in assoc
1192 * @note terminate_receiver_thread() should be only called when an outer lock ensures that assoc
1193 *       is still valid, a lock is also required, because senderthread_queuemap is changed
1194 */
1195void 
1196TPoverTCP::terminate_sender_thread(const AssocData* assoc)
1197{
1198  if (assoc == 0)
1199  {
1200    Log(ERROR_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - assoc data == NULL");
1201    return;
1202  }
1203
1204  sender_thread_queuemap_t::iterator it= senderthread_queuemap.find(assoc->peer);
1205
1206  if (it != senderthread_queuemap.end())
1207  { // we have a sender thread: send a stop message to it
1208    FastQueue* destqueue= it->second; 
1209    if (destqueue)
1210    {
1211      TPoverTCPMsg* internalmsg= new TPoverTCPMsg(assoc,tpparam.source,TPoverTCPMsg::stop);
1212      if (internalmsg)
1213      {
1214        // send the internal message to the sender thread queue
1215        internalmsg->send(tpparam.source,destqueue);
1216      }
1217    }
1218    else
1219    {
1220      Log(WARNING_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - found entry for address, but no sender thread. addr:" << assoc->peer);
1221    }
1222    // erase entry from map
1223    senderthread_queuemap.erase(it);
1224  }
1225}
1226
1227/* terminate all active threads
1228 * note: locking should not be necessary here because this message is called as last method from
1229 * main_loop()
1230 */
1231void 
1232TPoverTCP::terminate_all_threads()
1233{
1234  AssocData* assoc= 0;
1235  receiver_thread_arg_t* terminate_argp;
1236
1237  for (recv_thread_argmap_t::iterator terminate_iterator=  recv_thread_argmap.begin();
1238       terminate_iterator !=  recv_thread_argmap.end();
1239       terminate_iterator++)
1240  {
1241    if ( (terminate_argp= terminate_iterator->second) != 0)
1242    {
1243      // we need a non const pointer to erase it later on
1244      assoc= const_cast<AssocData*>(terminate_argp->peer_assoc);
1245      // check whether thread is still alive
1246      if (terminate_argp->terminated == false)
1247      {
1248        terminate_argp->sig_terminate= true;
1249        // then wait for its termination
1250        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, 
1251            "Signaled receiver thread <" << terminate_iterator->first << "> for termination");
1252       
1253        pthread_join(terminate_iterator->first, 0);
1254       
1255        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Thread <" << terminate_iterator->first  << "> is terminated");
1256      }
1257      else
1258        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, 
1259            "Receiver thread <" << terminate_iterator->first << "> already terminated");
1260       
1261      // cleanup all remaining argument structures of terminated threads
1262      delete terminate_argp;
1263
1264      // terminate any related sender thread that is still running
1265      terminate_sender_thread(assoc);
1266     
1267      connmap.erase(assoc);
1268      // delete assoc is not necessary, because connmap.erase() will do the job
1269    }
1270  } // end for
1271}
1272
1273
1274/**
1275 * sender thread starter:
1276 * just a static starter method to allow starting the
1277 * actual sender_thread() method.
1278 *
1279 * @param argp - pointer to the current TPoverTCP object instance and receiver_thread_arg_t struct
1280 */
1281void*
1282TPoverTCP::sender_thread_starter(void *argp)
1283{
1284  sender_thread_start_arg_t *sargp= static_cast<sender_thread_start_arg_t *>(argp);
1285 
1286  //cout << "invoked sender_thread_Starter" << endl;
1287
1288  // invoke sender thread method
1289  if (sargp != 0 && sargp->instance != 0)
1290  {
1291    // call receiver_thread member function on object instance
1292    sargp->instance->sender_thread(sargp->sender_thread_queue);
1293
1294    //cout << "Before deletion of sarg" << endl;
1295
1296    // no longer needed
1297    delete sargp;
1298  }
1299  else
1300  {
1301    Log(ERROR_LOG,LOG_CRIT,"sender_thread_starter","while starting sender_thread: 0 pointer to arg or object");
1302  }
1303  return 0;
1304}
1305
1306
1307
1308
1309/**
1310 * receiver thread starter:
1311 * just a static starter method to allow starting the
1312 * actual receiver_thread() method.
1313 *
1314 * @param argp - pointer to the current TPoverTCP object instance and receiver_thread_arg_t struct
1315 */
1316void*
1317TPoverTCP::receiver_thread_starter(void *argp)
1318{
1319  receiver_thread_start_arg_t *rargp= static_cast<receiver_thread_start_arg_t *>(argp);
1320  // invoke receiver thread method
1321  if (rargp != 0 && rargp->instance != 0)
1322  {
1323    // call receiver_thread member function on object instance
1324    rargp->instance->receiver_thread(rargp->rtargp);
1325
1326    // no longer needed
1327    delete rargp;
1328  }
1329  else
1330  {
1331    Log(ERROR_LOG,LOG_CRIT,"receiver_thread_starter","while starting receiver_thread: 0 pointer to arg or object");
1332  }
1333  return 0;
1334}
1335
1336
1337void
1338TPoverTCP::create_new_sender_thread(FastQueue* senderfqueue)
1339{
1340  Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new sender thread...");
1341
1342  pthread_t senderthreadid;
1343  // create new thread; (arg == 0) is handled by thread, too
1344  int pthread_status= pthread_create(&senderthreadid, 
1345                                     NULL, // NULL: default attributes: thread is joinable and has a
1346                                     //       default, non-realtime scheduling policy
1347                                     TPoverTCP::sender_thread_starter,
1348                                     new sender_thread_start_arg_t(this,senderfqueue));
1349  if (pthread_status)
1350  {
1351    Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " <<  strerror(pthread_status));
1352   
1353    delete senderfqueue;
1354  }
1355}
1356
1357
1358void
1359TPoverTCP::create_new_receiver_thread(AssocData* peer_assoc)
1360{
1361  receiver_thread_arg_t* argp= 
1362    new(nothrow) receiver_thread_arg(peer_assoc);
1363 
1364  Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new receiver thread...");
1365
1366  // create new thread; (arg == 0) is handled by thread, too
1367  int pthread_status= pthread_create(&peer_assoc->thread_ID, 
1368                                     NULL, // NULL: default attributes: thread is joinable and has a
1369                                     //       default, non-realtime scheduling policy
1370                                     receiver_thread_starter,
1371                                     new(nothrow) receiver_thread_start_arg_t(this,argp));
1372  if (pthread_status)
1373  {
1374    Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " <<  strerror(pthread_status));
1375   
1376    delete argp;
1377  }
1378  else
1379  {
1380    lock(); // install_cleanup_thread_lock(TPoverTCP, this);
1381
1382    // remember pointer to thread arg structure
1383    // thread arg structure should be destroyed after thread termination only
1384    pair<recv_thread_argmap_t::iterator, bool> tmpinsiterator=
1385      recv_thread_argmap.insert( pair<pthread_t,receiver_thread_arg_t*> (peer_assoc->thread_ID,argp) );
1386    if (tmpinsiterator.second == false)
1387    {
1388      Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Thread argument could not be inserted into hashmap");
1389    }
1390    unlock(); // uninstall_cleanup(1);
1391  } 
1392}
1393
1394
1395/**
1396 * master listener thread starter:
1397 * just a static starter method to allow starting the
1398 * actual master_listener_thread() method.
1399 *
1400 * @param argp - pointer to the current TPoverTCP object instance
1401 */
1402void*
1403TPoverTCP::master_listener_thread_starter(void *argp)
1404{
1405  // invoke listener thread method
1406  if (argp != 0)
1407  {
1408    (static_cast<TPoverTCP*>(argp))->master_listener_thread();
1409  }
1410  return 0;
1411}
1412
1413
1414
1415/**
1416 * master listener thread: waits for incoming connections at the well-known tcp port
1417 * when a connection request is received this thread spawns a receiver_thread for
1418 * receiving packets from the peer at the new socket.
1419 */
1420void
1421TPoverTCP::master_listener_thread()
1422{
1423  // create a new address-structure for the listening masterthread
1424  struct sockaddr_in6 own_address;
1425  own_address.sin6_family = AF_INET6;
1426  own_address.sin6_flowinfo= 0;
1427  own_address.sin6_port = htons(tpparam.port); // use port number in param structure
1428  // accept incoming connections on all interfaces
1429  own_address.sin6_addr = in6addr_any;
1430  own_address.sin6_scope_id= 0;
1431 
1432  // create a listening socket
1433  int master_listener_socket= socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
1434  if (master_listener_socket == -1)
1435  {
1436    Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Could not create a new socket, error: " << strerror(errno));
1437    return;
1438  }
1439 
1440  // Disable Nagle Algorithm, set (TCP_NODELAY)
1441  int nodelayflag= 1;
1442  int status= setsockopt(master_listener_socket,
1443                         IPPROTO_TCP,
1444                         TCP_NODELAY,
1445                         &nodelayflag,
1446                         sizeof(nodelayflag));
1447  if (status)
1448  {
1449    Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
1450  }
1451 
1452  // Reuse ports, even if they are busy
1453  int socketreuseflag= 1;
1454  status= setsockopt(master_listener_socket,
1455                           SOL_SOCKET,
1456                           SO_REUSEADDR,
1457                           (const char *) &socketreuseflag,
1458                           sizeof(socketreuseflag));
1459  if (status)
1460  {
1461       Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
1462  }
1463 
1464 
1465  // bind the newly created socket to a specific address
1466  int bind_status = bind(master_listener_socket,
1467                         reinterpret_cast<struct sockaddr *>(&own_address),
1468                         sizeof(own_address));
1469  if (bind_status)
1470    { 
1471      Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Binding to " 
1472          << inet_ntop(AF_INET6, &own_address.sin6_addr, in6_addrstr, INET6_ADDRSTRLEN)
1473          << " port " << tpparam.port << " failed, error: " << strerror(errno));
1474      return;
1475    }
1476
1477    // listen at the socket,
1478    // queuesize for pending connections= max_listen_queue_size
1479    int listen_status = listen(master_listener_socket, max_listen_queue_size);
1480    if (listen_status)
1481    {
1482      Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Listen at socket " << master_listener_socket
1483          << " failed, error: " << strerror(errno));
1484      return;
1485    }
1486    else
1487    {
1488      Log(INFO_LOG,LOG_NORMAL, tpparam.name, color[green] << "Listening at port #" << tpparam.port << color[off]);
1489    }
1490
1491    // activate O_NON_BLOCK for accept (accept does not block)
1492    fcntl(master_listener_socket,F_SETFL, O_NONBLOCK);
1493
1494    // create a pollfd struct for use in the mainloop
1495    struct pollfd poll_fd;
1496    poll_fd.fd = master_listener_socket;
1497    poll_fd.events = POLLIN | POLLPRI; 
1498    poll_fd.revents = 0;
1499    /*
1500      #define POLLIN    0x001   // There is data to read.
1501      #define POLLPRI   0x002   // There is urgent data to read. 
1502      #define POLLOUT   0x004   // Writing now will not block. 
1503    */
1504   
1505    bool terminate = false;
1506    // check for thread terminate condition using get_state()
1507    state_t currstate= get_state();
1508    int poll_status= 0;
1509    const unsigned int number_poll_sockets= 1; 
1510    struct sockaddr_in6 peer_address;
1511    socklen_t peer_address_len;
1512    int conn_socket;
1513
1514    // check whether this thread is signaled for termination
1515    while(! (terminate= (currstate==STATE_ABORT || currstate==STATE_STOP) ) )
1516    {
1517      // wait on number_poll_sockets (main drm socket)
1518      // for the events specified above for sleep_time (in ms)
1519      poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
1520      if (poll_fd.revents & POLLERR) // Error condition
1521      {
1522        if (errno != EINTR) 
1523        {
1524          Log(ERROR_LOG,LOG_CRIT, tpparam.name, 
1525              "Poll caused error " << strerror(errno) << " - indicated by revents");
1526        }
1527        else
1528        {
1529          Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "poll(): " << strerror(errno));
1530        }
1531
1532      }
1533      if (poll_fd.revents & POLLHUP) // Hung up
1534      {
1535        Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll hung up");
1536        return;
1537      }
1538      if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
1539      {
1540        Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll Invalid request: fd not open");
1541        return;
1542      }
1543     
1544      switch (poll_status)
1545      {
1546        case -1:
1547          if (errno != EINTR)
1548          {
1549            Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll status indicates error: " << strerror(errno));
1550          }
1551          else
1552          {
1553            Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Poll status: " << strerror(errno));
1554          }
1555           
1556          break;
1557
1558        case 0:
1559#ifdef DEBUG_HARD
1560          Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, 
1561              "Listen Thread - Poll timed out after " << tpparam.sleep_time << " ms.");
1562#endif
1563          currstate= get_state();
1564          continue;
1565          break;
1566
1567        default:
1568#ifdef DEBUG_HARD
1569          Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
1570#endif
1571          break;
1572      } // end switch
1573
1574      // after a successful accept call,
1575      // accept stores the address information of the connecting party
1576      // in peer_address and the size of its address in addrlen
1577      peer_address_len= sizeof(peer_address);
1578      conn_socket = accept (master_listener_socket,
1579                            reinterpret_cast<struct sockaddr *>(&peer_address),
1580                            &peer_address_len);
1581      if (conn_socket == -1)
1582      {
1583        if (errno != EWOULDBLOCK && errno != EAGAIN)
1584        {
1585          Log(ERROR_LOG,LOG_EMERG, tpparam.name, "Accept at socket " << master_listener_socket
1586              << " failed, error: " << strerror(errno));
1587          return;
1588        }
1589      }
1590      else
1591      {
1592        // create a new assocdata-object for the new thread
1593        AssocData* peer_assoc = NULL;
1594        appladdress addr(peer_address, IPPROTO_TCP);
1595
1596        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "<<--Received connect--<< request from " << addr.get_ip_str() 
1597            << " port #" << addr.get_port());
1598
1599        struct sockaddr_in6 own_address;
1600        socklen_t own_address_len= sizeof(own_address);
1601        getsockname(conn_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
1602
1603        // AssocData will copy addr content into its own structure
1604        // allocated peer_assoc will be stored in connmap
1605        peer_assoc = new(nothrow) AssocData(conn_socket, addr, appladdress(own_address,IPPROTO_TCP));
1606
1607        bool insert_success= false;
1608        if (peer_assoc)
1609        {
1610          // start critical section
1611          lock(); // install_cleanup_thread_lock(TPoverTCP, this);
1612          insert_success= connmap.insert(peer_assoc);
1613          // end critical section
1614          unlock(); // uninstall_cleanup(1);
1615        }
1616       
1617       
1618        if (insert_success == false) // not inserted into connmap
1619        {
1620          Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot insert AssocData for socket " << conn_socket
1621              << ", " << addr.get_ip_str() << ", port #" 
1622              << addr.get_port() << " into connection map, aborting connection...");
1623
1624          // abort connection, delete its AssocData
1625          close (conn_socket);
1626          if (peer_assoc) 
1627          { 
1628            delete peer_assoc;
1629            peer_assoc= 0;
1630          }
1631          return;
1632               
1633        } //end __else(connmap.insert());__
1634       
1635        // create a new thread for each new connection
1636        create_new_receiver_thread(peer_assoc);
1637      } // end __else (connsocket)__
1638     
1639      // get new thread state
1640      currstate= get_state();
1641
1642    } // end while(!terminate)
1643    return;
1644} // end listen_for_connections()   
1645
1646
1647TPoverTCP::~TPoverTCP()
1648{
1649  init= false;
1650
1651  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,  "Destructor called");
1652
1653  QueueManager::instance()->unregister_queue(tpparam.source);
1654}
1655
1656/** TPoverTCP Thread main loop.
1657 * This loop checks for internal messages of either
1658 * a send() call to start a new receiver thread, or,
1659 * of a receiver_thread() that signals its own termination
1660 * for proper cleanup of control structures.
1661 * It also handles the following internal TPoverTCPMsg types:
1662 * - TPoverTCPMsg::stop - a particular receiver thread is terminated
1663 * - TPoverTCPMsg::start - a particular receiver thread is started
1664 * @param nr number of current thread instance
1665 */
1666void 
1667TPoverTCP::main_loop(uint32 nr)
1668{
1669
1670  // get internal queue for messages from receiver_thread
1671  FastQueue* fq = get_fqueue();
1672  if (!fq) 
1673  {
1674    Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot find message queue");
1675    return;
1676  } // end if not fq
1677  // register queue for receiving internal messages from other modules
1678  QueueManager::instance()->register_queue(fq,tpparam.source);
1679
1680  // start master listener thread
1681  pthread_t master_listener_thread_ID;
1682  int pthread_status= pthread_create(&master_listener_thread_ID, 
1683                                     NULL, // NULL: default attributes: thread is joinable and has a
1684                                     //       default, non-realtime scheduling policy
1685                                     master_listener_thread_starter,
1686                                     this);
1687  if (pthread_status)
1688  {
1689    Log(ERROR_LOG,LOG_CRIT, tpparam.name, 
1690        "New master listener thread could not be created: " <<  strerror(pthread_status));
1691  }
1692  else
1693    Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Master listener thread started");
1694
1695
1696  // define max latency for thread reaction on termination/stop signal
1697  timespec wait_interval= { 0, 250000000L }; // 250ms
1698  message* internal_thread_msg = NULL;
1699  state_t currstate= get_state();
1700
1701  // check whether this thread is signaled for termination
1702  while( currstate!=STATE_ABORT && currstate!=STATE_STOP ) 
1703  {
1704    // poll internal message queue (blocking)
1705    if ( (internal_thread_msg= fq->dequeue_timedwait(wait_interval)) != 0 )
1706    {
1707      TPoverTCPMsg* internalmsg= dynamic_cast<TPoverTCPMsg*>(internal_thread_msg);
1708      if (internalmsg)
1709      {
1710        if (internalmsg->get_msgtype() == TPoverTCPMsg::stop)
1711        {
1712          // a receiver thread terminated and signaled for cleanup by master thread
1713          AssocData* assocd= const_cast<AssocData*>(internalmsg->get_peer_assoc());
1714          Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Got cleanup request for thread <" << assocd->thread_ID <<'>');
1715          lock();
1716          cleanup_receiver_thread( assocd );
1717          unlock();
1718        }
1719        else
1720        if (internalmsg->get_msgtype() == TPoverTCPMsg::start)
1721        {
1722          // start a new receiver thread
1723          create_new_receiver_thread( const_cast<AssocData*>(internalmsg->get_peer_assoc()) );
1724        }
1725        else
1726          Log(ERROR_LOG,LOG_CRIT, tpparam.name, "unexpected internal message:" << internalmsg->get_msgtype());
1727         
1728        delete internalmsg;
1729      }
1730      else
1731      {
1732        Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Dynamic_cast failed - received unexpected and unknown internal message source "
1733            << internal_thread_msg->get_source());
1734      }
1735    } // endif
1736
1737    // get thread state
1738    currstate= get_state();
1739  } // end while
1740
1741  if (currstate==STATE_STOP)
1742  {
1743    // start abort actions
1744    Log(INFO_LOG,LOG_NORMAL, tpparam.name, "Asked to abort, stopping all receiver threads");
1745  } // end if stopped
1746
1747  // do not accept any more messages
1748  fq->shutdown();
1749  // terminate all receiver and sender threads that are still active
1750  terminate_all_threads();
1751}
1752
1753} // end namespace protlib
1754///@}
Note: See TracBrowser for help on using the repository browser.