An Overlay-based
Virtual Network Substrate
SpoVNet

source: source/ariba/utility/transport/tcpip/protlib/tp_over_tcp.cpp @ 7040

Last change on this file since 7040 was 7040, checked in by mies, 14 years ago

fixed v4/v6 bug

File size: 55.6 KB
Line 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file tp_over_tcp.cpp
3/// TCP-based transport module (includes framing support)
4/// ----------------------------------------------------------
5/// $Id: tp_over_tcp.cpp 2872 2008-02-18 10:58:03Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/src/tp_over_tcp.cpp $
7// ===========================================================
8//                     
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//                     
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29 
30extern "C"
31{
32  //#define _SOCKADDR_LEN    /* use BSD 4.4 sockets */
33#include <unistd.h>      /* gethostname */
34#include <sys/types.h>   /* network socket interface */
35#include <netinet/in.h>  /* network socket interface */
36#include <netinet/tcp.h> /* for TCP Socket Option */
37#include <sys/socket.h>
38#include <arpa/inet.h>   /* inet_addr */
39
40#include <fcntl.h>
41#include <sys/poll.h>
42}
43
44#include <iostream>
45#include <errno.h>
46#include <string>
47#include <sstream>
48
49#include "tp_over_tcp.h"
50#include "threadsafe_db.h"
51#include "cleanuphandler.h"
52#include "setuid.h"
53#include "queuemanager.h"
54#include "logfile.h"
55
56#include <set>
57
// Internal result codes of TPoverTCP::tcpsend().
#define TCP_SUCCESS 0
#define TCP_SEND_FAILURE 1

// Maximum listen queue size (not used in the part of the file shown here;
// presumably the backlog handed to listen() - TODO confirm).
const unsigned int max_listen_queue_size= 10;

