An Overlay-based
Virtual Network Substrate
SpoVNet

source: source/ariba/utility/transport/tcpip/protlib/tp_over_tcp.cpp @ 8606

Last change on this file since 8606 was 8606, checked in by Christoph Mayer, 13 years ago

-memleaks

File size: 55.9 KB
Line 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file tp_over_tcp.cpp
3/// TCP-based transport module (includes framing support)
4/// ----------------------------------------------------------
5/// $Id: tp_over_tcp.cpp 2872 2008-02-18 10:58:03Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/src/tp_over_tcp.cpp $
7// ===========================================================
8//                     
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//                     
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29 
30extern "C"
31{
32  //#define _SOCKADDR_LEN    /* use BSD 4.4 sockets */
33#include <unistd.h>      /* gethostname */
34#include <sys/types.h>   /* network socket interface */
35#include <netinet/in.h>  /* network socket interface */
36#include <netinet/tcp.h> /* for TCP Socket Option */
37#include <sys/socket.h>
38#include <arpa/inet.h>   /* inet_addr */
39
40#include <fcntl.h>
41#include <sys/poll.h>
42}
43
44#include <iostream>
45#include <errno.h>
46#include <string>
47#include <sstream>
48
49#include "tp_over_tcp.h"
50#include "threadsafe_db.h"
51#include "cleanuphandler.h"
52#include "setuid.h"
53#include "queuemanager.h"
54#include "logfile.h"
55
56#include <set>
57
58#define TCP_SUCCESS 0
59#define TCP_SEND_FAILURE 1
60
61const unsigned int max_listen_queue_size= 10;
62
63#define IPV6_ADDR_INT32_SMP 0x0000ffff
64
65namespace protlib {
66
67void v6_to_v4(struct sockaddr_in *sin, struct sockaddr_in6 *sin6)       {
68        bzero(sin, sizeof(*sin));
69        sin->sin_family = AF_INET;
70        sin->sin_port = sin6->sin6_port;
71        memcpy(&sin->sin_addr, &sin6->sin6_addr.s6_addr[12], sizeof(struct in_addr));
72}
73
74/* Convert sockaddr_in to sockaddr_in6 in v4 mapped addr format. */
75void v4_to_v6(struct sockaddr_in *sin, struct sockaddr_in6 *sin6) {
76        bzero(sin6, sizeof(*sin6));
77        sin6->sin6_family = AF_INET6;
78        sin6->sin6_port = sin->sin_port;
79        *(uint32_t *)&sin6->sin6_addr.s6_addr[0] = 0;
80        *(uint32_t *)&sin6->sin6_addr.s6_addr[4] = 0;
81        *(uint32_t *)&sin6->sin6_addr.s6_addr[8] = IPV6_ADDR_INT32_SMP;
82        *(uint32_t *)&sin6->sin6_addr.s6_addr[12] = sin->sin_addr.s_addr;
83}
84
85using namespace log;
86
87/** @defgroup tptcp TP over TCP
88 * @ingroup network
89 * @{
90 */
91
92char in6_addrstr[INET6_ADDRSTRLEN+1];
93
94
95/******* class TPoverTCP *******/
96
97
98/** get_connection_to() checks for already existing connections.
99 *  If a connection exists, it returns "AssocData"
100 *  and saves it in "connmap" for further use
101 *  If no connection exists, a new connection to "addr"
102 *  is created.
103 */
104AssocData* 
105TPoverTCP::get_connection_to(const appladdress& addr)
106{
107  // get timeout
108  struct timespec ts;
109  get_time_of_day(ts);
110  ts.tv_nsec+= tpparam.sleep_time * 1000000L;
111  if (ts.tv_nsec>=1000000000L)
112  {
113    ts.tv_sec += ts.tv_nsec / 1000000000L;
114    ts.tv_nsec= ts.tv_nsec % 1000000000L;
115  }
116
117  // create a new AssocData pointer, initialize it to NULL
118  AssocData* assoc= NULL;
119  int new_socket;
120  // loop until timeout is exceeded: TODO: check applicability of loop
121  do 
122  {
123    // check for existing connections to addr
124    // start critical section
125    lock(); // install_cleanup_thread_lock(TPoverTCP, this);
126    assoc= connmap.lookup(addr);
127    // end critical section
128    unlock(); // uninstall_cleanup(1);
129    if (assoc) 
130    {
131      // If not shut down then use it, else abort, wait and check again.
132      if (!assoc->shutdown) 
133      {
134        return assoc;
135      }
136      else
137      {
138        // TODO: connection is already in shutdown mode. What now?
139        ERRCLog(tpparam.name,"socket exists, but is already in mode shutdown");
140
141        return 0;
142      } 
143    } //end __if (assoc)__
144    else 
145    {
146      Log(DEBUG_LOG,LOG_UNIMP,tpparam.name,"No existing connection to " 
147          << addr.get_ip_str() << " port #" << addr.get_port() << " found, creating a new one.");
148    }
149
150    // no connection found, create a new one
151    new_socket = socket( v4_mode ? AF_INET : AF_INET6, SOCK_STREAM, IPPROTO_TCP);
152    if (new_socket == -1)
153    {
154      ERRCLog(tpparam.name,"Couldn't create a new socket: " << strerror(errno));
155
156      return 0;
157    }
158
159    // Disable Nagle Algorithm, set (TCP_NODELAY)
160    int nodelayflag= 1;
161    int status= setsockopt(new_socket,
162                           IPPROTO_TCP,
163                           TCP_NODELAY,
164                           &nodelayflag,
165                           sizeof(nodelayflag));
166    if (status)
167    {
168        ERRLog(tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
169    }
170   
171    // Reuse ports, even if they are busy
172    int socketreuseflag= 1;
173    status= setsockopt(new_socket,
174                           SOL_SOCKET,
175                           SO_REUSEADDR,
176                           (const char *) &socketreuseflag,
177                           sizeof(socketreuseflag));
178    if (status)
179    {
180        ERRLog(tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
181    }
182
183        struct sockaddr_in6 dest_address;
184        dest_address.sin6_flowinfo= 0;
185        dest_address.sin6_scope_id= 0;
186        addr.get_sockaddr(dest_address);
187
188        // connect the socket to the destination address
189        int connect_status = 0;
190        if (v4_mode) {
191                struct sockaddr_in dest_address_v4;
192                v6_to_v4( &dest_address_v4, &dest_address );
193                connect_status = connect(new_socket,
194                                         reinterpret_cast<const struct sockaddr*>(&dest_address_v4),
195                                         sizeof(dest_address));
196        } else {
197                connect_status = connect(new_socket,
198                                         reinterpret_cast<const struct sockaddr*>(&dest_address),
199                                         sizeof(dest_address));
200        }
201     
202    // connects to the listening_port of the peer's masterthread   
203    if (connect_status != 0)
204    {
205      ERRLog(tpparam.name,"Connect to " << addr.get_ip_str() << " port #" << addr.get_port() 
206          << " failed: [" << color[red] << strerror(errno) << color[off] << "]");
207     
208      return 0; // error: couldn't connect
209    }
210
211
212    struct sockaddr_in6 own_address;
213        if (v4_mode) {
214                struct sockaddr_in own_address_v4;
215                socklen_t own_address_len_v4 = sizeof(own_address_v4);
216                getsockname(new_socket, reinterpret_cast<struct sockaddr*>(&own_address_v4), &own_address_len_v4);
217                v4_to_v6(&own_address_v4, &own_address);
218        } else {
219                socklen_t own_address_len= sizeof(own_address);
220                getsockname(new_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
221        }
222
223    Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,">>--Connect-->> to " << addr.get_ip_str() << " port #" << addr.get_port() 
224        << " from " << inet_ntop(AF_INET6,&own_address.sin6_addr,in6_addrstr,INET6_ADDRSTRLEN) 
225        << " port #" << ntohs(own_address.sin6_port));
226
227   
228    // create new AssocData record (will copy addr)
229    assoc = new(nothrow) AssocData(new_socket, addr, appladdress(own_address,IPPROTO_TCP));
230
231    // if assoc could be successfully created, insert it into ConnectionMap
232    if (assoc) 
233    {
234      bool insert_success= false;
235      // start critical section
236      lock(); // install_cleanup_thread_lock(TPoverTCP, this);
237      // insert AssocData into connection map
238      insert_success= connmap.insert(assoc);
239      // end critical section
240      unlock(); // uninstall_cleanup(1);
241
242      if (insert_success == true) 
243      {
244#ifdef _DEBUG
245        Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Connected to " << addr.get_ip_str() << ", port #" << addr.get_port()
246            << " via socket " << new_socket);
247
248               
249#endif
250
251        // notify master thread to start a receiver thread (i.e. send selfmessage)
252        TPoverTCPMsg* newmsg= new(nothrow)TPoverTCPMsg(assoc, tpparam.source, TPoverTCPMsg::start);
253        if (newmsg)
254        {
255          bool ret = newmsg->send_to(tpparam.source);
256          if(!ret) delete newmsg;
257          return assoc;
258        }
259        else
260          ERRCLog(tpparam.name,"get_connection_to: could not get memory for internal msg");
261      } 
262      else 
263      {
264        // delete data and abort
265        ERRCLog(tpparam.name, "Cannot insert AssocData for socket " << new_socket << ", "<< addr.get_ip_str()
266            <<", port #" << addr.get_port() << " into connection map, aborting connection");
267               
268        // abort connection, delete its AssocData
269        close (new_socket);
270        if (assoc) 
271        { 
272          delete assoc; 
273          assoc= 0;
274        }
275        return assoc;
276      } // end else connmap.insert                     
277     
278    } // end "if (assoc)"
279  } 
280  while (wait_cond(ts)!=ETIMEDOUT);
281
282  return assoc;
283} //end get_connection_to
284
285
286/** terminates a signaling association/connection
287 *  - if no connection exists, generate a warning
288 *  - otherwise: generate internal msg to related receiver thread
289 */
290void
291TPoverTCP::terminate(const address& in_addr)
292{
293#ifndef _NO_LOGGING
294 const char *const thisproc="terminate()   - ";
295#endif
296
297 appladdress* addr = NULL;
298 addr = dynamic_cast<appladdress*>(in_addr.copy());
299
300 if (!addr) return;
301
302  // Create a new AssocData-pointer
303  AssocData* assoc = NULL;
304
305  // check for existing connections to addr
306
307  // start critical section:
308  // please note if receiver_thread terminated already, the assoc data is not
309  // available anymore therefore we need a lock around cleanup_receiver_thread()
310
311  lock(); // install_cleanup_thread_lock(TPoverTCP, this);
312  assoc= connmap.lookup(*addr);
313  if (assoc) 
314  {
315    EVLog(tpparam.name,thisproc<<"got request to shutdown connection for peer " << addr);
316    // If not shut down then use it, else abort, wait and check again.
317    if (!assoc->shutdown) 
318    {
319      if (assoc->socketfd)
320      {
321        // disallow sending
322        if (shutdown(assoc->socketfd,SHUT_WR))
323        {
324          ERRLog(tpparam.name,thisproc<<"shutdown (write) on socket for peer " << addr << " returned error:" << strerror(errno));
325        }
326        else
327        {
328          EVLog(tpparam.name,thisproc<<"initiated closing of connection for peer " << addr << ". Shutdown (write) on socket "<< assoc->socketfd );       
329        }
330      }
331      assoc->shutdown= true;
332    }
333    else
334      EVLog(tpparam.name,thisproc<<"connection for peer " << addr << " is already in mode shutdown");
335     
336  }
337  else
338    WLog(tpparam.name,thisproc<<"could not find a connection for peer " << *addr);
339
340  stop_receiver_thread(assoc);
341
342  // end critical section
343  unlock(); // uninstall_cleanup(1);
344
345  if (addr) delete addr;
346}
347
348
349/** generates and internal TPoverTCP message to send a NetMsg to the network
350 *  - it is necessary to let a thread do this, because the caller
351 *     may get blocked if the connect() or send() call hangs for a while
352 *  - the sending thread will call TPoverTCP::tcpsend()
353 *  - if no connection exists, creates a new one (unless use_existing_connection is true)
354 *  @note the netmsg is deleted by the send() method when it is not used anymore
355 */
356void
357TPoverTCP::send(NetMsg* netmsg, const address& in_addr, bool use_existing_connection)
358{
359  if (netmsg == NULL) {
360    ERRCLog(tpparam.name,"send() - called without valid NetMsg buffer (NULL)");
361    return;
362  }
363
364  appladdress* addr = NULL;
365  addr= dynamic_cast<appladdress*>(in_addr.copy());
366   
367  if (!addr)
368  {
369    ERRCLog(tpparam.name,"send() - given destination address is not of expected type (appladdress), has type " << (int) in_addr.get_type());
370    return;
371  } 
372
373  // lock due to sendermap access
374  lock();
375   
376  // check for existing sender thread
377  sender_thread_queuemap_t::const_iterator it= senderthread_queuemap.find(*addr);
378   
379  FastQueue* destqueue= 0;
380
381  if (it == senderthread_queuemap.end())
382  { // no sender thread found so far
383
384    // if we already have an existing connection it is save to create a sender thread
385    // since get_connection_to() will not be invoked, so an existing connection will
386    // be used
387    const AssocData* assoc = connmap.lookup(*addr);
388
389    //DLog(tpparam.name,"send() - use_existing_connection:" << (use_existing_connection ? "true" : "false") << " assoc:" << assoc);
390
391    if (use_existing_connection==false || (assoc && assoc->shutdown==false && assoc->socketfd>0))
392    { // it is allowed to create a new connection for this thread
393      // create a new queue for sender thread
394      FastQueue* sender_thread_queue= new FastQueue;
395      create_new_sender_thread(sender_thread_queue);
396      // remember queue for later use
397     
398      //pair<sender_thread_queuemap_t::iterator, bool> tmpinsiterator=
399      senderthread_queuemap.insert( pair<appladdress,FastQueue*> (*addr,sender_thread_queue) );
400     
401      destqueue= sender_thread_queue;
402    }
403  }
404  else
405  { // we have a sender thread, use stored queue from map
406    destqueue= it->second; 
407  }
408   
409  unlock();
410   
411  // send a send_data message to it (if we have a destination queue)
412  if (destqueue)
413  {
414    // both parameters will be freed after message was sent!
415   
416    appladdress* apl=new appladdress(*addr);
417    TPoverTCPMsg* internalmsg= new TPoverTCPMsg(netmsg,apl);
418    if (internalmsg)
419    {
420      // send the internal message to the sender thread queue
421      bool sent = internalmsg->send(tpparam.source,destqueue);
422      if (!sent) {
423          delete internalmsg->get_appladdr();
424          delete internalmsg;
425          internalmsg = NULL;
426      }
427    }
428  }
429  else
430  {
431    if (!use_existing_connection)
432      WLog(tpparam.name,"send() - found entry for address, but no active sender thread available for peer addr:" << *addr << " - dropping data");
433    else
434    {
435      DLog(tpparam.name,"no active sender thread found for peer " << *addr << " - but policy forbids to set up a new connection, will drop data");
436
437    }
438    // cannot send data, so we must drop it
439    delete netmsg;
440  }
441 
442  if (addr) delete addr;
443}
444
445/** sends a NetMsg to the network.
446 *
447 * @param netmsg message to send
448 * @param addr   transport endpoint address
449 *
450 * @note   if no connection exists, creates a new one
451 * @note   both parameters are deleted after the message was sent
452 */
453void
454TPoverTCP::tcpsend(NetMsg* netmsg, appladdress* addr)
455{                       
456#ifndef _NO_LOGGING
457  const char *const thisproc="sender   - ";
458#endif
459
460  // set result initially to success, set it to failure
461  // in places where these failures occur
462  int result = TCP_SUCCESS;
463  int saved_errno= 0;
464  int ret= 0;
465
466  // Create a new AssocData-pointer
467  AssocData* assoc = NULL;
468 
469  // tp.cpp checks for initialisation of tp and correctness of
470  // msgsize, protocol and ip,
471  // throws an error if something is not right
472  if (addr) {
473    addr->convert_to_ipv6();
474    check_send_args(*netmsg,*addr);
475  } 
476  else 
477  {
478    ERRCLog(tpparam.name, thisproc << "address pointer is NULL");
479    result= TCP_SEND_FAILURE;
480
481    delete netmsg;
482    delete addr;
483
484    throw TPErrorInternal();
485  }
486
487
488  // check for existing connections,
489  // if a connection exists, return its AssocData
490  // and save it in assoc for further use
491  // if no connection exists, create a new one (in get_connection_to()).
492  // Connection is inserted into connection map that must be done
493  // with exclusive access
494  assoc= get_connection_to(*addr);
495
496  if (assoc==NULL || assoc->socketfd<=0)
497  {
498    ERRCLog(tpparam.name, color[red] << thisproc << "no valid assoc/socket data - dropping packet");
499
500    delete netmsg;
501    delete addr;
502    return;
503  }
504
505  if (assoc->shutdown)
506  {
507    Log(WARNING_LOG, LOG_ALERT, tpparam.name, thisproc << "should send message although connection already half closed");
508    delete netmsg;
509    delete addr;
510
511    throw TPErrorSendFailed();
512  }
513
514  uint32 msgsize= netmsg->get_size();
515#ifdef DEBUG_HARD
516  cerr << thisproc << "message size=" << netmsg->get_size() << endl;
517#endif
518
519  const uint32 retry_send_max = 3;
520  uint32 retry_count = 0;
521  // send all the data contained in netmsg to the socket
522  // which belongs to the address "addr"
523  for (uint32 bytes_sent= 0;
524       bytes_sent < msgsize;
525       bytes_sent+= ret)
526  {
527
528#ifdef _DEBUG_HARD
529    for (uint32 i=0;i<msgsize;i++)
530    {
531      cout << "send_buf: " << i << " : "; 
532      if ( isalnum(*(netmsg->get_buffer()+i)) )
533        cout << "'" << *(netmsg->get_buffer()+i) << "' (0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec << ")" ;
534      else
535        cout << "0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec;
536      cout << endl;
537    }
538
539    cout << endl;
540    cout << "bytes_sent: " << bytes_sent << endl;
541    cout << "Message size: " <<  msgsize << endl;
542    cout << "Send-Socket: " << assoc->socketfd << endl;
543    cout << "pointer-Offset. " << netmsg->get_pos() << endl;
544    cout << "vor send " << endl;
545#endif
546
547    retry_count= 0;
548    do 
549    {
550      // socket send
551      ret= ::send(assoc->socketfd, 
552                  netmsg->get_buffer() + bytes_sent, 
553                  msgsize - bytes_sent, 
554                  MSG_NOSIGNAL);
555     
556      // send_buf + bytes_sent
557
558      // Deal with temporary internal errors like EAGAIN, resource temporary unavailable etc.
559      // retry sending
560      if (ret < 0)
561      { // internal error during send occured
562        saved_errno= errno;
563        switch(saved_errno)
564        {
565          case EAGAIN:  // same as EWOULDBLOCK
566          case EINTR:   // man page says: A signal occurred before any data was transmitted.
567          case ENOBUFS: //  The output queue for a network interface was full.
568            retry_count++;
569            ERRLog(tpparam.name,"Temporary failure while calling send(): " << strerror(saved_errno) << ", errno: " << saved_errno
570                   << " - retry sending, retry #" << retry_count);
571            // pace down a little bit, sleep for a while
572            sleep(1);
573            break;
574
575            // everything else should not lead to repetition
576          default:
577            retry_count= retry_send_max;
578            break;
579        } // end switch
580      } // endif
581      else // leave while
582        break;
583    } 
584    while(retry_count < retry_send_max);
585
586    if (ret < 0) 
587    { // unrecoverable error occured
588     
589      result= TCP_SEND_FAILURE;
590      break;
591    } // end if (ret < 0)
592    else
593    {
594      if (debug_pdu)
595      {
596        ostringstream hexdump;
597        netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_sent);
598        DLog(tpparam.name,"PDU debugging enabled - Sent:" << hexdump.str());
599      }
600    }
601
602    // continue with send next data block in next for() iteration
603  } // end for
604
605  // *** note: netmsg is deleted here ***
606  delete netmsg;
607 
608  // Throwing an exception within a critical section does not
609  // unlock the mutex.
610
611  if (result != TCP_SUCCESS)
612  {
613    ERRLog(tpparam.name, thisproc << "TCP error, returns " << ret << ", error : " << strerror(errno));
614    delete addr;
615
616    throw TPErrorSendFailed(saved_errno);
617   
618  }
619  else
620    EVLog(tpparam.name, thisproc << ">>----Sent---->> message (" << msgsize << " bytes) using socket " << assoc->socketfd  << " to " << *addr);
621
622  if (!assoc) {
623    // no connection
624   
625    ERRLog(tpparam.name, thisproc << "cannot get connection to " << addr->get_ip_str() 
626        << ", port #" << addr->get_port());
627
628    delete addr;
629
630    throw TPErrorUnreachable(); // should be no assoc found
631  } // end "if (!assoc)"
632 
633  // *** delete address ***
634  delete addr;
635}
636 
637/* this thread waits for an internal message that either:
638 * - requests transmission of a packet
639 * - requests to stop this thread
640 * @param argp points to the thread queue for internal messages
641 */
642void 
643TPoverTCP::sender_thread(void *argp)
644{
645#ifndef _NO_LOGGING
646  const char *const methodname="senderthread - ";
647#endif
648
649  message* internal_thread_msg = NULL;
650
651  EVLog(tpparam.name, methodname << "starting as thread <" << pthread_self() << ">");
652
653  FastQueue* fq= reinterpret_cast<FastQueue*>(argp);
654  if (!fq)
655  {
656    ERRLog(tpparam.name, methodname << "thread <" << pthread_self() << "> no valid pointer to msg queue. Stop.");
657    return;
658  }
659
660  bool terminate= false;
661  TPoverTCPMsg* internalmsg= 0;
662  while (terminate==false && (internal_thread_msg= fq->dequeue()) != 0 )
663  {
664    internalmsg= dynamic_cast<TPoverTCPMsg*>(internal_thread_msg);
665   
666    if (internalmsg == 0)
667    {
668      ERRLog(tpparam.name, methodname << "received not an TPoverTCPMsg but a" << internal_thread_msg->get_type_name());
669    }
670    else
671    if (internalmsg->get_msgtype() == TPoverTCPMsg::send_data)
672    {
673      // create a connection if none exists and send the netmsg
674      if (internalmsg->get_netmsg() && internalmsg->get_appladdr())
675      {
676        try 
677        {
678          tcpsend(internalmsg->get_netmsg(),internalmsg->get_appladdr());
679        } // end try
680        catch(TPErrorSendFailed& err)
681        {
682          ERRLog(tpparam.name, methodname << "TCP send call failed - " << err.what() 
683                 << " cause: (" << err.get_reason() << ") " << strerror(err.get_reason()) );
684        } // end catch
685        catch(TPError& err)
686        {
687          ERRLog(tpparam.name, methodname << "TCP send call failed - reason: " << err.what());
688        } // end catch
689        catch(...)
690        {
691          ERRLog(tpparam.name, methodname << "TCP send call failed - unknown exception");
692        }
693      }
694      else
695      {
696        ERRLog(tpparam.name, methodname << "problem with passed arguments references, they point to 0");
697      } 
698    }
699    else
700    if (internalmsg->get_msgtype() == TPoverTCPMsg::stop)
701    {
702      terminate= true;
703    }
704
705    delete internalmsg;
706  } // end while
707 
708  EVLog(tpparam.name, methodname << "<" << pthread_self() << "> terminated connection.");
709}
710
711
712/** receiver thread listens at a TCP socket for incoming PDUs
713 *  and passes complete PDUs to the coordinator. Incomplete
714 *  PDUs due to aborted connections or buffer overflows are discarded.
715 *  @param argp - assoc data and flags sig_terminate and terminated
716 * 
717 *  @note this is a static member function, so you cannot use class variables
718 */
719void 
720TPoverTCP::receiver_thread(void *argp)
721{
722#ifndef _NO_LOGGING
723  const char *const methodname="receiver - ";
724#endif
725
726  receiver_thread_arg_t *receiver_thread_argp= static_cast<receiver_thread_arg_t *>(argp);
727  const appladdress* peer_addr = NULL;
728  const appladdress* own_addr = NULL;
729  uint32 bytes_received = 0;
730  TPMsg* tpmsg= NULL;
731 
732  // argument parsing - start
733  if (receiver_thread_argp == 0)
734  {
735    ERRCLog(tpparam.name, methodname << "No arguments given at start of receiver thread <" << pthread_self() << ">, exiting.");
736
737    return;
738  }
739  else
740  {
741    // change status to running, i.e., not terminated
742    receiver_thread_argp->terminated= false;
743
744#ifdef _DEBUG
745    DLog(tpparam.name, methodname << "New receiver thread <" << pthread_self() << "> started. ");
746#endif
747  }
748
749  int conn_socket= 0;
750  if (receiver_thread_argp->peer_assoc)
751  {
752    // get socket descriptor from arg
753    conn_socket = receiver_thread_argp->peer_assoc->socketfd;
754    // get pointer to peer address of socket (source/sender address of peer) from arg
755    peer_addr= &receiver_thread_argp->peer_assoc->peer;
756    own_addr= &receiver_thread_argp->peer_assoc->ownaddr;
757  }
758  else
759  {
760    ERRCLog(tpparam.name, methodname << "No peer assoc available - pointer is NULL");
761   
762    return;
763  }
764
765  if (peer_addr == 0)
766  {
767    ERRCLog(tpparam.name, methodname << "No peer address available for socket " << conn_socket << ", exiting.");
768   
769    return;
770  }
771  // argument parsing - end
772#ifdef _DEBUG
773  Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname <<
774      "Preparing to wait for data at socket " 
775      << conn_socket << " from " << receiver_thread_argp->peer_assoc->peer);
776#endif
777
778  int ret= 0;
779  uint32 msgcontentlength= 0;
780  bool msgcontentlength_known= false;
781  bool pdu_complete= false; // when to terminate inner loop
782
783  /* maybe use this to create a new pdu,
784    /// constructor
785    contextlistpdu(type_t t, subtype_t st, uint32 fc, uint32 numobj = 0);
786  */ 
787
788  // activate O_NON_BLOCK  for recv on socket conn_socket
789  // CAVEAT: this also implies non-blocking send()!
790  //fcntl(conn_socket,F_SETFL, O_NONBLOCK);
791   
792  // set options for polling
793  const unsigned int number_poll_sockets= 1; 
794  struct pollfd poll_fd;
795  // have to set structure before poll call
796  poll_fd.fd = conn_socket;
797  poll_fd.events = POLLIN | POLLPRI; 
798  poll_fd.revents = 0;
799
800  int poll_status;
801  bool recv_error= false;
802
803  NetMsg* netmsg= 0;
804  NetMsg* remainbuf= 0;
805  size_t buffer_bytes_left= 0;
806  size_t trailingbytes= 0;
807  bool skiprecv= false;
808  // loop until we receive a terminate signal (read-only var for this thread)
809  // or get an error from socket read
810  while( receiver_thread_argp->sig_terminate == false )
811  {
812    // Read next PDU from socket or process trailing bytes in remainbuf
813    ret= 0;
814    msgcontentlength= 0;
815    msgcontentlength_known= false;
816    pdu_complete= false;
817    netmsg= 0;
818
819    // there are trailing bytes left from the last receive call
820    if (remainbuf != 0)
821    {
822      netmsg= remainbuf;
823      remainbuf= 0;
824      buffer_bytes_left= netmsg->get_size()-trailingbytes;
825      bytes_received= trailingbytes;
826      trailingbytes= 0;
827      skiprecv= true;
828    }
829    else // no trailing bytes, create a new buffer
830    if ( (netmsg= new NetMsg(NetMsg::max_size)) != 0 )
831    {
832      buffer_bytes_left= netmsg->get_size();
833      bytes_received= 0;
834      skiprecv= false;
835    }
836    else
837    { // buffer allocation failed
838      bytes_received= 0;
839      buffer_bytes_left= 0;
840      recv_error= true;
841    }
842   
843    // loops until PDU is complete
844    // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
845    while (!pdu_complete && 
846           !recv_error && 
847           !receiver_thread_argp->sig_terminate)
848    {
849      if (!skiprecv)
850      {
851        // read from TCP socket or return after sleep_time
852        poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
853       
854        if (receiver_thread_argp->sig_terminate)
855        {
856          Log(EVENT_LOG,LOG_UNIMP,tpparam.name,methodname << "Thread <" << pthread_self() << "> found terminate signal after poll");
857          // disallow sending
858          AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
859          if (myassoc->shutdown == false)
860          {
861            myassoc->shutdown= true;
862            if (shutdown(myassoc->socketfd,SHUT_WR))
863            {
864              if ( errno != ENOTCONN )
865                Log(ERROR_LOG,LOG_UNIMP,tpparam.name,methodname <<"shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
866            }
867          }
868          // try to read do a last read from the TCP socket or return after sleep_time
869          if (poll_status == 0)
870          {
871            poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
872          }
873        }
874
875        if (poll_fd.revents & POLLERR) // Error condition
876        {
877          if (errno == 0 || errno == EINTR)
878          {
879            EVLog(tpparam.name, methodname << "poll(): " << strerror(errno));
880          }
881          else
882          {
883            ERRCLog(tpparam.name, methodname << "Poll indicates error: " << strerror(errno));
884            recv_error= true;
885          }
886        }
887       
888        if (poll_fd.revents & POLLHUP) // Hung up
889        {
890          Log(EVENT_LOG,LOG_CRIT, tpparam.name, methodname << "Poll hung up");
891          recv_error= true;
892        }
893       
894        if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
895        {
896          EVLog(tpparam.name, methodname << "Poll Invalid request: fd not open");
897          recv_error= true;
898        }
899       
900        // check status (return value) of poll call
901        switch (poll_status)
902        {
903          case -1:
904            if (errno == 0 || errno == EINTR)
905            {
906              EVLog(tpparam.name, methodname << "Poll status: " << strerror(errno));
907            }
908            else
909            {
910              ERRCLog(tpparam.name, methodname << "Poll status indicates error: " << strerror(errno) << "- aborting");
911              recv_error= true;
912            }
913           
914            continue; // next while iteration
915            break;
916           
917          case 0:
918#ifdef DEBUG_HARD
919            Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll timed out after " << tpparam.sleep_time << " ms.");
920#endif
921            continue; // next while iteration
922            break;
923           
924          default:
925#ifdef DEBUG_HARD
926            Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
927#endif
928            break;
929        } // end switch
930
931
932        /// receive data from socket buffer (recv will not block)
933        ret = recv(conn_socket, 
934                   netmsg->get_buffer() + bytes_received, 
935                   buffer_bytes_left, 
936                   MSG_DONTWAIT);
937
938        if ( ret < 0 )
939        {
940          delete netmsg;
941          if (errno!=EAGAIN && errno!=EWOULDBLOCK)
942          {
943            ERRCLog(tpparam.name, methodname << "Receive at socket " << conn_socket << " failed, error: " << strerror(errno));
944            recv_error= true;
945            continue;
946          }
947          else
948          { // errno==EAGAIN || errno==EWOULDBLOCK
949            // just nothing to read from socket, continue w/ next poll
950            continue;
951          }
952        }
953        else
954        {
955          if (ret == 0)
956          {
957            // this means that EOF is reached,
958            // other side has closed connection
959            Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Other side (" << *peer_addr << ") closed connection for socket " << conn_socket);
960            // disallow sending
961            AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
962            if (myassoc->shutdown == false)
963            {
964              myassoc->shutdown= true;
965              if (shutdown(myassoc->socketfd,SHUT_WR))
966              {
967                if ( errno != ENOTCONN )
968                  Log(ERROR_LOG,LOG_UNIMP,tpparam.name, methodname << "shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
969              }
970            }
971            // not a real error, but we must quit the receive loop
972            recv_error= true;
973          }
974          else
975          {
976
977            Log(EVENT_LOG,LOG_UNIMP, tpparam.name, methodname << "<<--Received--<< packet (" << ret << " bytes) at socket " << conn_socket << " from " << *peer_addr);
978            // track number of received bytes
979            bytes_received+= ret;
980            buffer_bytes_left-= ret;
981          }
982        }
983      } // end if do not skip recv() statement     
984
985      if (buffer_bytes_left < 0) ///< buffer space exhausted now
986      {
987        recv_error= true;
988        Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "during receive buffer space exhausted");
989      }
990
991      if (!msgcontentlength_known) ///< common header not parsed
992      {
993        // enough bytes read to parse common header?
994        if (bytes_received >= common_header_length)
995        {
996          // get message content length in number of bytes
997          if (getmsglength(*netmsg, msgcontentlength))
998            msgcontentlength_known= true;
999          else
1000          {
1001            ERRCLog(tpparam.name, methodname << "Not a valid protocol header - discarding received packet. received size " << msgcontentlength);
1002
1003            ostringstream hexdumpstr;
1004            netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
1005            DLog(tpparam.name,"dumping received bytes:" << hexdumpstr.str());
1006
1007            // reset all counters
1008            msgcontentlength= 0;
1009            msgcontentlength_known= false;
1010            bytes_received= 0;
1011            pdu_complete= false;
1012            continue;
1013          }
1014        }
1015      } // endif common header not parsed
1016
1017      // check whether we have read the whole Protocol PDU
1018      DLog(tpparam.name, "bytes_received-common_header_length=" << bytes_received-common_header_length << " msgcontentlength: " << msgcontentlength);
1019      if (msgcontentlength_known)
1020      {
1021        if (bytes_received-common_header_length >= msgcontentlength )
1022        {
1023          pdu_complete= true;
1024          // truncate buffer exactly at common_header_length+msgcontentlength==PDU size, trailing stuff is handled separately
1025          netmsg->truncate(common_header_length+msgcontentlength);
1026
1027          // trailing bytes are copied into new buffer
1028          if (bytes_received-common_header_length > msgcontentlength)
1029          {
1030            WLog(tpparam.name,"trailing bytes - received more bytes ("<<bytes_received<<") than expected for PDU (" << common_header_length+msgcontentlength << ")");
1031            remainbuf= new NetMsg(NetMsg::max_size);
1032            trailingbytes= (bytes_received-common_header_length) - msgcontentlength;
1033            bytes_received= common_header_length+msgcontentlength;
1034            memcpy(remainbuf->get_buffer(),netmsg->get_buffer()+common_header_length+msgcontentlength, trailingbytes);
1035          }
1036        }
1037        else
1038        { // not enough bytes in the buffer must continue to read from network
1039          skiprecv= false;
1040        }
1041      } // endif msgcontentlength_known
1042    } // end while (!pdu_complete && !recv_error && !signalled for termination)
1043    // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
1044
1045    // if other side closed the connection, we should still be able to deliver the remaining data
1046    if (ret == 0)
1047    {
1048      recv_error= false;
1049    }
1050
1051    // deliver only complete PDUs to signaling module
1052    if (!recv_error && pdu_complete)
1053    {
1054      // create TPMsg and send it to the signaling thread
1055      tpmsg = new(nothrow) TPMsg(netmsg, peer_addr->copy(), own_addr->copy());
1056      if (tpmsg)
1057      {
1058        DLog(tpparam.name, methodname << "receipt of PDU now complete, sending msg#" << tpmsg->get_id()
1059             << " to module " <<  message::get_qaddr_name(tpparam.dest));
1060      }
1061
1062      debug_pdu=false;
1063
1064      if (debug_pdu)
1065      {
1066        ostringstream hexdump;
1067        netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_received);
1068        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"PDU debugging enabled - Received:" << hexdump.str());
1069      }
1070
1071      // send the message if it was successfully created
1072      // bool message::send_to(qaddr_t dest, bool exp = false);
1073      if (!tpmsg
1074          || (!tpmsg->get_peeraddress())
1075          || (!tpmsg->send(message::qaddr_tp_over_tcp, tpparam.dest))) 
1076      {
1077        Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Cannot allocate/send TPMsg");
1078        if (tpmsg) delete tpmsg;
1079      } // end if tpmsg not allocated or not addr or not sent
1080     
1081
1082    } // end if !recv_error
1083    else
1084    { // error during receive or PDU incomplete
1085      if (bytes_received>0)
1086      {
1087        Log(WARNING_LOG,LOG_NORMAL, tpparam.name, methodname << "Attention! " << (recv_error? "Receive error, " : "") << (pdu_complete ?  "PDU complete" : "PDU incomplete") << "received bytes: " << bytes_received);
1088      }
1089
1090      if (!pdu_complete && bytes_received>0 && bytes_received<common_header_length)
1091      {
1092        ostringstream hexdumpstr;
1093        netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
1094        Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"Message too short to be a valid protocol header - dumping received bytes:" << hexdumpstr.str()); 
1095      }
1096      // leave the outer loop
1097      /**********************/
1098      break;
1099      /**********************/
1100    } // end else
1101
1102  } // end while (thread not signalled for termination)
1103
1104  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() 
1105      << "> shutting down and closing socket " << receiver_thread_argp->peer_assoc->peer);
1106
1107  // shutdown socket
1108  if (shutdown(conn_socket, SHUT_RD))
1109  {
1110    if ( errno != ENOTCONN )
1111      Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> shutdown (read) on socket failed, reason: " << strerror(errno));
1112  }
1113
1114  // close socket
1115  close(conn_socket);
1116
1117  receiver_thread_argp->terminated= true;
1118  delete netmsg;
1119
1120  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> terminated");
1121
1122#ifdef _DEBUG
1123  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Signaling main loop for cleanup");
1124#endif
1125  // notify master thread for invocation of cleanup procedure
1126  TPoverTCPMsg* newmsg= new(nothrow)TPoverTCPMsg(receiver_thread_argp->peer_assoc);
1127  // send message to main loop thread
1128  newmsg->send_to(tpparam.source);
1129
1130} 
1131
1132
1133/** this signals a terminate to a thread and wait for the thread to stop
1134 *  @note it is not safe to access any thread related data after this method returned,
1135 *        because the receiver thread will initiate a cleanup_receiver_thread() method
1136 *        which may erase all relevant thread data.
1137 */
1138void 
1139TPoverTCP::stop_receiver_thread(AssocData* peer_assoc)
1140{
1141  // All operations on  recv_thread_argmap and connmap require an already acquired lock
1142  // after this procedure peer_assoc may be invalid because it was erased
1143
1144  // start critical section
1145
1146  if (peer_assoc == 0)
1147    return;
1148
1149  pthread_t thread_id=  peer_assoc->thread_ID;
1150 
1151  // try to clean up receiver_thread_arg
1152  recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1153  receiver_thread_arg_t* recv_thread_argp= 
1154    (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1155  if (recv_thread_argp)
1156  {
1157    if (!recv_thread_argp->terminated)
1158    {
1159      // thread signaled termination, but is not?
1160      Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> signaled for termination");
1161
1162      // signal thread for its termination
1163      recv_thread_argp->sig_terminate= true;
1164      // wait for thread to join after termination
1165      pthread_join(thread_id, 0);
1166      // the dying thread will signal main loop to call this method, but this time we should enter the else branch
1167      return;
1168    }
1169  }
1170  else
1171    Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> not found");
1172
1173}
1174
1175
1176/** cleans up left over structures (assoc,receiver_thread_arg) from already terminated receiver_thread
1177 *  usually called by the master_thread after the receiver_thread terminated
1178 * @note clean_up_receiver_thread() should be only called when an outer lock ensures that peer_assoc
1179 *       is still valid
1180 */
1181void 
1182TPoverTCP::cleanup_receiver_thread(AssocData* peer_assoc)
1183{
1184  // All operations on  recv_thread_argmap and connmap require an already acquired lock
1185  // after this procedure peer_assoc may be invalid because it was erased
1186
1187  // start critical section
1188
1189  if (peer_assoc == 0)
1190    return;
1191
1192  pthread_t thread_id=  peer_assoc->thread_ID;
1193 
1194  // try to clean up receiver_thread_arg
1195  recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1196  receiver_thread_arg_t* recv_thread_argp= 
1197    (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1198  if (recv_thread_argp)
1199  {
1200    if (!recv_thread_argp->terminated)
1201    {
1202      // thread signaled termination, but is not?
1203      Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> not terminated yet?!");
1204      return;
1205    }
1206    else
1207    { // if thread is already terminated
1208      Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> is terminated");
1209
1210      // delete it from receiver map
1211      recv_thread_argmap.erase(recv_thread_arg_iter);
1212
1213      // then delete receiver arg structure
1214      delete recv_thread_argp;
1215    }
1216  }
1217
1218  // delete entry from connection map
1219
1220  // cleanup sender thread
1221  // no need to lock explicitly, because caller of cleanup_receiver_thread() must already locked
1222  terminate_sender_thread(peer_assoc);
1223
1224  // delete the AssocData structure from the connection map
1225  // also frees allocated AssocData structure
1226  connmap.erase(peer_assoc);
1227
1228  // end critical section
1229
1230  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Cleanup receiver thread <" << thread_id << ">. Done.");
1231}
1232
1233
1234/* sends a stop message to the sender thread that belongs to the peer address given in assoc
1235 * @note terminate_receiver_thread() should be only called when an outer lock ensures that assoc
1236 *       is still valid, a lock is also required, because senderthread_queuemap is changed
1237 */
1238void 
1239TPoverTCP::terminate_sender_thread(const AssocData* assoc)
1240{
1241  if (assoc == 0)
1242  {
1243    Log(ERROR_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - assoc data == NULL");
1244    return;
1245  }
1246
1247  sender_thread_queuemap_t::iterator it= senderthread_queuemap.find(assoc->peer);
1248
1249  if (it != senderthread_queuemap.end())
1250  { // we have a sender thread: send a stop message to it
1251    FastQueue* destqueue= it->second; 
1252    if (destqueue)
1253    {
1254      TPoverTCPMsg* internalmsg= new TPoverTCPMsg(assoc,tpparam.source,TPoverTCPMsg::stop);
1255      if (internalmsg)
1256      {
1257        // send the internal message to the sender thread queue
1258        internalmsg->send(tpparam.source,destqueue);
1259      }
1260    }
1261    else
1262    {
1263      Log(WARNING_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - found entry for address, but no sender thread. addr:" << assoc->peer);
1264    }
1265    // erase entry from map
1266    senderthread_queuemap.erase(it);
1267  }
1268}
1269
1270/* terminate all active threads
1271 * note: locking should not be necessary here because this message is called as last method from
1272 * main_loop()
1273 */
1274void 
1275TPoverTCP::terminate_all_threads()
1276{
1277  AssocData* assoc= 0;
1278  receiver_thread_arg_t* terminate_argp;
1279
1280  for (recv_thread_argmap_t::iterator terminate_iterator=  recv_thread_argmap.begin();
1281       terminate_iterator !=  recv_thread_argmap.end();
1282       terminate_iterator++)
1283  {
1284    if ( (terminate_argp= terminate_iterator->second) != 0)
1285    {
1286      // we need a non const pointer to erase it later on
1287      assoc= const_cast<AssocData*>(terminate_argp->peer_assoc);
1288      // check whether thread is still alive
1289      if (terminate_argp->terminated == false)
1290      {
1291        terminate_argp->sig_terminate= true;
1292        // then wait for its termination
1293        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, 
1294            "Signaled receiver thread <" << terminate_iterator->first << "> for termination");
1295       
1296        pthread_join(terminate_iterator->first, 0);
1297       
1298        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Thread <" << terminate_iterator->first  << "> is terminated");
1299      }
1300      else
1301        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, 
1302            "Receiver thread <" << terminate_iterator->first << "> already terminated");
1303       
1304      // cleanup all remaining argument structures of terminated threads
1305      delete terminate_argp;
1306
1307      // terminate any related sender thread that is still running
1308      terminate_sender_thread(assoc);
1309     
1310      connmap.erase(assoc);
1311      // delete assoc is not necessary, because connmap.erase() will do the job
1312    }
1313  } // end for
1314}
1315
1316
1317/**
1318 * sender thread starter:
1319 * just a static starter method to allow starting the
1320 * actual sender_thread() method.
1321 *
1322 * @param argp - pointer to the current TPoverTCP object instance and receiver_thread_arg_t struct
1323 */
1324void*
1325TPoverTCP::sender_thread_starter(void *argp)
1326{
1327  sender_thread_start_arg_t *sargp= static_cast<sender_thread_start_arg_t *>(argp);
1328 
1329  //cout << "invoked sender_thread_Starter" << endl;
1330
1331  // invoke sender thread method
1332  if (sargp != 0 && sargp->instance != 0)
1333  {
1334    // call receiver_thread member function on object instance
1335    sargp->instance->sender_thread(sargp->sender_thread_queue);
1336
1337    //cout << "Before deletion of sarg" << endl;
1338
1339    // no longer needed
1340    delete sargp;
1341  }
1342  else
1343  {
1344    Log(ERROR_LOG,LOG_CRIT,"sender_thread_starter","while starting sender_thread: 0 pointer to arg or object");
1345  }
1346  return 0;
1347}
1348
1349
1350
1351
1352/**
1353 * receiver thread starter:
1354 * just a static starter method to allow starting the
1355 * actual receiver_thread() method.
1356 *
1357 * @param argp - pointer to the current TPoverTCP object instance and receiver_thread_arg_t struct
1358 */
1359void*
1360TPoverTCP::receiver_thread_starter(void *argp)
1361{
1362  receiver_thread_start_arg_t *rargp= static_cast<receiver_thread_start_arg_t *>(argp);
1363  // invoke receiver thread method
1364  if (rargp != 0 && rargp->instance != 0)
1365  {
1366    // call receiver_thread member function on object instance
1367    rargp->instance->receiver_thread(rargp->rtargp);
1368
1369    // no longer needed
1370    delete rargp;
1371  }
1372  else
1373  {
1374    Log(ERROR_LOG,LOG_CRIT,"receiver_thread_starter","while starting receiver_thread: 0 pointer to arg or object");
1375  }
1376  return 0;
1377}
1378
1379
1380void
1381TPoverTCP::create_new_sender_thread(FastQueue* senderfqueue)
1382{
1383  Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new sender thread...");
1384
1385  pthread_t senderthreadid;
1386  // create new thread; (arg == 0) is handled by thread, too
1387  int pthread_status= pthread_create(&senderthreadid, 
1388                                     NULL, // NULL: default attributes: thread is joinable and has a
1389                                     //       default, non-realtime scheduling policy
1390                                     TPoverTCP::sender_thread_starter,
1391                                     new sender_thread_start_arg_t(this,senderfqueue));
1392  if (pthread_status)
1393  {
1394    Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " <<  strerror(pthread_status));
1395   
1396    delete senderfqueue;
1397  }
1398}
1399
1400
1401void
1402TPoverTCP::create_new_receiver_thread(AssocData* peer_assoc)
1403{
1404  receiver_thread_arg_t* argp= 
1405    new(nothrow) receiver_thread_arg(peer_assoc);
1406 
1407  Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new receiver thread...");
1408
1409  // create new thread; (arg == 0) is handled by thread, too
1410  int pthread_status= pthread_create(&peer_assoc->thread_ID, 
1411                                     NULL, // NULL: default attributes: thread is joinable and has a
1412                                     //       default, non-realtime scheduling policy
1413                                     receiver_thread_starter,
1414                                     new(nothrow) receiver_thread_start_arg_t(this,argp));
1415  if (pthread_status)
1416  {
1417    Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " <<  strerror(pthread_status));
1418   
1419    delete argp;
1420  }
1421  else
1422  {
1423    lock(); // install_cleanup_thread_lock(TPoverTCP, this);
1424
1425    // remember pointer to thread arg structure
1426    // thread arg structure should be destroyed after thread termination only
1427    pair<recv_thread_argmap_t::iterator, bool> tmpinsiterator=
1428      recv_thread_argmap.insert( pair<pthread_t,receiver_thread_arg_t*> (peer_assoc->thread_ID,argp) );
1429    if (tmpinsiterator.second == false)
1430    {
1431      Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Thread argument could not be inserted into hashmap");
1432    }
1433    unlock(); // uninstall_cleanup(1);
1434  } 
1435}
1436
1437
1438/**
1439 * master listener thread starter:
1440 * just a static starter method to allow starting the
1441 * actual master_listener_thread() method.
1442 *
1443 * @param argp - pointer to the current TPoverTCP object instance
1444 */
1445void*
1446TPoverTCP::master_listener_thread_starter(void *argp)
1447{
1448  // invoke listener thread method
1449  if (argp != 0)
1450  {
1451    (static_cast<TPoverTCP*>(argp))->master_listener_thread();
1452  }
1453  return 0;
1454}
1455
1456
1457
1458/**
1459 * master listener thread: waits for incoming connections at the well-known tcp port
1460 * when a connection request is received this thread spawns a receiver_thread for
1461 * receiving packets from the peer at the new socket.
1462 */
1463void
1464TPoverTCP::master_listener_thread()
1465{
1466  // create a new address-structure for the listening masterthread
1467  struct sockaddr_in6 own_address_v6;
1468  own_address_v6.sin6_family = AF_INET6;
1469  own_address_v6.sin6_flowinfo= 0;
1470  own_address_v6.sin6_port = htons(tpparam.port); // use port number in param structure
1471  // accept incoming connections on all interfaces
1472  own_address_v6.sin6_addr = in6addr_any;
1473  own_address_v6.sin6_scope_id= 0;
1474
1475  // create a new address-structure for the listening masterthread
1476  struct sockaddr_in own_address_v4;
1477  own_address_v4.sin_family = AF_INET;
1478  own_address_v4.sin_port = htons(tpparam.port); // use port number in param structure
1479  // accept incoming connections on all interfaces
1480  own_address_v4.sin_addr.s_addr = INADDR_ANY;
1481 
1482  // create a listening socket
1483  int master_listener_socket= socket( AF_INET6, SOCK_STREAM, IPPROTO_TCP);
1484  if (master_listener_socket!=-1) v4_mode = false;
1485  if (master_listener_socket == -1) {
1486          master_listener_socket= socket( AF_INET, SOCK_STREAM, IPPROTO_TCP);
1487          if (master_listener_socket!=-1) v4_mode = true;
1488  }
1489  if (master_listener_socket == -1)
1490  {
1491    Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Could not create a new socket, error: " << strerror(errno));
1492    return;
1493  }
1494 
1495  // Disable Nagle Algorithm, set (TCP_NODELAY)
1496  int nodelayflag= 1;
1497  int status= setsockopt(master_listener_socket,
1498                         IPPROTO_TCP,
1499                         TCP_NODELAY,
1500                         &nodelayflag,
1501                         sizeof(nodelayflag));
1502  if (status)
1503  {
1504    Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
1505  }
1506 
1507  // Reuse ports, even if they are busy
1508  int socketreuseflag= 1;
1509  status= setsockopt(master_listener_socket,
1510                           SOL_SOCKET,
1511                           SO_REUSEADDR,
1512                           (const char *) &socketreuseflag,
1513                           sizeof(socketreuseflag));
1514  if (status)
1515  {
1516       Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
1517  }
1518 
1519 
1520  // bind the newly created socket to a specific address
1521  int bind_status = bind(master_listener_socket, v4_mode ?
1522                         reinterpret_cast<struct sockaddr *>(&own_address_v4) :
1523                         reinterpret_cast<struct sockaddr *>(&own_address_v6),
1524                                         v4_mode ? sizeof(own_address_v4) : sizeof(own_address_v6));
1525  if (bind_status)
1526    { 
1527      Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Binding to " 
1528          << (v4_mode ? inet_ntop(AF_INET, &own_address_v4.sin_addr, in_addrstr, INET_ADDRSTRLEN) :
1529                          inet_ntop(AF_INET6, &own_address_v6.sin6_addr, in6_addrstr, INET6_ADDRSTRLEN))
1530          << " port " << tpparam.port << " failed, error: " << strerror(errno));
1531      return;
1532    }
1533
1534    // listen at the socket,
1535    // queuesize for pending connections= max_listen_queue_size
1536    int listen_status = listen(master_listener_socket, max_listen_queue_size);
1537    if (listen_status)
1538    {
1539      Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Listen at socket " << master_listener_socket
1540          << " failed, error: " << strerror(errno));
1541      return;
1542    }
1543    else
1544    {
1545      Log(INFO_LOG,LOG_NORMAL, tpparam.name, color[green] << "Listening at port #" << tpparam.port << color[off]);
1546    }
1547
1548    // activate O_NON_BLOCK for accept (accept does not block)
1549    fcntl(master_listener_socket,F_SETFL, O_NONBLOCK);
1550
1551    // create a pollfd struct for use in the mainloop
1552    struct pollfd poll_fd;
1553    poll_fd.fd = master_listener_socket;
1554    poll_fd.events = POLLIN | POLLPRI; 
1555    poll_fd.revents = 0;
1556    /*
1557      #define POLLIN    0x001   // There is data to read.
1558      #define POLLPRI   0x002   // There is urgent data to read. 
1559      #define POLLOUT   0x004   // Writing now will not block. 
1560    */
1561   
1562    bool terminate = false;
1563    // check for thread terminate condition using get_state()
1564    state_t currstate= get_state();
1565    int poll_status= 0;
1566    const unsigned int number_poll_sockets= 1; 
1567    struct sockaddr_in6 peer_address;
1568    socklen_t peer_address_len;
1569    int conn_socket;
1570
1571    // check whether this thread is signaled for termination
1572    while(! (terminate= (currstate==STATE_ABORT || currstate==STATE_STOP) ) )
1573    {
1574      // wait on number_poll_sockets (main drm socket)
1575      // for the events specified above for sleep_time (in ms)
1576      poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
1577      if (poll_fd.revents & POLLERR) // Error condition
1578      {
1579        if (errno != EINTR) 
1580        {
1581          Log(ERROR_LOG,LOG_CRIT, tpparam.name, 
1582              "Poll caused error " << strerror(errno) << " - indicated by revents");
1583        }
1584        else
1585        {
1586          Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "poll(): " << strerror(errno));
1587        }
1588
1589      }
1590      if (poll_fd.revents & POLLHUP) // Hung up
1591      {
1592        Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll hung up");
1593        return;
1594      }
1595      if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
1596      {
1597        Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll Invalid request: fd not open");
1598        return;
1599      }
1600     
1601      switch (poll_status)
1602      {
1603        case -1:
1604          if (errno != EINTR)
1605          {
1606            Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll status indicates error: " << strerror(errno));
1607          }
1608          else
1609          {
1610            Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Poll status: " << strerror(errno));
1611          }
1612           
1613          break;
1614
1615        case 0:
1616#ifdef DEBUG_HARD
1617          Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, 
1618              "Listen Thread - Poll timed out after " << tpparam.sleep_time << " ms.");
1619#endif
1620          currstate= get_state();
1621          continue;
1622          break;
1623
1624        default:
1625#ifdef DEBUG_HARD
1626          Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
1627#endif
1628          break;
1629      } // end switch
1630
1631      // after a successful accept call,
1632      // accept stores the address information of the connecting party
1633      // in peer_address and the size of its address in addrlen
1634      peer_address_len= sizeof(peer_address);
1635      conn_socket = accept (master_listener_socket,
1636                            reinterpret_cast<struct sockaddr *>(&peer_address),
1637                            &peer_address_len);
1638      if (conn_socket == -1)
1639      {
1640        if (errno != EWOULDBLOCK && errno != EAGAIN)
1641        {
1642          Log(ERROR_LOG,LOG_EMERG, tpparam.name, "Accept at socket " << master_listener_socket
1643              << " failed, error: " << strerror(errno));
1644          return;
1645        }
1646      }
1647      else
1648      {
1649        // create a new assocdata-object for the new thread
1650        AssocData* peer_assoc = NULL;
1651        appladdress addr(peer_address, IPPROTO_TCP);
1652
1653        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "<<--Received connect--<< request from " << addr.get_ip_str() 
1654            << " port #" << addr.get_port());
1655
1656        struct sockaddr_in6 own_address;
1657        if (v4_mode) {
1658                struct sockaddr_in own_address_v4;
1659                socklen_t own_address_len_v4 = sizeof(own_address_v4);
1660                getsockname(conn_socket, reinterpret_cast<struct sockaddr*>(&own_address_v4), &own_address_len_v4);
1661                v4_to_v6(&own_address_v4, &own_address);
1662        } else {
1663                socklen_t own_address_len= sizeof(own_address);
1664                getsockname(conn_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
1665        }
1666
1667        // AssocData will copy addr content into its own structure
1668        // allocated peer_assoc will be stored in connmap
1669        peer_assoc = new(nothrow) AssocData(conn_socket, addr, appladdress(own_address,IPPROTO_TCP));
1670
1671        bool insert_success= false;
1672        if (peer_assoc)
1673        {
1674          // start critical section
1675          lock(); // install_cleanup_thread_lock(TPoverTCP, this);
1676          insert_success= connmap.insert(peer_assoc);
1677          // end critical section
1678          unlock(); // uninstall_cleanup(1);
1679        }
1680       
1681       
1682        if (insert_success == false) // not inserted into connmap
1683        {
1684          Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot insert AssocData for socket " << conn_socket
1685              << ", " << addr.get_ip_str() << ", port #" 
1686              << addr.get_port() << " into connection map, aborting connection...");
1687
1688          // abort connection, delete its AssocData
1689          close (conn_socket);
1690          if (peer_assoc) 
1691          { 
1692            delete peer_assoc;
1693            peer_assoc= 0;
1694          }
1695          return;
1696               
1697        } //end __else(connmap.insert());__
1698       
1699        // create a new thread for each new connection
1700        create_new_receiver_thread(peer_assoc);
1701      } // end __else (connsocket)__
1702     
1703      // get new thread state
1704      currstate= get_state();
1705
1706    } // end while(!terminate)
1707    return;
1708} // end listen_for_connections()   
1709
1710
1711TPoverTCP::~TPoverTCP()
1712{
1713  init= false;
1714  this->connmap.clear();
1715
1716  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,  "Destructor called");
1717
1718  QueueManager::instance()->unregister_queue(tpparam.source);
1719}
1720
1721/** TPoverTCP Thread main loop.
1722 * This loop checks for internal messages of either
1723 * a send() call to start a new receiver thread, or,
1724 * of a receiver_thread() that signals its own termination
1725 * for proper cleanup of control structures.
1726 * It also handles the following internal TPoverTCPMsg types:
1727 * - TPoverTCPMsg::stop - a particular receiver thread is terminated
1728 * - TPoverTCPMsg::start - a particular receiver thread is started
1729 * @param nr number of current thread instance
1730 */
1731void 
1732TPoverTCP::main_loop(uint32 nr)
1733{
1734
1735  // get internal queue for messages from receiver_thread
1736  FastQueue* fq = get_fqueue();
1737  if (!fq) 
1738  {
1739    Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot find message queue");
1740    return;
1741  } // end if not fq
1742  // register queue for receiving internal messages from other modules
1743  QueueManager::instance()->register_queue(fq,tpparam.source);
1744
1745  // start master listener thread
1746  pthread_t master_listener_thread_ID;
1747  int pthread_status= pthread_create(&master_listener_thread_ID, 
1748                                     NULL, // NULL: default attributes: thread is joinable and has a
1749                                     //       default, non-realtime scheduling policy
1750                                     master_listener_thread_starter,
1751                                     this);
1752  if (pthread_status)
1753  {
1754    Log(ERROR_LOG,LOG_CRIT, tpparam.name, 
1755        "New master listener thread could not be created: " <<  strerror(pthread_status));
1756  }
1757  else
1758    Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Master listener thread started");
1759
1760
1761  // define max latency for thread reaction on termination/stop signal
1762  timespec wait_interval= { 0, 250000000L }; // 250ms
1763  message* internal_thread_msg = NULL;
1764  state_t currstate= get_state();
1765
1766  // check whether this thread is signaled for termination
1767  while( currstate!=STATE_ABORT && currstate!=STATE_STOP ) 
1768  {
1769    // poll internal message queue (blocking)
1770    if ( (internal_thread_msg= fq->dequeue_timedwait(wait_interval)) != 0 )
1771    {
1772      TPoverTCPMsg* internalmsg= dynamic_cast<TPoverTCPMsg*>(internal_thread_msg);
1773      if (internalmsg)
1774      {
1775        if (internalmsg->get_msgtype() == TPoverTCPMsg::stop)
1776        {
1777          // a receiver thread terminated and signaled for cleanup by master thread
1778          AssocData* assocd= const_cast<AssocData*>(internalmsg->get_peer_assoc());
1779          Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Got cleanup request for thread <" << assocd->thread_ID <<'>');
1780          lock();
1781          cleanup_receiver_thread( assocd );
1782          unlock();
1783        }
1784        else
1785        if (internalmsg->get_msgtype() == TPoverTCPMsg::start)
1786        {
1787          // start a new receiver thread
1788          create_new_receiver_thread( const_cast<AssocData*>(internalmsg->get_peer_assoc()) );
1789        }
1790        else
1791          Log(ERROR_LOG,LOG_CRIT, tpparam.name, "unexpected internal message:" << internalmsg->get_msgtype());
1792         
1793        delete internalmsg;
1794      }
1795      else
1796      {
1797        Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Dynamic_cast failed - received unexpected and unknown internal message source "
1798            << internal_thread_msg->get_source());
1799      }
1800    } // endif
1801
1802    // get thread state
1803    currstate= get_state();
1804  } // end while
1805
1806  if (currstate==STATE_STOP)
1807  {
1808    // start abort actions
1809    Log(INFO_LOG,LOG_NORMAL, tpparam.name, "Asked to abort, stopping all receiver threads");
1810  } // end if stopped
1811
1812  // do not accept any more messages
1813  fq->shutdown();
1814  // terminate all receiver and sender threads that are still active
1815  terminate_all_threads();
1816}
1817
1818} // end namespace protlib
1819///@}
Note: See TracBrowser for help on using the repository browser.