// Third 32-bit word (bytes 8..11) of an IPv4-mapped IPv6 address
// ::ffff:a.b.c.d, i.e. the byte sequence 00 00 ff ff. The value is meant to be
// stored as one 32-bit word, so it has to be in network byte order: a plain
// 0x0000ffff would end up in memory as ff ff 00 00 on little-endian hosts.
#define IPV6_ADDR_INT32_SMP htonl(0x0000ffffU)
64
65namespace protlib {
66
/* Convert a sockaddr_in6 holding an IPv4-mapped address (::ffff:a.b.c.d)
 * into a plain sockaddr_in: the port and the last four address bytes are
 * carried over, everything else is zeroed. The function does not check that
 * sin6 actually contains a mapped address - that is up to the caller. */
void v6_to_v4(struct sockaddr_in *sin, struct sockaddr_in6 *sin6)
{
  // start from an all-zero structure (also clears the sin_zero padding)
  bzero(sin, sizeof(struct sockaddr_in));

  sin->sin_family = AF_INET;
  sin->sin_port = sin6->sin6_port;   // both fields are in network byte order

  // the embedded IPv4 address occupies bytes 12..15 of the IPv6 address
  const unsigned char *mapped_tail = &sin6->sin6_addr.s6_addr[12];
  memcpy(&sin->sin_addr, mapped_tail, sizeof(struct in_addr));
}
73
/* Convert a sockaddr_in into a sockaddr_in6 carrying the IPv4-mapped IPv6
 * address ::ffff:a.b.c.d and the same port. All other fields (flowinfo,
 * scope id) are zeroed.
 * The ::ffff prefix is written byte by byte rather than as a host-order
 * 32-bit word, so the result is correct on little- and big-endian hosts
 * alike (and no uint32_t* type-punning on the byte array is needed). */
void v4_to_v6(struct sockaddr_in *sin, struct sockaddr_in6 *sin6) {
  // zero everything first: bytes 0..9 of a mapped address must be 0
  bzero(sin6, sizeof(*sin6));
  sin6->sin6_family = AF_INET6;
  sin6->sin6_port = sin->sin_port;   // both fields are in network byte order

  // bytes 10 and 11 of an IPv4-mapped address are 0xff
  sin6->sin6_addr.s6_addr[10] = 0xff;
  sin6->sin6_addr.s6_addr[11] = 0xff;

  // bytes 12..15: the IPv4 address, already in network byte order
  memcpy(&sin6->sin6_addr.s6_addr[12], &sin->sin_addr, sizeof(struct in_addr));
}
84
85using namespace log;
86
87/** @defgroup tptcp TP over TCP
88 * @ingroup network
89 * @{
90 */
91
92char in6_addrstr[INET6_ADDRSTRLEN+1];
93
94
95/******* class TPoverTCP *******/
96
97
98/** get_connection_to() checks for already existing connections.
99 *  If a connection exists, it returns "AssocData"
100 *  and saves it in "connmap" for further use
101 *  If no connection exists, a new connection to "addr"
102 *  is created.
103 */
104AssocData* 
105TPoverTCP::get_connection_to(const appladdress& addr)
106{
107  // get timeout
108  struct timespec ts;
109  get_time_of_day(ts);
110  ts.tv_nsec+= tpparam.sleep_time * 1000000L;
111  if (ts.tv_nsec>=1000000000L)
112  {
113    ts.tv_sec += ts.tv_nsec / 1000000000L;
114    ts.tv_nsec= ts.tv_nsec % 1000000000L;
115  }
116
117  // create a new AssocData pointer, initialize it to NULL
118  AssocData* assoc= NULL;
119  int new_socket;
120  // loop until timeout is exceeded: TODO: check applicability of loop
121  do 
122  {
123    // check for existing connections to addr
124    // start critical section
125    lock(); // install_cleanup_thread_lock(TPoverTCP, this);
126    assoc= connmap.lookup(addr);
127    // end critical section
128    unlock(); // uninstall_cleanup(1);
129    if (assoc) 
130    {
131      // If not shut down then use it, else abort, wait and check again.
132      if (!assoc->shutdown) 
133      {
134        return assoc;
135      }
136      else
137      {
138        // TODO: connection is already in shutdown mode. What now?
139        ERRCLog(tpparam.name,"socket exists, but is already in mode shutdown");
140
141        return 0;
142      } 
143    } //end __if (assoc)__
144    else 
145    {
146      Log(DEBUG_LOG,LOG_UNIMP,tpparam.name,"No existing connection to " 
147          << addr.get_ip_str() << " port #" << addr.get_port() << " found, creating a new one.");
148    }
149
150    // no connection found, create a new one
151    new_socket = socket( v4_mode ? AF_INET : AF_INET6, SOCK_STREAM, IPPROTO_TCP);
152    if (new_socket == -1)
153    {
154      ERRCLog(tpparam.name,"Couldn't create a new socket: " << strerror(errno));
155
156      return 0;
157    }
158
159    // Disable Nagle Algorithm, set (TCP_NODELAY)
160    int nodelayflag= 1;
161    int status= setsockopt(new_socket,
162                           IPPROTO_TCP,
163                           TCP_NODELAY,
164                           &nodelayflag,
165                           sizeof(nodelayflag));
166    if (status)
167    {
168        ERRLog(tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
169    }
170   
171    // Reuse ports, even if they are busy
172    int socketreuseflag= 1;
173    status= setsockopt(new_socket,
174                           SOL_SOCKET,
175                           SO_REUSEADDR,
176                           (const char *) &socketreuseflag,
177                           sizeof(socketreuseflag));
178    if (status)
179    {
180        ERRLog(tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
181    }
182
183        struct sockaddr_in6 dest_address;
184        dest_address.sin6_flowinfo= 0;
185        dest_address.sin6_scope_id= 0;
186        addr.get_sockaddr(dest_address);
187
188        // connect the socket to the destination address
189        int connect_status = 0;
190        if (v4_mode) {
191                struct sockaddr_in dest_address_v4;
192                v6_to_v4( &dest_address_v4, &dest_address );
193                connect_status = connect(new_socket,
194                                         reinterpret_cast<const struct sockaddr*>(&dest_address_v4),
195                                         sizeof(dest_address));
196        } else {
197                connect_status = connect(new_socket,
198                                         reinterpret_cast<const struct sockaddr*>(&dest_address),
199                                         sizeof(dest_address));
200        }
201     
202    // connects to the listening_port of the peer's masterthread   
203    if (connect_status != 0)
204    {
205      ERRLog(tpparam.name,"Connect to " << addr.get_ip_str() << " port #" << addr.get_port() 
206          << " failed: [" << color[red] << strerror(errno) << color[off] << "]");
207     
208      return 0; // error: couldn't connect
209    }
210
211
212    struct sockaddr_in6 own_address;
213        if (v4_mode) {
214                struct sockaddr_in own_address_v4;
215                socklen_t own_address_len_v4 = sizeof(own_address_v4);
216                getsockname(new_socket, reinterpret_cast<struct sockaddr*>(&own_address_v4), &own_address_len_v4);
217                v4_to_v6(&own_address_v4, &own_address);
218        } else {
219                socklen_t own_address_len= sizeof(own_address);
220                getsockname(new_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
221        }
222
223    Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,">>--Connect-->> to " << addr.get_ip_str() << " port #" << addr.get_port() 
224        << " from " << inet_ntop(AF_INET6,&own_address.sin6_addr,in6_addrstr,INET6_ADDRSTRLEN) 
225        << " port #" << ntohs(own_address.sin6_port));
226
227   
228    // create new AssocData record (will copy addr)
229    assoc = new(nothrow) AssocData(new_socket, addr, appladdress(own_address,IPPROTO_TCP));
230
231    // if assoc could be successfully created, insert it into ConnectionMap
232    if (assoc) 
233    {
234      bool insert_success= false;
235      // start critical section
236      lock(); // install_cleanup_thread_lock(TPoverTCP, this);
237      // insert AssocData into connection map
238      insert_success= connmap.insert(assoc);
239      // end critical section
240      unlock(); // uninstall_cleanup(1);
241
242      if (insert_success == true) 
243      {
244#ifdef _DEBUG
245        Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Connected to " << addr.get_ip_str() << ", port #" << addr.get_port()
246            << " via socket " << new_socket);
247
248               
249#endif
250
251        // notify master thread to start a receiver thread (i.e. send selfmessage)
252        TPoverTCPMsg* newmsg= new(nothrow)TPoverTCPMsg(assoc, tpparam.source, TPoverTCPMsg::start);
253        if (newmsg)
254        {
255          newmsg->send_to(tpparam.source);
256          return assoc;
257        }
258        else
259          ERRCLog(tpparam.name,"get_connection_to: could not get memory for internal msg");
260      } 
261      else 
262      {
263        // delete data and abort
264        ERRCLog(tpparam.name, "Cannot insert AssocData for socket " << new_socket << ", "<< addr.get_ip_str()
265            <<", port #" << addr.get_port() << " into connection map, aborting connection");
266               
267        // abort connection, delete its AssocData
268        close (new_socket);
269        if (assoc) 
270        { 
271          delete assoc; 
272          assoc= 0;
273        }
274        return assoc;
275      } // end else connmap.insert                     
276     
277    } // end "if (assoc)"
278  } 
279  while (wait_cond(ts)!=ETIMEDOUT);
280
281  return assoc;
282} //end get_connection_to
283
284
285/** terminates a signaling association/connection
286 *  - if no connection exists, generate a warning
287 *  - otherwise: generate internal msg to related receiver thread
288 */
289void
290TPoverTCP::terminate(const address& in_addr)
291{
292#ifndef _NO_LOGGING
293 const char *const thisproc="terminate()   - ";
294#endif
295
296 appladdress* addr = NULL;
297 addr = dynamic_cast<appladdress*>(in_addr.copy());
298
299 if (!addr) return;
300
301  // Create a new AssocData-pointer
302  AssocData* assoc = NULL;
303
304  // check for existing connections to addr
305
306  // start critical section:
307  // please note if receiver_thread terminated already, the assoc data is not
308  // available anymore therefore we need a lock around cleanup_receiver_thread()
309
310  lock(); // install_cleanup_thread_lock(TPoverTCP, this);
311  assoc= connmap.lookup(*addr);
312  if (assoc) 
313  {
314    EVLog(tpparam.name,thisproc<<"got request to shutdown connection for peer " << addr);
315    // If not shut down then use it, else abort, wait and check again.
316    if (!assoc->shutdown) 
317    {
318      if (assoc->socketfd)
319      {
320        // disallow sending
321        if (shutdown(assoc->socketfd,SHUT_WR))
322        {
323          ERRLog(tpparam.name,thisproc<<"shutdown (write) on socket for peer " << addr << " returned error:" << strerror(errno));
324        }
325        else
326        {
327          EVLog(tpparam.name,thisproc<<"initiated closing of connection for peer " << addr << ". Shutdown (write) on socket "<< assoc->socketfd );       
328        }
329      }
330      assoc->shutdown= true;
331    }
332    else
333      EVLog(tpparam.name,thisproc<<"connection for peer " << addr << " is already in mode shutdown");
334     
335  }
336  else
337    WLog(tpparam.name,thisproc<<"could not find a connection for peer " << *addr);
338
339  stop_receiver_thread(assoc);
340
341  // end critical section
342  unlock(); // uninstall_cleanup(1);
343
344  if (addr) delete addr;
345}
346
347
348/** generates and internal TPoverTCP message to send a NetMsg to the network
349 *  - it is necessary to let a thread do this, because the caller
350 *     may get blocked if the connect() or send() call hangs for a while
351 *  - the sending thread will call TPoverTCP::tcpsend()
352 *  - if no connection exists, creates a new one (unless use_existing_connection is true)
353 *  @note the netmsg is deleted by the send() method when it is not used anymore
354 */
355void
356TPoverTCP::send(NetMsg* netmsg, const address& in_addr, bool use_existing_connection)
357{
358  if (netmsg == NULL) {
359    ERRCLog(tpparam.name,"send() - called without valid NetMsg buffer (NULL)");
360    return;
361  }
362
363  appladdress* addr = NULL;
364  addr= dynamic_cast<appladdress*>(in_addr.copy());
365   
366  if (!addr)
367  {
368    ERRCLog(tpparam.name,"send() - given destination address is not of expected type (appladdress), has type " << (int) in_addr.get_type());
369    return;
370  } 
371
372  // lock due to sendermap access
373  lock();
374   
375  // check for existing sender thread
376  sender_thread_queuemap_t::const_iterator it= senderthread_queuemap.find(*addr);
377   
378  FastQueue* destqueue= 0;
379
380  if (it == senderthread_queuemap.end())
381  { // no sender thread found so far
382
383    // if we already have an existing connection it is save to create a sender thread
384    // since get_connection_to() will not be invoked, so an existing connection will
385    // be used
386    const AssocData* assoc = connmap.lookup(*addr);
387
388    //DLog(tpparam.name,"send() - use_existing_connection:" << (use_existing_connection ? "true" : "false") << " assoc:" << assoc);
389
390    if (use_existing_connection==false || (assoc && assoc->shutdown==false && assoc->socketfd>0))
391    { // it is allowed to create a new connection for this thread
392      // create a new queue for sender thread
393      FastQueue* sender_thread_queue= new FastQueue;
394      create_new_sender_thread(sender_thread_queue);
395      // remember queue for later use
396     
397      //pair<sender_thread_queuemap_t::iterator, bool> tmpinsiterator=
398      senderthread_queuemap.insert( pair<appladdress,FastQueue*> (*addr,sender_thread_queue) );
399     
400      destqueue= sender_thread_queue;
401    }
402  }
403  else
404  { // we have a sender thread, use stored queue from map
405    destqueue= it->second; 
406  }
407   
408  unlock();
409   
410  // send a send_data message to it (if we have a destination queue)
411  if (destqueue)
412  {
413    // both parameters will be freed after message was sent!
414   
415    TPoverTCPMsg* internalmsg= new TPoverTCPMsg(netmsg,new appladdress(*addr));
416    if (internalmsg)
417    {
418      // send the internal message to the sender thread queue
419      internalmsg->send(tpparam.source,destqueue);
420    }
421  }
422  else
423  {
424    if (!use_existing_connection)
425      WLog(tpparam.name,"send() - found entry for address, but no active sender thread available for peer addr:" << *addr << " - dropping data");
426    else
427    {
428      DLog(tpparam.name,"no active sender thread found for peer " << *addr << " - but policy forbids to set up a new connection, will drop data");
429
430    }
431    // cannot send data, so we must drop it
432    delete netmsg;
433  }
434 
435  if (addr) delete addr;
436}
437
438/** sends a NetMsg to the network.
439 *
440 * @param netmsg message to send
441 * @param addr   transport endpoint address
442 *
443 * @note   if no connection exists, creates a new one
444 * @note   both parameters are deleted after the message was sent
445 */
446void
447TPoverTCP::tcpsend(NetMsg* netmsg, appladdress* addr)
448{                       
449#ifndef _NO_LOGGING
450  const char *const thisproc="sender   - ";
451#endif
452
453  // set result initially to success, set it to failure
454  // in places where these failures occur
455  int result = TCP_SUCCESS;
456  int saved_errno= 0;
457  int ret= 0;
458
459  // Create a new AssocData-pointer
460  AssocData* assoc = NULL;
461 
462  // tp.cpp checks for initialisation of tp and correctness of
463  // msgsize, protocol and ip,
464  // throws an error if something is not right
465  if (addr) {
466    addr->convert_to_ipv6();
467    check_send_args(*netmsg,*addr);
468  } 
469  else 
470  {
471    ERRCLog(tpparam.name, thisproc << "address pointer is NULL");
472    result= TCP_SEND_FAILURE;
473
474    delete netmsg;
475    delete addr;
476
477    throw TPErrorInternal();
478  }
479
480
481  // check for existing connections,
482  // if a connection exists, return its AssocData
483  // and save it in assoc for further use
484  // if no connection exists, create a new one (in get_connection_to()).
485  // Connection is inserted into connection map that must be done
486  // with exclusive access
487  assoc= get_connection_to(*addr);
488
489  if (assoc==NULL || assoc->socketfd<=0)
490  {
491    ERRCLog(tpparam.name, color[red] << thisproc << "no valid assoc/socket data - dropping packet");
492
493    delete netmsg;
494    delete addr;
495    return;
496  }
497
498  if (assoc->shutdown)
499  {
500    Log(WARNING_LOG, LOG_ALERT, tpparam.name, thisproc << "should send message although connection already half closed");
501    delete netmsg;
502    delete addr;
503
504    throw TPErrorSendFailed();
505  }
506
507  uint32 msgsize= netmsg->get_size();
508#ifdef DEBUG_HARD
509  cerr << thisproc << "message size=" << netmsg->get_size() << endl;
510#endif
511
512  const uint32 retry_send_max = 3;
513  uint32 retry_count = 0;
514  // send all the data contained in netmsg to the socket
515  // which belongs to the address "addr"
516  for (uint32 bytes_sent= 0;
517       bytes_sent < msgsize;
518       bytes_sent+= ret)
519  {
520
521#ifdef _DEBUG_HARD
522    for (uint32 i=0;i<msgsize;i++)
523    {
524      cout << "send_buf: " << i << " : "; 
525      if ( isalnum(*(netmsg->get_buffer()+i)) )
526        cout << "'" << *(netmsg->get_buffer()+i) << "' (0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec << ")" ;
527      else
528        cout << "0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec;
529      cout << endl;
530    }
531
532    cout << endl;
533    cout << "bytes_sent: " << bytes_sent << endl;
534    cout << "Message size: " <<  msgsize << endl;
535    cout << "Send-Socket: " << assoc->socketfd << endl;
536    cout << "pointer-Offset. " << netmsg->get_pos() << endl;
537    cout << "vor send " << endl;
538#endif
539
540    retry_count= 0;
541    do 
542    {
543      // socket send
544      ret= ::send(assoc->socketfd, 
545                  netmsg->get_buffer() + bytes_sent, 
546                  msgsize - bytes_sent, 
547                  MSG_NOSIGNAL);
548     
549      // send_buf + bytes_sent
550
551      // Deal with temporary internal errors like EAGAIN, resource temporary unavailable etc.
552      // retry sending
553      if (ret < 0)
554      { // internal error during send occured
555        saved_errno= errno;
556        switch(saved_errno)
557        {
558          case EAGAIN:  // same as EWOULDBLOCK
559          case EINTR:   // man page says: A signal occurred before any data was transmitted.
560          case ENOBUFS: //  The output queue for a network interface was full.
561            retry_count++;
562            ERRLog(tpparam.name,"Temporary failure while calling send(): " << strerror(saved_errno) << ", errno: " << saved_errno
563                   << " - retry sending, retry #" << retry_count);
564            // pace down a little bit, sleep for a while
565            sleep(1);
566            break;
567
568            // everything else should not lead to repetition
569          default:
570            retry_count= retry_send_max;
571            break;
572        } // end switch
573      } // endif
574      else // leave while
575        break;
576    } 
577    while(retry_count < retry_send_max);
578
579    if (ret < 0) 
580    { // unrecoverable error occured
581     
582      result= TCP_SEND_FAILURE;
583      break;
584    } // end if (ret < 0)
585    else
586    {
587      if (debug_pdu)
588      {
589        ostringstream hexdump;
590        netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_sent);
591        DLog(tpparam.name,"PDU debugging enabled - Sent:" << hexdump.str());
592      }
593    }
594
595    // continue with send next data block in next for() iteration
596  } // end for
597
598  // *** note: netmsg is deleted here ***
599  delete netmsg;
600 
601  // Throwing an exception within a critical section does not
602  // unlock the mutex.
603
604  if (result != TCP_SUCCESS)
605  {
606    ERRLog(tpparam.name, thisproc << "TCP error, returns " << ret << ", error : " << strerror(errno));
607    delete addr;
608
609    throw TPErrorSendFailed(saved_errno);
610   
611  }
612  else
613    EVLog(tpparam.name, thisproc << ">>----Sent---->> message (" << msgsize << " bytes) using socket " << assoc->socketfd  << " to " << *addr);
614
615  if (!assoc) {
616    // no connection
617   
618    ERRLog(tpparam.name, thisproc << "cannot get connection to " << addr->get_ip_str() 
619        << ", port #" << addr->get_port());
620
621    delete addr;
622
623    throw TPErrorUnreachable(); // should be no assoc found
624  } // end "if (!assoc)"
625 
626  // *** delete address ***
627  delete addr;
628}
629 
630/* this thread waits for an internal message that either:
631 * - requests transmission of a packet
632 * - requests to stop this thread
633 * @param argp points to the thread queue for internal messages
634 */
635void 
636TPoverTCP::sender_thread(void *argp)
637{
638#ifndef _NO_LOGGING
639  const char *const methodname="senderthread - ";
640#endif
641
642  message* internal_thread_msg = NULL;
643
644  EVLog(tpparam.name, methodname << "starting as thread <" << pthread_self() << ">");
645
646  FastQueue* fq= reinterpret_cast<FastQueue*>(argp);
647  if (!fq)
648  {
649    ERRLog(tpparam.name, methodname << "thread <" << pthread_self() << "> no valid pointer to msg queue. Stop.");
650    return;
651  }
652
653  bool terminate= false;
654  TPoverTCPMsg* internalmsg= 0;
655  while (terminate==false && (internal_thread_msg= fq->dequeue()) != 0 )
656  {
657    internalmsg= dynamic_cast<TPoverTCPMsg*>(internal_thread_msg);
658   
659    if (internalmsg == 0)
660    {
661      ERRLog(tpparam.name, methodname << "received not an TPoverTCPMsg but a" << internal_thread_msg->get_type_name());     
662    }
663    else
664    if (internalmsg->get_msgtype() == TPoverTCPMsg::send_data)
665    {
666      // create a connection if none exists and send the netmsg
667      if (internalmsg->get_netmsg() && internalmsg->get_appladdr())
668      {
669        try 
670        {
671          tcpsend(internalmsg->get_netmsg(),internalmsg->get_appladdr());
672        } // end try
673        catch(TPErrorSendFailed& err)
674        {
675          ERRLog(tpparam.name, methodname << "TCP send call failed - " << err.what() 
676                 << " cause: (" << err.get_reason() << ") " << strerror(err.get_reason()) );
677        } // end catch
678        catch(TPError& err)
679        {
680          ERRLog(tpparam.name, methodname << "TCP send call failed - reason: " << err.what());
681        } // end catch
682        catch(...)
683        {
684          ERRLog(tpparam.name, methodname << "TCP send call failed - unknown exception");
685        }
686      }
687      else
688      {
689        ERRLog(tpparam.name, methodname << "problem with passed arguments references, they point to 0");
690      } 
691    }
692    else
693    if (internalmsg->get_msgtype() == TPoverTCPMsg::stop)
694    {
695      terminate= true;
696    } 
697  } // end while
698 
699  EVLog(tpparam.name, methodname << "<" << pthread_self() << "> terminated connection.");
700}
701
702
703/** receiver thread listens at a TCP socket for incoming PDUs
704 *  and passes complete PDUs to the coordinator. Incomplete
705 *  PDUs due to aborted connections or buffer overflows are discarded.
706 *  @param argp - assoc data and flags sig_terminate and terminated
707 * 
708 *  @note this is a static member function, so you cannot use class variables
709 */
710void 
711TPoverTCP::receiver_thread(void *argp)
712{
713#ifndef _NO_LOGGING
714  const char *const methodname="receiver - ";
715#endif
716
717  receiver_thread_arg_t *receiver_thread_argp= static_cast<receiver_thread_arg_t *>(argp);
718  const appladdress* peer_addr = NULL;
719  const appladdress* own_addr = NULL;
720  uint32 bytes_received = 0;
721  TPMsg* tpmsg= NULL;
722 
723  // argument parsing - start
724  if (receiver_thread_argp == 0)
725  {
726    ERRCLog(tpparam.name, methodname << "No arguments given at start of receiver thread <" << pthread_self() << ">, exiting.");
727
728    return;
729  }
730  else
731  {
732    // change status to running, i.e., not terminated
733    receiver_thread_argp->terminated= false;
734
735#ifdef _DEBUG
736    DLog(tpparam.name, methodname << "New receiver thread <" << pthread_self() << "> started. ");
737#endif
738  }
739
740  int conn_socket= 0;
741  if (receiver_thread_argp->peer_assoc)
742  {
743    // get socket descriptor from arg
744    conn_socket = receiver_thread_argp->peer_assoc->socketfd;
745    // get pointer to peer address of socket (source/sender address of peer) from arg
746    peer_addr= &receiver_thread_argp->peer_assoc->peer;
747    own_addr= &receiver_thread_argp->peer_assoc->ownaddr;
748  }
749  else
750  {
751    ERRCLog(tpparam.name, methodname << "No peer assoc available - pointer is NULL");
752   
753    return;
754  }
755
756  if (peer_addr == 0)
757  {
758    ERRCLog(tpparam.name, methodname << "No peer address available for socket " << conn_socket << ", exiting.");
759   
760    return;
761  }
762  // argument parsing - end
763#ifdef _DEBUG
764  Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname <<
765      "Preparing to wait for data at socket " 
766      << conn_socket << " from " << receiver_thread_argp->peer_assoc->peer);
767#endif
768
769  int ret= 0;
770  uint32 msgcontentlength= 0;
771  bool msgcontentlength_known= false;
772  bool pdu_complete= false; // when to terminate inner loop
773
774  /* maybe use this to create a new pdu,
775    /// constructor
776    contextlistpdu(type_t t, subtype_t st, uint32 fc, uint32 numobj = 0);
777  */ 
778
779  // activate O_NON_BLOCK  for recv on socket conn_socket
780  // CAVEAT: this also implies non-blocking send()!
781  //fcntl(conn_socket,F_SETFL, O_NONBLOCK);
782   
783  // set options for polling
784  const unsigned int number_poll_sockets= 1; 
785  struct pollfd poll_fd;
786  // have to set structure before poll call
787  poll_fd.fd = conn_socket;
788  poll_fd.events = POLLIN | POLLPRI; 
789  poll_fd.revents = 0;
790
791  int poll_status;
792  bool recv_error= false;
793
794  NetMsg* netmsg= 0;
795  NetMsg* remainbuf= 0;
796  size_t buffer_bytes_left= 0;
797  size_t trailingbytes= 0;
798  bool skiprecv= false;
799  // loop until we receive a terminate signal (read-only var for this thread)
800  // or get an error from socket read
801  while( receiver_thread_argp->sig_terminate == false )
802  {
803    // Read next PDU from socket or process trailing bytes in remainbuf
804    ret= 0;
805    msgcontentlength= 0;
806    msgcontentlength_known= false;
807    pdu_complete= false;
808    netmsg= 0;
809
810    // there are trailing bytes left from the last receive call
811    if (remainbuf != 0)
812    {
813      netmsg= remainbuf;
814      remainbuf= 0;
815      buffer_bytes_left= netmsg->get_size()-trailingbytes;
816      bytes_received= trailingbytes;
817      trailingbytes= 0;
818      skiprecv= true;
819    }
820    else // no trailing bytes, create a new buffer
821    if ( (netmsg= new NetMsg(NetMsg::max_size)) != 0 )
822    {
823      buffer_bytes_left= netmsg->get_size();
824      bytes_received= 0;
825      skiprecv= false;
826    }
827    else
828    { // buffer allocation failed
829      bytes_received= 0;
830      buffer_bytes_left= 0;
831      recv_error= true;
832    }
833   
834    // loops until PDU is complete
835    // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
836    while (!pdu_complete && 
837           !recv_error && 
838           !receiver_thread_argp->sig_terminate)
839    {
840      if (!skiprecv)
841      {
842        // read from TCP socket or return after sleep_time
843        poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
844       
845        if (receiver_thread_argp->sig_terminate)
846        {
847          Log(EVENT_LOG,LOG_UNIMP,tpparam.name,methodname << "Thread <" << pthread_self() << "> found terminate signal after poll");
848          // disallow sending
849          AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
850          if (myassoc->shutdown == false)
851          {
852            myassoc->shutdown= true;
853            if (shutdown(myassoc->socketfd,SHUT_WR))
854            {
855              if ( errno != ENOTCONN )
856                Log(ERROR_LOG,LOG_UNIMP,tpparam.name,methodname <<"shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
857            }
858          }
859          // try to read do a last read from the TCP socket or return after sleep_time
860          if (poll_status == 0)
861          {
862            poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
863          }
864        }
865
866        if (poll_fd.revents & POLLERR) // Error condition
867        {
868          if (errno == 0 || errno == EINTR)
869          {
870            EVLog(tpparam.name, methodname << "poll(): " << strerror(errno));
871          }
872          else
873          {
874            ERRCLog(tpparam.name, methodname << "Poll indicates error: " << strerror(errno));
875            recv_error= true;
876          }
877        }
878       
879        if (poll_fd.revents & POLLHUP) // Hung up
880        {
881          Log(EVENT_LOG,LOG_CRIT, tpparam.name, methodname << "Poll hung up");
882          recv_error= true;
883        }
884       
885        if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
886        {
887          EVLog(tpparam.name, methodname << "Poll Invalid request: fd not open");
888          recv_error= true;
889        }
890       
891        // check status (return value) of poll call
892        switch (poll_status)
893        {
894          case -1:
895            if (errno == 0 || errno == EINTR)
896            {
897              EVLog(tpparam.name, methodname << "Poll status: " << strerror(errno));
898            }
899            else
900            {
901              ERRCLog(tpparam.name, methodname << "Poll status indicates error: " << strerror(errno) << "- aborting");
902              recv_error= true;
903            }
904           
905            continue; // next while iteration
906            break;
907           
908          case 0:
909#ifdef DEBUG_HARD
910            Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll timed out after " << tpparam.sleep_time << " ms.");
911#endif
912            continue; // next while iteration
913            break;
914           
915          default:
916#ifdef DEBUG_HARD
917            Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
918#endif
919            break;
920        } // end switch
921
922
923        /// receive data from socket buffer (recv will not block)
924        ret = recv(conn_socket, 
925                   netmsg->get_buffer() + bytes_received, 
926                   buffer_bytes_left, 
927                   MSG_DONTWAIT);
928
929        if ( ret < 0 )
930        {
931          if (errno!=EAGAIN && errno!=EWOULDBLOCK)
932          {
933            ERRCLog(tpparam.name, methodname << "Receive at socket " << conn_socket << " failed, error: " << strerror(errno));
934            recv_error= true;
935            continue;
936          }
937          else
938          { // errno==EAGAIN || errno==EWOULDBLOCK
939            // just nothing to read from socket, continue w/ next poll
940            continue;
941          }
942        }
943        else
944        {
945          if (ret == 0)
946          {
947            // this means that EOF is reached,
948            // other side has closed connection
949            Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Other side (" << *peer_addr << ") closed connection for socket " << conn_socket);
950            // disallow sending
951            AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
952            if (myassoc->shutdown == false)
953            {
954              myassoc->shutdown= true;
955              if (shutdown(myassoc->socketfd,SHUT_WR))
956              {
957                if ( errno != ENOTCONN )
958                  Log(ERROR_LOG,LOG_UNIMP,tpparam.name, methodname << "shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
959              }
960            }
961            // not a real error, but we must quit the receive loop
962            recv_error= true;
963          }
964          else
965          {
966
967            Log(EVENT_LOG,LOG_UNIMP, tpparam.name, methodname << "<<--Received--<< packet (" << ret << " bytes) at socket " << conn_socket << " from " << *peer_addr);
968            // track number of received bytes
969            bytes_received+= ret;
970            buffer_bytes_left-= ret;
971          }
972        }
973      } // end if do not skip recv() statement     
974
975      if (buffer_bytes_left < 0) ///< buffer space exhausted now
976      {
977        recv_error= true;
978        Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "during receive buffer space exhausted");
979      }
980
981      if (!msgcontentlength_known) ///< common header not parsed
982      {
983        // enough bytes read to parse common header?
984        if (bytes_received >= common_header_length)
985        {
986          // get message content length in number of bytes
987          if (getmsglength(*netmsg, msgcontentlength))
988            msgcontentlength_known= true;
989          else
990          {
991            ERRCLog(tpparam.name, methodname << "Not a valid protocol header - discarding received packet. received size " << msgcontentlength);
992
993            ostringstream hexdumpstr;
994            netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
995            DLog(tpparam.name,"dumping received bytes:" << hexdumpstr.str());
996
997            // reset all counters
998            msgcontentlength= 0;
999            msgcontentlength_known= false;
1000            bytes_received= 0;
1001            pdu_complete= false;
1002            continue;
1003          }
1004        }
1005      } // endif common header not parsed
1006
1007      // check whether we have read the whole Protocol PDU
1008      DLog(tpparam.name, "bytes_received-common_header_length=" << bytes_received-common_header_length << " msgcontentlength: " << msgcontentlength);
1009      if (msgcontentlength_known)
1010      {
1011        if (bytes_received-common_header_length >= msgcontentlength )
1012        {
1013          pdu_complete= true;
1014          // truncate buffer exactly at common_header_length+msgcontentlength==PDU size, trailing stuff is handled separately
1015          netmsg->truncate(common_header_length+msgcontentlength);
1016
1017          // trailing bytes are copied into new buffer
1018          if (bytes_received-common_header_length > msgcontentlength)
1019          {
1020            WLog(tpparam.name,"trailing bytes - received more bytes ("<<bytes_received<<") than expected for PDU (" << common_header_length+msgcontentlength << ")");
1021            remainbuf= new NetMsg(NetMsg::max_size);
1022            trailingbytes= (bytes_received-common_header_length) - msgcontentlength;
1023            bytes_received= common_header_length+msgcontentlength;
1024            memcpy(remainbuf->get_buffer(),netmsg->get_buffer()+common_header_length+msgcontentlength, trailingbytes);
1025          }
1026        }
1027        else
1028        { // not enough bytes in the buffer must continue to read from network
1029          skiprecv= false;
1030        }
1031      } // endif msgcontentlength_known
1032    } // end while (!pdu_complete && !recv_error && !signalled for termination)
1033    // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
1034
1035    // if other side closed the connection, we should still be able to deliver the remaining data
1036    if (ret == 0)
1037    {
1038      recv_error= false;
1039    }
1040
1041    // deliver only complete PDUs to signaling module
1042    if (!recv_error && pdu_complete)
1043    {
1044      // create TPMsg and send it to the signaling thread
1045      tpmsg = new(nothrow) TPMsg(netmsg, peer_addr->copy(), own_addr->copy());
1046      if (tpmsg)
1047      {
1048        DLog(tpparam.name, methodname << "receipt of PDU now complete, sending msg#" << tpmsg->get_id()
1049             << " to module " <<  message::get_qaddr_name(tpparam.dest));
1050      }
1051
1052      debug_pdu=false;
1053
1054      if (debug_pdu)
1055      {
1056        ostringstream hexdump;
1057        netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_received);
1058        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"PDU debugging enabled - Received:" << hexdump.str());
1059      }
1060
1061      // send the message if it was successfully created
1062      // bool message::send_to(qaddr_t dest, bool exp = false);
1063      if (!tpmsg
1064          || (!tpmsg->get_peeraddress())
1065          || (!tpmsg->send(message::qaddr_tp_over_tcp, tpparam.dest))) 
1066      {
1067        Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Cannot allocate/send TPMsg");
1068        if (tpmsg) delete tpmsg;
1069      } // end if tpmsg not allocated or not addr or not sent
1070     
1071
1072    } // end if !recv_error
1073    else
1074    { // error during receive or PDU incomplete
1075      if (bytes_received>0)
1076      {
1077        Log(WARNING_LOG,LOG_NORMAL, tpparam.name, methodname << "Attention! " << (recv_error? "Receive error, " : "") << (pdu_complete ?  "PDU complete" : "PDU incomplete") << "received bytes: " << bytes_received);
1078      }
1079
1080      if (!pdu_complete && bytes_received>0 && bytes_received<common_header_length)
1081      {
1082        ostringstream hexdumpstr;
1083        netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
1084        Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"Message too short to be a valid protocol header - dumping received bytes:" << hexdumpstr.str()); 
1085      }
1086      // leave the outer loop
1087      /**********************/
1088      break;
1089      /**********************/
1090    } // end else
1091
1092  } // end while (thread not signalled for termination)
1093
1094  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() 
1095      << "> shutting down and closing socket " << receiver_thread_argp->peer_assoc->peer);
1096
1097  // shutdown socket
1098  if (shutdown(conn_socket, SHUT_RD))
1099  {
1100    if ( errno != ENOTCONN )
1101      Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> shutdown (read) on socket failed, reason: " << strerror(errno));
1102  }
1103
1104  // close socket
1105  close(conn_socket);
1106
1107  receiver_thread_argp->terminated= true;
1108
1109  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> terminated");
1110
1111#ifdef _DEBUG
1112  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Signaling main loop for cleanup");
1113#endif
1114  // notify master thread for invocation of cleanup procedure
1115  TPoverTCPMsg* newmsg= new(nothrow)TPoverTCPMsg(receiver_thread_argp->peer_assoc);
1116  // send message to main loop thread
1117  newmsg->send_to(tpparam.source);
1118
1119} 
1120
1121
1122/** this signals a terminate to a thread and wait for the thread to stop
1123 *  @note it is not safe to access any thread related data after this method returned,
1124 *        because the receiver thread will initiate a cleanup_receiver_thread() method
1125 *        which may erase all relevant thread data.
1126 */
1127void 
1128TPoverTCP::stop_receiver_thread(AssocData* peer_assoc)
1129{
1130  // All operations on  recv_thread_argmap and connmap require an already acquired lock
1131  // after this procedure peer_assoc may be invalid because it was erased
1132
1133  // start critical section
1134
1135  if (peer_assoc == 0)
1136    return;
1137
1138  pthread_t thread_id=  peer_assoc->thread_ID;
1139 
1140  // try to clean up receiver_thread_arg
1141  recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1142  receiver_thread_arg_t* recv_thread_argp= 
1143    (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1144  if (recv_thread_argp)
1145  {
1146    if (!recv_thread_argp->terminated)
1147    {
1148      // thread signaled termination, but is not?
1149      Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> signaled for termination");
1150
1151      // signal thread for its termination
1152      recv_thread_argp->sig_terminate= true;
1153      // wait for thread to join after termination
1154      pthread_join(thread_id, 0);
1155      // the dying thread will signal main loop to call this method, but this time we should enter the else branch
1156      return;
1157    }
1158  }
1159  else
1160    Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> not found");
1161
1162}
1163
1164
1165/** cleans up left over structures (assoc,receiver_thread_arg) from already terminated receiver_thread
1166 *  usually called by the master_thread after the receiver_thread terminated
1167 * @note clean_up_receiver_thread() should be only called when an outer lock ensures that peer_assoc
1168 *       is still valid
1169 */
1170void 
1171TPoverTCP::cleanup_receiver_thread(AssocData* peer_assoc)
1172{
1173  // All operations on  recv_thread_argmap and connmap require an already acquired lock
1174  // after this procedure peer_assoc may be invalid because it was erased
1175
1176  // start critical section
1177
1178  if (peer_assoc == 0)
1179    return;
1180
1181  pthread_t thread_id=  peer_assoc->thread_ID;
1182 
1183  // try to clean up receiver_thread_arg
1184  recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1185  receiver_thread_arg_t* recv_thread_argp= 
1186    (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1187  if (recv_thread_argp)
1188  {
1189    if (!recv_thread_argp->terminated)
1190    {
1191      // thread signaled termination, but is not?
1192      Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> not terminated yet?!");
1193      return;
1194    }
1195    else
1196    { // if thread is already terminated
1197      Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> is terminated");
1198
1199      // delete it from receiver map
1200      recv_thread_argmap.erase(recv_thread_arg_iter);
1201
1202      // then delete receiver arg structure
1203      delete recv_thread_argp;
1204    }
1205  }
1206
1207  // delete entry from connection map
1208
1209  // cleanup sender thread
1210  // no need to lock explicitly, because caller of cleanup_receiver_thread() must already locked
1211  terminate_sender_thread(peer_assoc);
1212
1213  // delete the AssocData structure from the connection map
1214  // also frees allocated AssocData structure
1215  connmap.erase(peer_assoc);
1216
1217  // end critical section
1218
1219  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Cleanup receiver thread <" << thread_id << ">. Done.");
1220}
1221
1222
1223/* sends a stop message to the sender thread that belongs to the peer address given in assoc
1224 * @note terminate_receiver_thread() should be only called when an outer lock ensures that assoc
1225 *       is still valid, a lock is also required, because senderthread_queuemap is changed
1226 */
1227void 
1228TPoverTCP::terminate_sender_thread(const AssocData* assoc)
1229{
1230  if (assoc == 0)
1231  {
1232    Log(ERROR_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - assoc data == NULL");
1233    return;
1234  }
1235
1236  sender_thread_queuemap_t::iterator it= senderthread_queuemap.find(assoc->peer);
1237
1238  if (it != senderthread_queuemap.end())
1239  { // we have a sender thread: send a stop message to it
1240    FastQueue* destqueue= it->second; 
1241    if (destqueue)
1242    {
1243      TPoverTCPMsg* internalmsg= new TPoverTCPMsg(assoc,tpparam.source,TPoverTCPMsg::stop);
1244      if (internalmsg)
1245      {
1246        // send the internal message to the sender thread queue
1247        internalmsg->send(tpparam.source,destqueue);
1248      }
1249    }
1250    else
1251    {
1252      Log(WARNING_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - found entry for address, but no sender thread. addr:" << assoc->peer);
1253    }
1254    // erase entry from map
1255    senderthread_queuemap.erase(it);
1256  }
1257}
1258
1259/* terminate all active threads
1260 * note: locking should not be necessary here because this message is called as last method from
1261 * main_loop()
1262 */
1263void 
1264TPoverTCP::terminate_all_threads()
1265{
1266  AssocData* assoc= 0;
1267  receiver_thread_arg_t* terminate_argp;
1268
1269  for (recv_thread_argmap_t::iterator terminate_iterator=  recv_thread_argmap.begin();
1270       terminate_iterator !=  recv_thread_argmap.end();
1271       terminate_iterator++)
1272  {
1273    if ( (terminate_argp= terminate_iterator->second) != 0)
1274    {
1275      // we need a non const pointer to erase it later on
1276      assoc= const_cast<AssocData*>(terminate_argp->peer_assoc);
1277      // check whether thread is still alive
1278      if (terminate_argp->terminated == false)
1279      {
1280        terminate_argp->sig_terminate= true;
1281        // then wait for its termination
1282        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, 
1283            "Signaled receiver thread <" << terminate_iterator->first << "> for termination");
1284       
1285        pthread_join(terminate_iterator->first, 0);
1286       
1287        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Thread <" << terminate_iterator->first  << "> is terminated");
1288      }
1289      else
1290        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, 
1291            "Receiver thread <" << terminate_iterator->first << "> already terminated");
1292       
1293      // cleanup all remaining argument structures of terminated threads
1294      delete terminate_argp;
1295
1296      // terminate any related sender thread that is still running
1297      terminate_sender_thread(assoc);
1298     
1299      connmap.erase(assoc);
1300      // delete assoc is not necessary, because connmap.erase() will do the job
1301    }
1302  } // end for
1303}
1304
1305
1306/**
1307 * sender thread starter:
1308 * just a static starter method to allow starting the
1309 * actual sender_thread() method.
1310 *
1311 * @param argp - pointer to the current TPoverTCP object instance and receiver_thread_arg_t struct
1312 */
1313void*
1314TPoverTCP::sender_thread_starter(void *argp)
1315{
1316  sender_thread_start_arg_t *sargp= static_cast<sender_thread_start_arg_t *>(argp);
1317 
1318  //cout << "invoked sender_thread_Starter" << endl;
1319
1320  // invoke sender thread method
1321  if (sargp != 0 && sargp->instance != 0)
1322  {
1323    // call receiver_thread member function on object instance
1324    sargp->instance->sender_thread(sargp->sender_thread_queue);
1325
1326    //cout << "Before deletion of sarg" << endl;
1327
1328    // no longer needed
1329    delete sargp;
1330  }
1331  else
1332  {
1333    Log(ERROR_LOG,LOG_CRIT,"sender_thread_starter","while starting sender_thread: 0 pointer to arg or object");
1334  }
1335  return 0;
1336}
1337
1338
1339
1340
1341/**
1342 * receiver thread starter:
1343 * just a static starter method to allow starting the
1344 * actual receiver_thread() method.
1345 *
1346 * @param argp - pointer to the current TPoverTCP object instance and receiver_thread_arg_t struct
1347 */
1348void*
1349TPoverTCP::receiver_thread_starter(void *argp)
1350{
1351  receiver_thread_start_arg_t *rargp= static_cast<receiver_thread_start_arg_t *>(argp);
1352  // invoke receiver thread method
1353  if (rargp != 0 && rargp->instance != 0)
1354  {
1355    // call receiver_thread member function on object instance
1356    rargp->instance->receiver_thread(rargp->rtargp);
1357
1358    // no longer needed
1359    delete rargp;
1360  }
1361  else
1362  {
1363    Log(ERROR_LOG,LOG_CRIT,"receiver_thread_starter","while starting receiver_thread: 0 pointer to arg or object");
1364  }
1365  return 0;
1366}
1367
1368
1369void
1370TPoverTCP::create_new_sender_thread(FastQueue* senderfqueue)
1371{
1372  Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new sender thread...");
1373
1374  pthread_t senderthreadid;
1375  // create new thread; (arg == 0) is handled by thread, too
1376  int pthread_status= pthread_create(&senderthreadid, 
1377                                     NULL, // NULL: default attributes: thread is joinable and has a
1378                                     //       default, non-realtime scheduling policy
1379                                     TPoverTCP::sender_thread_starter,
1380                                     new sender_thread_start_arg_t(this,senderfqueue));
1381  if (pthread_status)
1382  {
1383    Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " <<  strerror(pthread_status));
1384   
1385    delete senderfqueue;
1386  }
1387}
1388
1389
1390void
1391TPoverTCP::create_new_receiver_thread(AssocData* peer_assoc)
1392{
1393  receiver_thread_arg_t* argp= 
1394    new(nothrow) receiver_thread_arg(peer_assoc);
1395 
1396  Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new receiver thread...");
1397
1398  // create new thread; (arg == 0) is handled by thread, too
1399  int pthread_status= pthread_create(&peer_assoc->thread_ID, 
1400                                     NULL, // NULL: default attributes: thread is joinable and has a
1401                                     //       default, non-realtime scheduling policy
1402                                     receiver_thread_starter,
1403                                     new(nothrow) receiver_thread_start_arg_t(this,argp));
1404  if (pthread_status)
1405  {
1406    Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " <<  strerror(pthread_status));
1407   
1408    delete argp;
1409  }
1410  else
1411  {
1412    lock(); // install_cleanup_thread_lock(TPoverTCP, this);
1413
1414    // remember pointer to thread arg structure
1415    // thread arg structure should be destroyed after thread termination only
1416    pair<recv_thread_argmap_t::iterator, bool> tmpinsiterator=
1417      recv_thread_argmap.insert( pair<pthread_t,receiver_thread_arg_t*> (peer_assoc->thread_ID,argp) );
1418    if (tmpinsiterator.second == false)
1419    {
1420      Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Thread argument could not be inserted into hashmap");
1421    }
1422    unlock(); // uninstall_cleanup(1);
1423  } 
1424}
1425
1426
1427/**
1428 * master listener thread starter:
1429 * just a static starter method to allow starting the
1430 * actual master_listener_thread() method.
1431 *
1432 * @param argp - pointer to the current TPoverTCP object instance
1433 */
1434void*
1435TPoverTCP::master_listener_thread_starter(void *argp)
1436{
1437  // invoke listener thread method
1438  if (argp != 0)
1439  {
1440    (static_cast<TPoverTCP*>(argp))->master_listener_thread();
1441  }
1442  return 0;
1443}
1444
1445
1446
1447/**
1448 * master listener thread: waits for incoming connections at the well-known tcp port
1449 * when a connection request is received this thread spawns a receiver_thread for
1450 * receiving packets from the peer at the new socket.
1451 */
1452void
1453TPoverTCP::master_listener_thread()
1454{
1455  // create a new address-structure for the listening masterthread
1456  struct sockaddr_in6 own_address_v6;
1457  own_address_v6.sin6_family = AF_INET6;
1458  own_address_v6.sin6_flowinfo= 0;
1459  own_address_v6.sin6_port = htons(tpparam.port); // use port number in param structure
1460  // accept incoming connections on all interfaces
1461  own_address_v6.sin6_addr = in6addr_any;
1462  own_address_v6.sin6_scope_id= 0;
1463
1464  // create a new address-structure for the listening masterthread
1465  struct sockaddr_in own_address_v4;
1466  own_address_v4.sin_family = AF_INET;
1467  own_address_v4.sin_port = htons(tpparam.port); // use port number in param structure
1468  // accept incoming connections on all interfaces
1469  own_address_v4.sin_addr.s_addr = INADDR_ANY;
1470 
1471  // create a listening socket
1472  int master_listener_socket= socket( AF_INET6, SOCK_STREAM, IPPROTO_TCP);
1473  if (master_listener_socket!=-1) v4_mode = false;
1474  if (master_listener_socket == -1) {
1475          master_listener_socket= socket( AF_INET, SOCK_STREAM, IPPROTO_TCP);
1476          if (master_listener_socket!=-1) v4_mode = true;
1477  }
1478  if (master_listener_socket == -1)
1479  {
1480    Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Could not create a new socket, error: " << strerror(errno));
1481    return;
1482  }
1483 
1484  // Disable Nagle Algorithm, set (TCP_NODELAY)
1485  int nodelayflag= 1;
1486  int status= setsockopt(master_listener_socket,
1487                         IPPROTO_TCP,
1488                         TCP_NODELAY,
1489                         &nodelayflag,
1490                         sizeof(nodelayflag));
1491  if (status)
1492  {
1493    Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
1494  }
1495 
1496  // Reuse ports, even if they are busy
1497  int socketreuseflag= 1;
1498  status= setsockopt(master_listener_socket,
1499                           SOL_SOCKET,
1500                           SO_REUSEADDR,
1501                           (const char *) &socketreuseflag,
1502                           sizeof(socketreuseflag));
1503  if (status)
1504  {
1505       Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
1506  }
1507 
1508 
1509  // bind the newly created socket to a specific address
1510  int bind_status = bind(master_listener_socket, v4_mode ?
1511                         reinterpret_cast<struct sockaddr *>(&own_address_v4) :
1512                         reinterpret_cast<struct sockaddr *>(&own_address_v6),
1513                                         v4_mode ? sizeof(own_address_v4) : sizeof(own_address_v6));
1514  if (bind_status)
1515    { 
1516      Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Binding to " 
1517          << (v4_mode ? inet_ntop(AF_INET, &own_address_v4.sin_addr, in_addrstr, INET_ADDRSTRLEN) :
1518                          inet_ntop(AF_INET6, &own_address_v6.sin6_addr, in6_addrstr, INET6_ADDRSTRLEN))
1519          << " port " << tpparam.port << " failed, error: " << strerror(errno));
1520      return;
1521    }
1522
1523    // listen at the socket,
1524    // queuesize for pending connections= max_listen_queue_size
1525    int listen_status = listen(master_listener_socket, max_listen_queue_size);
1526    if (listen_status)
1527    {
1528      Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Listen at socket " << master_listener_socket
1529          << " failed, error: " << strerror(errno));
1530      return;
1531    }
1532    else
1533    {
1534      Log(INFO_LOG,LOG_NORMAL, tpparam.name, color[green] << "Listening at port #" << tpparam.port << color[off]);
1535    }
1536
1537    // activate O_NON_BLOCK for accept (accept does not block)
1538    fcntl(master_listener_socket,F_SETFL, O_NONBLOCK);
1539
1540    // create a pollfd struct for use in the mainloop
1541    struct pollfd poll_fd;
1542    poll_fd.fd = master_listener_socket;
1543    poll_fd.events = POLLIN | POLLPRI; 
1544    poll_fd.revents = 0;
1545    /*
1546      #define POLLIN    0x001   // There is data to read.
1547      #define POLLPRI   0x002   // There is urgent data to read. 
1548      #define POLLOUT   0x004   // Writing now will not block. 
1549    */
1550   
1551    bool terminate = false;
1552    // check for thread terminate condition using get_state()
1553    state_t currstate= get_state();
1554    int poll_status= 0;
1555    const unsigned int number_poll_sockets= 1; 
1556    struct sockaddr_in6 peer_address;
1557    socklen_t peer_address_len;
1558    int conn_socket;
1559
1560    // check whether this thread is signaled for termination
1561    while(! (terminate= (currstate==STATE_ABORT || currstate==STATE_STOP) ) )
1562    {
1563      // wait on number_poll_sockets (main drm socket)
1564      // for the events specified above for sleep_time (in ms)
1565      poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
1566      if (poll_fd.revents & POLLERR) // Error condition
1567      {
1568        if (errno != EINTR) 
1569        {
1570          Log(ERROR_LOG,LOG_CRIT, tpparam.name, 
1571              "Poll caused error " << strerror(errno) << " - indicated by revents");
1572        }
1573        else
1574        {
1575          Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "poll(): " << strerror(errno));
1576        }
1577
1578      }
1579      if (poll_fd.revents & POLLHUP) // Hung up
1580      {
1581        Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll hung up");
1582        return;
1583      }
1584      if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
1585      {
1586        Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll Invalid request: fd not open");
1587        return;
1588      }
1589     
1590      switch (poll_status)
1591      {
1592        case -1:
1593          if (errno != EINTR)
1594          {
1595            Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll status indicates error: " << strerror(errno));
1596          }
1597          else
1598          {
1599            Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Poll status: " << strerror(errno));
1600          }
1601           
1602          break;
1603
1604        case 0:
1605#ifdef DEBUG_HARD
1606          Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, 
1607              "Listen Thread - Poll timed out after " << tpparam.sleep_time << " ms.");
1608#endif
1609          currstate= get_state();
1610          continue;
1611          break;
1612
1613        default:
1614#ifdef DEBUG_HARD
1615          Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
1616#endif
1617          break;
1618      } // end switch
1619
1620      // after a successful accept call,
1621      // accept stores the address information of the connecting party
1622      // in peer_address and the size of its address in addrlen
1623      peer_address_len= sizeof(peer_address);
1624      conn_socket = accept (master_listener_socket,
1625                            reinterpret_cast<struct sockaddr *>(&peer_address),
1626                            &peer_address_len);
1627      if (conn_socket == -1)
1628      {
1629        if (errno != EWOULDBLOCK && errno != EAGAIN)
1630        {
1631          Log(ERROR_LOG,LOG_EMERG, tpparam.name, "Accept at socket " << master_listener_socket
1632              << " failed, error: " << strerror(errno));
1633          return;
1634        }
1635      }
1636      else
1637      {
1638        // create a new assocdata-object for the new thread
1639        AssocData* peer_assoc = NULL;
1640        appladdress addr(peer_address, IPPROTO_TCP);
1641
1642        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "<<--Received connect--<< request from " << addr.get_ip_str() 
1643            << " port #" << addr.get_port());
1644
1645        struct sockaddr_in6 own_address;
1646        if (v4_mode) {
1647                struct sockaddr_in own_address_v4;
1648                socklen_t own_address_len_v4 = sizeof(own_address_v4);
1649                getsockname(conn_socket, reinterpret_cast<struct sockaddr*>(&own_address_v4), &own_address_len_v4);
1650                v4_to_v6(&own_address_v4, &own_address);
1651        } else {
1652                socklen_t own_address_len= sizeof(own_address);
1653                getsockname(conn_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
1654        }
1655
1656        // AssocData will copy addr content into its own structure
1657        // allocated peer_assoc will be stored in connmap
1658        peer_assoc = new(nothrow) AssocData(conn_socket, addr, appladdress(own_address,IPPROTO_TCP));
1659
1660        bool insert_success= false;
1661        if (peer_assoc)
1662        {
1663          // start critical section
1664          lock(); // install_cleanup_thread_lock(TPoverTCP, this);
1665          insert_success= connmap.insert(peer_assoc);
1666          // end critical section
1667          unlock(); // uninstall_cleanup(1);
1668        }
1669       
1670       
1671        if (insert_success == false) // not inserted into connmap
1672        {
1673          Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot insert AssocData for socket " << conn_socket
1674              << ", " << addr.get_ip_str() << ", port #" 
1675              << addr.get_port() << " into connection map, aborting connection...");
1676
1677          // abort connection, delete its AssocData
1678          close (conn_socket);
1679          if (peer_assoc) 
1680          { 
1681            delete peer_assoc;
1682            peer_assoc= 0;
1683          }
1684          return;
1685               
1686        } //end __else(connmap.insert());__
1687       
1688        // create a new thread for each new connection
1689        create_new_receiver_thread(peer_assoc);
1690      } // end __else (connsocket)__
1691     
1692      // get new thread state
1693      currstate= get_state();
1694
1695    } // end while(!terminate)
1696    return;
1697} // end listen_for_connections()   
1698
1699
1700TPoverTCP::~TPoverTCP()
1701{
1702  init= false;
1703
1704  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,  "Destructor called");
1705
1706  QueueManager::instance()->unregister_queue(tpparam.source);
1707}
1708
1709/** TPoverTCP Thread main loop.
1710 * This loop checks for internal messages of either
1711 * a send() call to start a new receiver thread, or,
1712 * of a receiver_thread() that signals its own termination
1713 * for proper cleanup of control structures.
1714 * It also handles the following internal TPoverTCPMsg types:
1715 * - TPoverTCPMsg::stop - a particular receiver thread is terminated
1716 * - TPoverTCPMsg::start - a particular receiver thread is started
1717 * @param nr number of current thread instance
1718 */
1719void 
1720TPoverTCP::main_loop(uint32 nr)
1721{
1722
1723  // get internal queue for messages from receiver_thread
1724  FastQueue* fq = get_fqueue();
1725  if (!fq) 
1726  {
1727    Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot find message queue");
1728    return;
1729  } // end if not fq
1730  // register queue for receiving internal messages from other modules
1731  QueueManager::instance()->register_queue(fq,tpparam.source);
1732
1733  // start master listener thread
1734  pthread_t master_listener_thread_ID;
1735  int pthread_status= pthread_create(&master_listener_thread_ID, 
1736                                     NULL, // NULL: default attributes: thread is joinable and has a
1737                                     //       default, non-realtime scheduling policy
1738                                     master_listener_thread_starter,
1739                                     this);
1740  if (pthread_status)
1741  {
1742    Log(ERROR_LOG,LOG_CRIT, tpparam.name, 
1743        "New master listener thread could not be created: " <<  strerror(pthread_status));
1744  }
1745  else
1746    Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Master listener thread started");
1747
1748
1749  // define max latency for thread reaction on termination/stop signal
1750  timespec wait_interval= { 0, 250000000L }; // 250ms
1751  message* internal_thread_msg = NULL;
1752  state_t currstate= get_state();
1753
1754  // check whether this thread is signaled for termination
1755  while( currstate!=STATE_ABORT && currstate!=STATE_STOP ) 
1756  {
1757    // poll internal message queue (blocking)
1758    if ( (internal_thread_msg= fq->dequeue_timedwait(wait_interval)) != 0 )
1759    {
1760      TPoverTCPMsg* internalmsg= dynamic_cast<TPoverTCPMsg*>(internal_thread_msg);
1761      if (internalmsg)
1762      {
1763        if (internalmsg->get_msgtype() == TPoverTCPMsg::stop)
1764        {
1765          // a receiver thread terminated and signaled for cleanup by master thread
1766          AssocData* assocd= const_cast<AssocData*>(internalmsg->get_peer_assoc());
1767          Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Got cleanup request for thread <" << assocd->thread_ID <<'>');
1768          lock();
1769          cleanup_receiver_thread( assocd );
1770          unlock();
1771        }
1772        else
1773        if (internalmsg->get_msgtype() == TPoverTCPMsg::start)
1774        {
1775          // start a new receiver thread
1776          create_new_receiver_thread( const_cast<AssocData*>(internalmsg->get_peer_assoc()) );
1777        }
1778        else
1779          Log(ERROR_LOG,LOG_CRIT, tpparam.name, "unexpected internal message:" << internalmsg->get_msgtype());
1780         
1781        delete internalmsg;
1782      }
1783      else
1784      {
1785        Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Dynamic_cast failed - received unexpected and unknown internal message source "
1786            << internal_thread_msg->get_source());
1787      }
1788    } // endif
1789
1790    // get thread state
1791    currstate= get_state();
1792  } // end while
1793
1794  if (currstate==STATE_STOP)
1795  {
1796    // start abort actions
1797    Log(INFO_LOG,LOG_NORMAL, tpparam.name, "Asked to abort, stopping all receiver threads");
1798  } // end if stopped
1799
1800  // do not accept any more messages
1801  fq->shutdown();
1802  // terminate all receiver and sender threads that are still active
1803  terminate_all_threads();
1804}
1805
1806} // end namespace protlib
1807///@}
Note: See TracBrowser for help on using the repository browser.