An Overlay-based
Virtual Network Substrate
SpoVNet

source: trash/old-modules/transport/protlib/tp_over_uds.cpp @ 5641

Last change on this file since 5641 was 5641, checked in by Christoph Mayer, 14 years ago
File size: 50.7 KB
Line 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file tp_over_uds.cpp
3/// transport module for unix domain socket communication
4/// ----------------------------------------------------------
5/// $Id: tp_over_uds.cpp 2872 2008-02-18 10:58:03Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/src/tp_over_uds.cpp $
7// ===========================================================
8//                     
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//                     
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29 
30extern "C"
31{
32  //#define _SOCKADDR_LEN    /* use BSD 4.4 sockets */
33#include <unistd.h>      /* gethostname */
34#include <sys/types.h>   /* network socket interface */
35#include <netinet/in.h>  /* network socket interface */
36#include <netinet/tcp.h> /* for TCP Socket Option */
37#include <sys/socket.h>
38#include <arpa/inet.h>   /* inet_addr */
39
40#include <fcntl.h>
41#include <sys/poll.h>
42#include <sys/un.h>
43}
44
45#include <iostream>
46#include <errno.h>
47#include <string>
48#include <sstream>
49
50#include "tp_over_uds.h"
51#include "threadsafe_db.h"
52#include "cleanuphandler.h"
53#include "setuid.h"
54#include "queuemanager.h"
55#include "logfile.h"
56
57#include <set>
58
59#define UDS_SUCCESS 0
60#define UDS_SEND_FAILURE 1
61
62const unsigned int max_listen_queue_size= 10;
63
64namespace protlib {
65
66using namespace log;
67
68/** @defgroup tpuds TP over UDS
69 * @ingroup network
70 * @{
71 */
72
73
74/******* class TPoverUDS *******/
75
76
77/** get_connection_to() checks for already existing connections.
78 *  If a connection exists, it returns "AssocData"
79 *  and saves it in "connmap" for further use
80 *  If no connection exists, a new connection to "addr"
81 *  is created.
82 */
83AssocDataUDS* 
84TPoverUDS::get_connection_to(udsaddress& addr)
85{
86  // get timeout
87  struct timespec ts;
88  get_time_of_day(ts);
89  ts.tv_nsec+= tpparam.sleep_time * 1000000L;
90  if (ts.tv_nsec>=1000000000L)
91  {
92    ts.tv_sec += ts.tv_nsec / 1000000000L;
93    ts.tv_nsec= ts.tv_nsec % 1000000000L;
94  }
95
96  // create a new AssocData pointer, initialize it to NULL
97  AssocDataUDS* assoc= NULL;
98  int new_socket;
99  // loop until timeout is exceeded: TODO: check applicability of loop
100  do 
101  {
102    // check for existing connections to addr
103    // start critical section
104    lock(); // install_cleanup_thread_lock(TPoverUDS, this);
105    assoc= connmap.lookup(addr);
106    // end critical section
107    unlock(); // uninstall_cleanup(1);
108    if (assoc) 
109    {
110      // If not shut down then use it, else abort, wait and check again.
111      if (!assoc->shutdown) 
112      {
113        return assoc;
114      }
115      else
116      {
117        // TODO: connection is already in shutdown mode. What now?
118        Log(ERROR_LOG,LOG_CRIT,tpparam.name,"socket exists, but is already in mode shutdown");
119
120        return 0;
121      } 
122    } //end __if (assoc)__
123    else 
124    {
125      Log(DEBUG_LOG,LOG_UNIMP,tpparam.name,"No existing connection to " 
126          << addr << " found, creating a new one.");
127    }
128
129    // no connection found, create a new one
130    new_socket = socket(AF_UNIX, SOCK_STREAM, 0);
131    if (new_socket == -1)
132    {
133      Log(ERROR_LOG,LOG_CRIT,tpparam.name,"Couldn't create a new socket: " << strerror(errno));
134
135      return 0;
136    }
137   
138    // Reuse ports, even if they are busy
139    int socketreuseflag= 1;
140    int status= setsockopt(new_socket,
141                           SOL_SOCKET,
142                           SO_REUSEADDR,
143                           (const char *) &socketreuseflag,
144                           sizeof(socketreuseflag));
145    if (status)
146    {
147        Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
148    }
149
150    struct sockaddr_un dest_address;
151    dest_address.sun_family= AF_UNIX;
152    strcpy(dest_address.sun_path, addr.get_udssocket().c_str());
153     
154    // connect the socket to the destination address
155    int connect_status = connect(new_socket,
156                                 reinterpret_cast<const struct sockaddr*>(&dest_address),
157                                 sizeof(dest_address));
158
159    // connects to the listening_port of the peer's masterthread   
160    if (connect_status != 0)
161    {
162        Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"Connect to " << addr.get_udssocket() << "failed: [" << color[red] << strerror(errno) << color[off] << "]");
163     
164      throw TPErrorConnectSetupFail();
165    }
166
167   
168    //struct sockaddr_un own_address;
169    //socklen_t own_address_len= sizeof(own_address);
170    //getsockname(new_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
171
172    Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,">>--Connect-->> to " << addr << " from " << udsaddress(new_socket));
173   
174    // create new AssocData record (will copy addr)
175    assoc = new(nothrow) AssocDataUDS(new_socket, addr, udsaddress(new_socket));
176
177    // if assoc could be successfully created, insert it into ConnectionMap
178    if (assoc) 
179    {
180      bool insert_success= false;
181      // start critical section
182      lock(); // install_cleanup_thread_lock(TPoverUDS, this);
183      // insert AssocData into connection map
184      insert_success= connmap.insert(assoc);
185      // end critical section
186      unlock(); // uninstall_cleanup(1);
187
188      if (insert_success == true) 
189      {
190#ifdef _DEBUG
191        Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Connected to " << addr << " via socket " << new_socket);
192
193               
194#endif
195
196        // notify master thread to start a receiver thread (i.e. send selfmessage)
197        TPoverUDSMsg* newmsg= new(nothrow)TPoverUDSMsg(assoc, tpparam.source, TPoverUDSMsg::start);
198        if (newmsg)
199        {
200          newmsg->send_to(tpparam.source);
201          return assoc;
202        }
203        else
204          Log(ERROR_LOG,LOG_CRIT,tpparam.name,"get_connection_to: could not get memory for internal msg");
205      } 
206      else 
207      {
208        // delete data and abort
209        Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot insert AssocData for socket " << new_socket << ", "<< addr << " into connection map, aborting connection");
210               
211        // abort connection, delete its AssocData
212        close (new_socket);
213        if (assoc) 
214        { 
215          delete assoc; 
216          assoc= 0;
217        }
218        return assoc;
219      } // end else connmap.insert                     
220     
221    } // end "if (assoc)"
222  } 
223  while (wait_cond(ts)!=ETIMEDOUT);
224
225  return assoc;
226} //end get_connection_to
227
228
229/** terminates a signaling association/connection
230 *  - if no connection exists, generate a warning
231 *  - otherwise: generate internal msg to related receiver thread
232 */
233void
234TPoverUDS::terminate(const address& in_addr)
235{
236
237    udsaddress* addr = NULL;
238    addr = dynamic_cast<udsaddress*>(in_addr.copy());
239
240#ifndef _NO_LOGGING
241  const char *const thisproc="terminate()   - ";
242#endif
243
244  // Create a new AssocData-pointer
245  AssocDataUDS* assoc = NULL;
246
247  // check for existing connections to addr
248
249  // start critical section:
250  // please note if receiver_thread terminated already, the assoc data is not
251  // available anymore therefore we need a lock around cleanup_receiver_thread()
252
253  lock(); // install_cleanup_thread_lock(TPoverUDS, this);
254  assoc= connmap.lookup(*addr);
255  if (assoc) 
256  {
257    Log(EVENT_LOG,LOG_NORMAL,tpparam.name,thisproc<<"got request to shutdown connection for peer " << addr);
258    // If not shut down then use it, else abort, wait and check again.
259    if (!assoc->shutdown) 
260    {
261      if (assoc->socketfd)
262      {
263        // disallow sending
264        if (shutdown(assoc->socketfd,SHUT_WR))
265        {
266          Log(ERROR_LOG,LOG_UNIMP,tpparam.name,thisproc<<"shutdown (write) on socket for peer " << addr << " returned error:" << strerror(errno));
267        }
268        else
269        {
270          Log(EVENT_LOG,LOG_NORMAL,tpparam.name,thisproc<<"initiated closing of connection for peer " << addr << ". Shutdown (write) on socket "<< assoc->socketfd );     
271        }
272      }
273      assoc->shutdown= true;
274    }
275    else
276      Log(EVENT_LOG,LOG_NORMAL,tpparam.name,thisproc<<"connection for peer " << addr << " is already in mode shutdown");
277     
278  }
279  else
280    Log(WARNING_LOG,LOG_NORMAL,tpparam.name,thisproc<<"could not find a connection for peer " << addr);
281
282  stop_receiver_thread(assoc);
283
284  // end critical section
285  unlock(); // uninstall_cleanup(1);
286
287  if (addr) delete addr;
288
289}
290
291
292/** generates and internal TPoverUDS message to send a NetMsg to the network
293 *
294 *  - it is necessary to let a thread do this, because the caller
295 *     may get blocked if the connect() or send() call hangs for a while
296 *  - the sending thread will call TPoverUDS::udssend()
297 *  - if no connection exists, creates a new one
298 *
299 *  @note the netmsg is deleted by the send() method when it is not used anymore
300 */
301void
302TPoverUDS::send(NetMsg* netmsg, const address& in_addr, bool use_existing_connection)
303{
304    if (netmsg == NULL) {
305      ERRCLog(tpparam.name,"send() - called without valid NetMsg buffer (NULL)");
306      return;
307    }
308
309    udsaddress* addr = NULL;
310    addr = dynamic_cast<udsaddress*>(in_addr.copy());
311
312    if (!addr)
313    {
314      ERRCLog(tpparam.name,"send() - given destination address is not of expected type (udsaddress), has type: " << (int) in_addr.get_type());
315      return;
316    }
317
318
319  // lock due to sendermap access
320  lock();
321
322
323  // check for existing sender thread
324  sender_thread_queuemap_t::const_iterator it= senderthread_queuemap.find(*addr);
325
326  FastQueue* destqueue= 0;
327
328  if (it == senderthread_queuemap.end())
329  { // no sender thread so far
330
331    if (!use_existing_connection)
332    { // it is allowed to create a new connection for this thread
333
334      // create a new queue for sender thread
335      FastQueue* sender_thread_queue= new FastQueue;
336      create_new_sender_thread(sender_thread_queue);
337      // remember queue for later use
338   
339      //pair<sender_thread_queuemap_t::iterator, bool> tmpinsiterator=
340      senderthread_queuemap.insert( pair<udsaddress,FastQueue*> (*addr,sender_thread_queue) );
341
342      destqueue= sender_thread_queue;
343    }
344  }
345  else
346  { // we have a sender thread, use stored queue from map
347    destqueue= it->second; 
348  }
349
350  unlock();
351   
352  // send a send_data message to it (if we have a destination queue)
353  if (destqueue)
354  {
355    // both parameters will be freed after message was sent!
356   
357      //DLog(tpparam.name, "Sending self-message for socket " << addr->get_udssocket().c_str());
358
359      TPoverUDSMsg* internalmsg= new TPoverUDSMsg(netmsg,addr->copy());
360      if (internalmsg)
361      {
362          //DLog(tpparam.name, "Address in internal message: " << internalmsg->get_appladdr()->get_udssocket());
363          // send the internal message to the sender thread queue
364          internalmsg->send(tpparam.source,destqueue);
365      }
366  }
367  else
368  {
369    if (!use_existing_connection)
370      WLog(tpparam.name,"send() - found entry for address, but no active sender thread available for peer addr:" << *addr << " - dropping data");
371    else
372      DLog(tpparam.name,"no active sender thread found " << *addr << " - but policy forbids to set up a new connection, will drop data");
373     
374    // cannot send data, so we must drop it
375    delete netmsg;
376  }
377
378  if (addr) delete addr;
379}
380
381/** sends a NetMsg to the network.
382 *
383 * @param netmsg - message to send
384 * @param addr   - transport endpoint address
385 *
386 * @note if no connection exists, creates a new one
387 * @note both parameters are deleted after the message was sent
388 */
389void
390TPoverUDS::udssend(NetMsg* netmsg, udsaddress* addr)
391{                       
392#ifndef _NO_LOGGING
393  const char *const thisproc="sender   - ";
394#endif
395
396  // set result initially to success, set it to failure
397  // in places where these failures occur
398  int result = UDS_SUCCESS;
399  int ret= 0;
400
401  // Create a new AssocData-pointer
402  AssocDataUDS* assoc = NULL;
403 
404  // tp.cpp checks for initialisation of tp and correctness of
405  // msgsize, protocol and ip,
406  // throws an error if something is not right
407  if (addr) {
408      if (!(addr->get_udssocket().size() || addr->get_socknum())) {
409          ERRCLog(tpparam.name, "No UNIX Domain Socket Path / Socket Number in address, will not process it: " << addr->get_udssocket());
410          return;
411      }
412  } else {
413      Log(ERROR_LOG,LOG_CRIT, tpparam.name, thisproc << "address pointer is NULL");
414      result= UDS_SEND_FAILURE;   
415      throw TPErrorInternal();
416  }
417   
418
419
420  // check for existing connections,
421  // if a connection exists, return its AssocData
422  // and save it in assoc for further use
423  // if no connection exists, create a new one (in get_connection_to()).
424  // Connection is inserted into connection map that must be done
425  // with exclusive access
426  assoc= get_connection_to(*addr);
427
428  if (assoc==NULL || assoc->socketfd<=0)
429  {
430    Log(ERROR_LOG,LOG_CRIT, tpparam.name, thisproc << "no valid assoc/socket data");
431
432    delete netmsg;
433    delete addr;
434    return;
435  }
436  if (assoc->shutdown)
437  {
438    Log(WARNING_LOG, LOG_ALERT, tpparam.name, thisproc << "should send message although connection already half closed");
439    delete netmsg;
440    delete addr;
441
442    throw TPErrorSendFailed();
443  }
444
445  uint32 msgsize= netmsg->get_size();
446#ifdef DEBUG_HARD
447  cerr << thisproc << "message size=" << netmsg->get_size() << endl;
448#endif
449
450  // send all the data contained in netmsg to the socket
451  // which belongs to the address "addr"
452  for (uint32 bytes_sent= 0;
453       bytes_sent < msgsize;
454       bytes_sent+= ret)
455  {
456
457#ifdef _DEBUG_HARD
458    for (uint32 i=0;i<msgsize;i++)
459    {
460      cout << "send_buf: " << i << " : "; 
461      if ( isalnum(*(netmsg->get_buffer()+i)) )
462        cout << "'" << *(netmsg->get_buffer()+i) << "' (0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec << ")" ;
463      else
464        cout << "0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec;
465      cout << endl;
466    }
467
468    cout << endl;
469    cout << "bytes_sent: " << bytes_sent << endl;
470    cout << "Message size: " <<  msgsize << endl;
471    cout << "Send-Socket: " << assoc->socketfd << endl;
472    cout << "pointer-Offset. " << netmsg->get_pos() << endl;
473    cout << "vor send " << endl;
474#endif
475
476    // socket send
477    ret= ::send(assoc->socketfd, 
478                netmsg->get_buffer() + bytes_sent, 
479                msgsize - bytes_sent, 
480                MSG_NOSIGNAL);
481
482    //send_buf + bytes_sent
483
484    if (debug_pdu)
485    {
486      ostringstream hexdump;
487      netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_sent);
488      Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"PDU debugging enabled - Sent:" << hexdump.str());
489    }
490
491    if (ret < 0) 
492    {
493      result= UDS_SEND_FAILURE;
494      break;
495    } // end if (ret < 0)
496  } // end for
497
498
499  // *** note: netmsg is deleted here ***
500  delete netmsg;
501 
502  // Throwing an exception within a critical section does not
503  // unlock the mutex.
504
505  if (result != UDS_SUCCESS)
506  {
507    Log(ERROR_LOG,LOG_NORMAL, tpparam.name, thisproc << "Unix Domain Socket error, returns " << ret << ", error : " << strerror(errno));
508    delete addr;
509
510    throw TPErrorSendFailed();
511   
512    // There is no special errorcode for sending failed
513    // in tp.h, there are only these:
514    /*
515      ERROR_BAD_ADDRESS,
516      ERROR_BAD_NETMSG,
517      ERROR_NOT_INIT,
518      ERROR_UNREACHABLE,
519      ERROR_INTERNAL,
520      ERROR_PAYLOAD
521    */
522  }
523  else
524    Log(EVENT_LOG,LOG_NORMAL,tpparam.name, thisproc << ">>----Sent---->> message (" << msgsize << " bytes) using socket " << assoc->socketfd  << " to " << *addr);
525
526  if (!assoc) {
527    // no connection
528   
529    Log(ERROR_LOG,LOG_NORMAL, tpparam.name, thisproc << "cannot get connection to " << *addr);
530
531    delete addr;
532
533    throw TPErrorUnreachable(); // should be no assoc found
534  } // end "if (!assoc)"
535 
536  // *** delete address ***
537  delete addr;
538}
539 
540/* this thread waits for an internal message that either:
541 * - requests transmission of a packet
542 * - requests to stop this thread
543 * @param argp points to the thread queue for internal messages
544 */
545void 
546TPoverUDS::sender_thread(void *argp)
547{
548#ifndef _NO_LOGGING
549  const char *const methodname="senderthread - ";
550#endif
551
552  message* internal_thread_msg = NULL;
553
554  Log(EVENT_LOG,LOG_NORMAL, tpparam.name, methodname << "starting as thread <" << pthread_self() << ">");
555
556  FastQueue* fq= reinterpret_cast<FastQueue*>(argp);
557  if (!fq)
558  {
559    Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "thread <" << pthread_self() << "> no valid pointer to msg queue. Stop.");
560    return;
561  }
562
563  bool terminate= false;
564  TPoverUDSMsg* internalmsg= 0;
565  while (terminate==false && (internal_thread_msg= fq->dequeue()) != 0 )
566  {
567    internalmsg= dynamic_cast<TPoverUDSMsg*>(internal_thread_msg);
568   
569    if (internalmsg == 0)
570    {
571      Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "received not an TPoverUDSMsg but a" << internal_thread_msg->get_type_name());     
572    }
573    else
574    if (internalmsg->get_msgtype() == TPoverUDSMsg::send_data)
575    {
576     
577        //DLog(tpparam.name, "Got a send request for socket " << *(internalmsg->get_appladdr()));
578
579      // create a connection if none exists and send the netmsg
580      if (internalmsg->get_netmsg() && internalmsg->get_udsaddr())
581      {
582        udssend(internalmsg->get_netmsg(),internalmsg->get_udsaddr());
583      }
584      else
585      {
586        Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "problem with passed arguments references, they point to 0");
587      } 
588    }
589    else
590    if (internalmsg->get_msgtype() == TPoverUDSMsg::stop)
591    {
592      terminate= true;
593    } 
594  } // end while
595 
596  Log(EVENT_LOG,LOG_NORMAL, tpparam.name, methodname << "<" << pthread_self() << "> terminated connection.");
597}
598
599
/** receiver thread listens at a Unix domain socket for incoming PDUs
 *  and passes complete PDUs to the coordinator. Incomplete
 *  PDUs due to aborted connections or buffer overflows are discarded.
 *  @param argp - assoc data and flags sig_terminate and terminated
 * 
 *  @note this is a static member function, so you cannot use class variables
 */
607void 
608TPoverUDS::receiver_thread(void *argp)
609{
610#ifndef _NO_LOGGING
611  const char *const methodname="receiver - ";
612#endif
613  receiver_thread_arg_t *receiver_thread_argp= static_cast<receiver_thread_arg_t *>(argp);
614  const udsaddress* peer_addr = NULL;
615  const udsaddress* own_addr = NULL;
616  uint32 bytes_received = 0;
617  TPMsg* tpmsg= NULL;
618 
619  // argument parsing - start
620  if (receiver_thread_argp == 0)
621  {
622    Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "No arguments given at start of receiver thread <" << pthread_self() << ">, exiting.");
623
624    return;
625  }
626  else
627  {
628    // change status to running, i.e., not terminated
629    receiver_thread_argp->terminated= false;
630
631#ifdef _DEBUG
632    Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "New receiver thread <" << pthread_self() << "> started. ");
633#endif
634  }
635
636  int conn_socket= 0;
637  if (receiver_thread_argp->peer_assoc)
638  {
639    // get socket descriptor from arg
640    conn_socket = receiver_thread_argp->peer_assoc->socketfd;
641    // get pointer to peer address of socket (source/sender address of peer) from arg
642    peer_addr= &receiver_thread_argp->peer_assoc->peer;
643    own_addr= &receiver_thread_argp->peer_assoc->ownaddr;
644  }
645  else
646  {
647    Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "No peer assoc available - pointer is NULL");
648   
649    return;
650  }
651
652  if (peer_addr == 0)
653  {
654    Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "No peer address available for socket " << conn_socket << ", exiting.");
655   
656    return;
657  }
658  // argument parsing - end
659#ifdef _DEBUG
660  Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname <<
661      "Preparing to wait for data at socket " 
662      << conn_socket << " from " << receiver_thread_argp->peer_assoc->peer);
663#endif
664
665  int ret= 0;
666  uint32 msgcontentlength= 0;
667  bool msgcontentlength_known= false;
668  bool pdu_complete= false; // when to terminate inner loop
669
670  /* maybe use this to create a new pdu,
671    /// constructor
672    contextlistpdu(type_t t, subtype_t st, uint32 fc, uint32 numobj = 0);
673  */ 
674
675  // activate O_NON_BLOCK  for recv on socket conn_socket
676  fcntl(conn_socket,F_SETFL, O_NONBLOCK);
677   
678  // set options for polling
679  const unsigned int number_poll_sockets= 1; 
680  struct pollfd poll_fd;
681  // have to set structure before poll call
682  poll_fd.fd = conn_socket;
683  poll_fd.events = POLLIN | POLLPRI; 
684
685  int poll_status;
686  bool recv_error= false;
687
688  NetMsg* netmsg= 0;
689  NetMsg* remainbuf= 0;
690  size_t buffer_bytes_left= 0;
691  size_t trailingbytes= 0;
692  bool skiprecv= false;
693  // loop until we receive a terminate signal (read-only var for this thread)
694  // or get an error from socket read
695  while( receiver_thread_argp->sig_terminate == false )
696  {
697    // Read next PDU from socket or process trailing bytes in remainbuf
698    ret= 0;
699    msgcontentlength= 0;
700    msgcontentlength_known= false;
701    pdu_complete= false;
702    netmsg= 0;
703
704    // there are trailing bytes left from the last receive call
705    if (remainbuf != 0)
706    {
707      netmsg= remainbuf;
708      remainbuf= 0;
709      buffer_bytes_left= netmsg->get_size()-trailingbytes;
710      bytes_received= trailingbytes;
711      trailingbytes= 0;
712      skiprecv= true;
713    }
714    else // no trailing bytes, create a new buffer
715    if ( (netmsg= new NetMsg(NetMsg::max_size)) != 0 )
716    {
717      buffer_bytes_left= netmsg->get_size();
718      bytes_received= 0;
719      skiprecv= false;
720    }
721    else
722    { // buffer allocation failed
723      bytes_received= 0;
724      buffer_bytes_left= 0;
725      recv_error= true;
726    }
727   
728    // loops until PDU is complete
729    // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
730    while (!pdu_complete && 
731           !recv_error && 
732           !receiver_thread_argp->sig_terminate)
733    {
734      if (!skiprecv)
735      {
736        // read from TCP socket or return after sleep_time
737        poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
738       
739        if (receiver_thread_argp->sig_terminate)
740        {
741          Log(EVENT_LOG,LOG_UNIMP,tpparam.name,methodname << "Thread <" << pthread_self() << "> found terminate signal after poll");
742          // disallow sending
743          AssocDataUDS* myassoc=const_cast<AssocDataUDS *>(receiver_thread_argp->peer_assoc);
744          if (myassoc->shutdown == false)
745          {
746            myassoc->shutdown= true;
747            if (shutdown(myassoc->socketfd,SHUT_WR))
748            {
749              if ( errno != ENOTCONN )
750                Log(ERROR_LOG,LOG_UNIMP,tpparam.name,methodname <<"shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
751            }
752          }
753          // try to read do a last read from the TCP socket or return after sleep_time
754          if (poll_status == 0)
755          {
756            poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
757          }
758        }
759
760        if (poll_fd.revents & POLLERR) // Error condition
761        {
762          if (errno == 0 || errno == EINTR)
763          {
764            Log(EVENT_LOG,LOG_NORMAL, tpparam.name, methodname << "poll(): " << strerror(errno));
765          }
766          else
767          {
768            Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Poll indicates error: " << strerror(errno));
769            recv_error= true;
770          }
771        }
772       
773        if (poll_fd.revents & POLLHUP) // Hung up
774        {
775          Log(EVENT_LOG,LOG_CRIT, tpparam.name, methodname << "Poll hung up");
776          recv_error= true;
777        }
778       
779        if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
780        {
781          Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Poll Invalid request: fd not open");
782          recv_error= true;
783        }
784       
785        // check status (return value) of poll call
786        switch (poll_status)
787        {
788          case -1:
789            if (errno == 0 || errno == EINTR)
790            {
791              Log(EVENT_LOG,LOG_NORMAL, tpparam.name, methodname << "Poll status: " << strerror(errno));
792            }
793            else
794            {
795              Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Poll status indicates error: " << strerror(errno) << "- aborting");
796              recv_error= true;
797            }
798           
799            continue; // next while iteration
800            break;
801           
802          case 0:
803#ifdef DEBUG_HARD
804            Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll timed out after " << tpparam.sleep_time << " ms.");
805#endif
806            continue; // next while iteration
807            break;
808           
809          default:
810#ifdef DEBUG_HARD
811            Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
812#endif
813            break;
814        } // end switch
815
816
817        /// receive data from socket buffer (recv will not block)
818        ret = recv(conn_socket, 
819                   netmsg->get_buffer() + bytes_received, 
820                   buffer_bytes_left, 
821                   0);
822
823        if ( ret < 0 )
824        {
825          if (errno!=EAGAIN && errno!=EWOULDBLOCK)
826          {
827            Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Receive at socket " << conn_socket << " failed, error: " << strerror(errno));
828            recv_error= true;
829            continue;
830          }
831          else
832          { // errno==EAGAIN || errno==EWOULDBLOCK
833            // just nothing to read from socket, continue w/ next poll
834            continue;
835          }
836        }
837        else
838        {
839          if (ret == 0)
840          {
841            // this means that EOF is reached,
842            // other side has closed connection
843            Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Other side (" << *peer_addr << ") closed connection for socket " << conn_socket);
844            // disallow sending
845            AssocDataUDS* myassoc=const_cast<AssocDataUDS *>(receiver_thread_argp->peer_assoc);
846            if (myassoc->shutdown == false)
847            {
848              myassoc->shutdown= true;
849              if (shutdown(myassoc->socketfd,SHUT_WR))
850              {
851                if ( errno != ENOTCONN )
852                  Log(ERROR_LOG,LOG_UNIMP,tpparam.name, methodname << "shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
853              }
854            }
855            // not a real error, but we must quit the receive loop
856            recv_error= true;
857          }
858          else
859          {
860            Log(EVENT_LOG,LOG_UNIMP, tpparam.name, methodname << "<<--Received--<< packet (" << ret << " bytes) at socket " << conn_socket << " from " << *peer_addr);
861            // track number of received bytes
862
863            bytes_received+= ret;
864            buffer_bytes_left-= ret;
865          }
866        }
867      } // end if do not skip recv() statement     
868
869      if (buffer_bytes_left < 0) ///< buffer space exhausted now
870      {
871        recv_error= true;
872        Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "during receive buffer space exhausted");
873      }
874
875      if (!msgcontentlength_known) ///< common header not parsed
876      {
877        // enough bytes read to parse common header?
878        if (bytes_received >= common_header_length)
879        {
880          // get message content length in number of bytes
881          if (getmsglength(*netmsg, msgcontentlength)) msgcontentlength_known= true;
882          else
883          {
884              Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Not a valid protocol header - discarding received packet. received size " << msgcontentlength);
885
886            ostringstream hexdumpstr;
887            netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
888            Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"dumping received bytes:" << hexdumpstr.str());
889
890            // reset all counters
891            msgcontentlength= 0;
892            msgcontentlength_known= false;
893            bytes_received= 0;
894            pdu_complete= false;
895            continue;
896          }
897        }
898      } // endif common header not parsed
899
900      // check whether we have read the whole Protocol PDU
901      //DLog(tpparam.name, "bytes_received - common_header_length: " << bytes_received-common_header_length << "msgcontentlength: " << msgcontentlength << ">=");
902      if (msgcontentlength_known && bytes_received-common_header_length >= msgcontentlength )
903      {
904       
905        pdu_complete= true;
906        if (bytes_received-common_header_length > msgcontentlength)
907        {
908          Log(WARNING_LOG,LOG_NORMAL,tpparam.name,"trailing bytes - received more bytes ("<<bytes_received<<") than expected for PDU (" << common_header_length+msgcontentlength << ")");
909          remainbuf= new NetMsg(NetMsg::max_size);
910          trailingbytes= (bytes_received-common_header_length) - msgcontentlength;
911          bytes_received= common_header_length+msgcontentlength;
912          memcpy(remainbuf->get_buffer(),netmsg->get_buffer()+common_header_length+msgcontentlength, trailingbytes);
913        }
914      }
915    } // end while (!pdu_complete && !recv_error && !signalled for termination)
916    // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
917
918    // if other side closed the connection, we should still be able to deliver the remaining data
919    if (ret == 0)
920    {
921      recv_error= false;
922    }
923
924    // deliver only complete PDUs to signaling module
925    if (!recv_error && pdu_complete)
926    {
927      // create TPMsg and send it to the signaling thread
928      tpmsg = new(nothrow) TPMsg(netmsg, peer_addr->copy(), own_addr->copy());
929
930      Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "receipt of PDU now complete, sending to module " <<  message::get_qaddr_name(tpparam.dest));
931
932      debug_pdu=false;
933
934      if (debug_pdu)
935      {
936        ostringstream hexdump;
937        netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_received);
938        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"PDU debugging enabled - Received:" << hexdump.str());
939      }
940
941      // send the message if it was successfully created
942      // bool message::send_to(qaddr_t dest, bool exp = false);
943      if (!tpmsg
944          || (!tpmsg->get_peeraddress())
945          || (!tpmsg->send_to(tpparam.dest))) 
946      {
947        Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Cannot allocate/send TPMsg");
948        if (tpmsg) delete tpmsg;
949      } // end if tpmsg not allocated or not addr or not sent
950
951    } // end if !recv_error
952    else
953    { // error during receive or PDU incomplete
954      if (bytes_received>0)
955      {
956        Log(WARNING_LOG,LOG_NORMAL, tpparam.name, methodname << "Attention! " << (recv_error? "Receive error, " : "") << (pdu_complete ?  "PDU complete" : "PDU incomplete") << "received bytes: " << bytes_received);
957      }
958
959      if (!pdu_complete && bytes_received>0 && bytes_received<common_header_length)
960      {
961        ostringstream hexdumpstr;
962        netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
963        Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"Message too short to be a valid protocol header - dumping received bytes:" << hexdumpstr.str()); 
964      }
965      // leave the outer loop
966      /**********************/
967      break;
968      /**********************/
969    } // end else
970
971  } // end while (thread not signalled for termination)
972
973  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() 
974      << "> shutting down and closing socket " << receiver_thread_argp->peer_assoc->peer);
975
976  // shutdown socket
977  if (shutdown(conn_socket, SHUT_RD))
978  {
979    if ( errno != ENOTCONN )
980      Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> shutdown (read) on socket failed, reason: " << strerror(errno));
981  }
982
983  // close socket
984  close(conn_socket);
985
986  receiver_thread_argp->terminated= true;
987
988  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> terminated");
989
990#ifdef _DEBUG
991  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Signaling main loop for cleanup");
992#endif
993  // notify master thread for invocation of cleanup procedure
994  TPoverUDSMsg* newmsg= new(nothrow)TPoverUDSMsg(receiver_thread_argp->peer_assoc);
995  // send message to main loop thread
996  newmsg->send_to(tpparam.source);
997} 
998
999
1000/** this signals a terminate to a thread and wait for the thread to stop
1001 *  @note it is not safe to access any thread related data after this method returned,
1002 *        because the receiver thread will initiate a cleanup_receiver_thread() method
1003 *        which may erase all relevant thread data.
1004 */
1005void 
1006TPoverUDS::stop_receiver_thread(AssocDataUDS* peer_assoc)
1007{
1008  // All operations on  recv_thread_argmap and connmap require an already acquired lock
1009  // after this procedure peer_assoc may be invalid because it was erased
1010
1011  // start critical section
1012
1013  if (peer_assoc == 0)
1014    return;
1015
1016  pthread_t thread_id=  peer_assoc->thread_ID;
1017 
1018  // try to clean up receiver_thread_arg
1019  recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1020  receiver_thread_arg_t* recv_thread_argp= 
1021    (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1022  if (recv_thread_argp)
1023  {
1024    if (!recv_thread_argp->terminated)
1025    {
1026      // thread signaled termination, but is not?
1027      Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> signaled for termination");
1028
1029      // signal thread for its termination
1030      recv_thread_argp->sig_terminate= true;
1031      // wait for thread to join after termination
1032      pthread_join(thread_id, 0);
1033      // the dying thread will signal main loop to call this method, but this time we should enter the else branch
1034      return;
1035    }
1036  }
1037  else
1038    Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> not found");
1039
1040}
1041
1042
1043/** cleans up left over structures (assoc,receiver_thread_arg) from already terminated receiver_thread
1044 *  usually called by the master_thread after the receiver_thread terminated
1045 * @note clean_up_receiver_thread() should be only called when an outer lock ensures that peer_assoc
1046 *       is still valid
1047 */
1048void 
1049TPoverUDS::cleanup_receiver_thread(AssocDataUDS* peer_assoc)
1050{
1051  // All operations on  recv_thread_argmap and connmap require an already acquired lock
1052  // after this procedure peer_assoc may be invalid because it was erased
1053
1054  // start critical section
1055
1056  if (peer_assoc == 0)
1057    return;
1058
1059  pthread_t thread_id=  peer_assoc->thread_ID;
1060 
1061  // try to clean up receiver_thread_arg
1062  recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1063  receiver_thread_arg_t* recv_thread_argp= 
1064    (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1065  if (recv_thread_argp)
1066  {
1067    if (!recv_thread_argp->terminated)
1068    {
1069      // thread signaled termination, but is not?
1070      Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> not terminated yet?!");
1071      return;
1072    }
1073    else
1074    { // if thread is already terminated
1075      Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> is terminated");
1076
1077      // delete it from receiver map
1078      recv_thread_argmap.erase(recv_thread_arg_iter);
1079
1080      // then delete receiver arg structure
1081      delete recv_thread_argp;
1082    }
1083  }
1084
1085  // delete entry from connection map
1086
1087  // cleanup sender thread
1088  // no need to lock explicitly, because caller of cleanup_receiver_thread() must already locked
1089  terminate_sender_thread(peer_assoc);
1090
1091  // delete the AssocData structure from the connection map
1092  // also frees allocated AssocData structure
1093  connmap.erase(peer_assoc);
1094
1095  // end critical section
1096
1097  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Cleanup receiver thread <" << thread_id << ">. Done.");
1098}
1099
1100
1101/* sends a stop message to the sender thread that belongs to the peer address given in assoc
1102 * @note terminate_receiver_thread() should be only called when an outer lock ensures that assoc
1103 *       is still valid, a lock is also required, because senderthread_queuemap is changed
1104 */
1105void 
1106TPoverUDS::terminate_sender_thread(const AssocDataUDS* assoc)
1107{
1108  if (assoc == 0)
1109  {
1110    Log(ERROR_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - assoc data == NULL");
1111    return;
1112  }
1113
1114  sender_thread_queuemap_t::iterator it= senderthread_queuemap.find(assoc->peer);
1115
1116  if (it != senderthread_queuemap.end())
1117  { // we have a sender thread: send a stop message to it
1118    FastQueue* destqueue= it->second; 
1119    if (destqueue)
1120    {
1121      TPoverUDSMsg* internalmsg= new TPoverUDSMsg(assoc,tpparam.source,TPoverUDSMsg::stop);
1122      if (internalmsg)
1123      {
1124        // send the internal message to the sender thread queue
1125        internalmsg->send(tpparam.source,destqueue);
1126      }
1127    }
1128    else
1129    {
1130      Log(WARNING_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - found entry for address, but no sender thread. addr:" << assoc->peer);
1131    }
1132    // erase entry from map
1133    senderthread_queuemap.erase(it);
1134  }
1135}
1136
1137/* terminate all active threads
1138 * note: locking should not be necessary here because this message is called as last method from
1139 * main_loop()
1140 */
1141void 
1142TPoverUDS::terminate_all_threads()
1143{
1144  AssocDataUDS* assoc= 0;
1145  receiver_thread_arg_t* terminate_argp;
1146
1147  for (recv_thread_argmap_t::iterator terminate_iterator=  recv_thread_argmap.begin();
1148       terminate_iterator !=  recv_thread_argmap.end();
1149       terminate_iterator++)
1150  {
1151    if ( (terminate_argp= terminate_iterator->second) != 0)
1152    {
1153      // we need a non const pointer to erase it later on
1154      assoc= const_cast<AssocDataUDS*>(terminate_argp->peer_assoc);
1155      // check whether thread is still alive
1156      if (terminate_argp->terminated == false)
1157      {
1158        terminate_argp->sig_terminate= true;
1159        // then wait for its termination
1160        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, 
1161            "Signaled receiver thread <" << terminate_iterator->first << "> for termination");
1162       
1163        pthread_join(terminate_iterator->first, 0);
1164       
1165        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Thread <" << terminate_iterator->first  << "> is terminated");
1166      }
1167      else
1168        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, 
1169            "Receiver thread <" << terminate_iterator->first << "> already terminated");
1170       
1171      // cleanup all remaining argument structures of terminated threads
1172      delete terminate_argp;
1173
1174      // terminate any related sender thread that is still running
1175      terminate_sender_thread(assoc);
1176     
1177      connmap.erase(assoc);
1178      // delete assoc is not necessary, because connmap.erase() will do the job
1179    }
1180  } // end for
1181}
1182
1183
1184/**
1185 * sender thread starter:
1186 * just a static starter method to allow starting the
1187 * actual sender_thread() method.
1188 *
1189 * @param argp - pointer to the current TPoverUDS object instance and receiver_thread_arg_t struct
1190 */
1191void*
1192TPoverUDS::sender_thread_starter(void *argp)
1193{
1194  sender_thread_start_arg_t *sargp= static_cast<sender_thread_start_arg_t *>(argp);
1195 
1196  //cout << "invoked sender_thread_Starter" << endl;
1197
1198  // invoke sender thread method
1199  if (sargp != 0 && sargp->instance != 0)
1200  {
1201    // call receiver_thread member function on object instance
1202    sargp->instance->sender_thread(sargp->sender_thread_queue);
1203
1204    //cout << "Before deletion of sarg" << endl;
1205
1206    // no longer needed
1207    delete sargp;
1208  }
1209  else
1210  {
1211    Log(ERROR_LOG,LOG_CRIT,"sender_thread_starter","while starting sender_thread: 0 pointer to arg or object");
1212  }
1213  return 0;
1214}
1215
1216
1217
1218
1219/**
1220 * receiver thread starter:
1221 * just a static starter method to allow starting the
1222 * actual receiver_thread() method.
1223 *
1224 * @param argp - pointer to the current TPoverUDS object instance and receiver_thread_arg_t struct
1225 */
1226void*
1227TPoverUDS::receiver_thread_starter(void *argp)
1228{
1229  receiver_thread_start_arg_t *rargp= static_cast<receiver_thread_start_arg_t *>(argp);
1230  // invoke receiver thread method
1231  if (rargp != 0 && rargp->instance != 0)
1232  {
1233    // call receiver_thread member function on object instance
1234    rargp->instance->receiver_thread(rargp->rtargp);
1235
1236    // no longer needed
1237    delete rargp;
1238  }
1239  else
1240  {
1241    Log(ERROR_LOG,LOG_CRIT,"receiver_thread_starter","while starting receiver_thread: 0 pointer to arg or object");
1242  }
1243  return 0;
1244}
1245
1246
1247void
1248TPoverUDS::create_new_sender_thread(FastQueue* senderfqueue)
1249{
1250  Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new sender thread...");
1251
1252  pthread_t senderthreadid;
1253  // create new thread; (arg == 0) is handled by thread, too
1254  int pthread_status= pthread_create(&senderthreadid, 
1255                                     NULL, // NULL: default attributes: thread is joinable and has a
1256                                     //       default, non-realtime scheduling policy
1257                                     TPoverUDS::sender_thread_starter,
1258                                     new sender_thread_start_arg_t(this,senderfqueue));
1259  if (pthread_status)
1260  {
1261    Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " <<  strerror(pthread_status));
1262   
1263    delete senderfqueue;
1264  }
1265}
1266
1267
1268void
1269TPoverUDS::create_new_receiver_thread(AssocDataUDS* peer_assoc)
1270{
1271  receiver_thread_arg_t* argp= 
1272    new(nothrow) receiver_thread_arg(peer_assoc);
1273 
1274  Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new receiver thread...");
1275
1276  // create new thread; (arg == 0) is handled by thread, too
1277  int pthread_status= pthread_create(&peer_assoc->thread_ID, 
1278                                     NULL, // NULL: default attributes: thread is joinable and has a
1279                                     //       default, non-realtime scheduling policy
1280                                     receiver_thread_starter,
1281                                     new(nothrow) receiver_thread_start_arg_t(this,argp));
1282  if (pthread_status)
1283  {
1284    Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " <<  strerror(pthread_status));
1285   
1286    delete argp;
1287  }
1288  else
1289  {
1290    lock(); // install_cleanup_thread_lock(TPoverUDS, this);
1291
1292    // remember pointer to thread arg structure
1293    // thread arg structure should be destroyed after thread termination only
1294    pair<recv_thread_argmap_t::iterator, bool> tmpinsiterator=
1295      recv_thread_argmap.insert( pair<pthread_t,receiver_thread_arg_t*> (peer_assoc->thread_ID,argp) );
1296    if (tmpinsiterator.second == false)
1297    {
1298      Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Thread argument could not be inserted into hashmap");
1299    }
1300    unlock(); // uninstall_cleanup(1);
1301  } 
1302}
1303
1304
1305/**
1306 * master listener thread starter:
1307 * just a static starter method to allow starting the
1308 * actual master_listener_thread() method.
1309 *
1310 * @param argp - pointer to the current TPoverUDS object instance
1311 */
1312void*
1313TPoverUDS::master_listener_thread_starter(void *argp)
1314{
1315  // invoke listener thread method
1316  if (argp != 0)
1317  {
1318    (static_cast<TPoverUDS*>(argp))->master_listener_thread();
1319  }
1320  return 0;
1321}
1322
1323
1324
1325/**
1326 * master listener thread: waits for incoming connections at the well-known tcp port
1327 * when a connection request is received this thread spawns a receiver_thread for
1328 * receiving packets from the peer at the new socket.
1329 */
1330void
1331TPoverUDS::master_listener_thread()
1332{
1333    //remove any existing socket files
1334    unlink(tpparam.udssocket.c_str());
1335
1336
1337  // create a new address-structure for the listening masterthread
1338    struct sockaddr_un  own_address;
1339    own_address.sun_family = AF_UNIX;
1340    strcpy(own_address.sun_path, tpparam.udssocket.c_str());
1341    unlink(tpparam.udssocket.c_str());
1342    uint32 len = strlen(tpparam.udssocket.c_str()) + sizeof(own_address.sun_family);
1343   
1344   
1345  // create a listening socket
1346  int master_listener_socket= socket(AF_UNIX, SOCK_STREAM, 0);
1347  if (master_listener_socket == -1)
1348  {
1349    Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Could not create a new socket, error: " << strerror(errno));
1350    return;
1351  }
1352 
1353
1354  // Reuse ports, even if they are busy
1355  int socketreuseflag= 1;
1356  int status= setsockopt(master_listener_socket,
1357                           SOL_SOCKET,
1358                           SO_REUSEADDR,
1359                           (const char *) &socketreuseflag,
1360                           sizeof(socketreuseflag));
1361  if (status)
1362  {
1363       Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
1364  }
1365 
1366 
1367  // bind the newly created socket to a specific address
1368  int bind_status = bind(master_listener_socket,
1369                         reinterpret_cast<struct sockaddr *>(&own_address),
1370                         len);
1371  if (bind_status)
1372    { 
1373      Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Binding to " << tpparam.udssocket);
1374      return;
1375    }
1376
1377    // listen at the socket,
1378    // queuesize for pending connections= max_listen_queue_size
1379    int listen_status = listen(master_listener_socket, max_listen_queue_size);
1380    if (listen_status)
1381    {
1382      Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Listen at socket " << master_listener_socket
1383          << " failed, error: " << strerror(errno));
1384      return;
1385    }
1386    else
1387    {
1388      Log(INFO_LOG,LOG_NORMAL, tpparam.name, color[green] << "Listening at " << tpparam.udssocket << color[off]);
1389    }
1390
1391    // activate O_NON_BLOCK for accept (accept does not block)
1392    fcntl(master_listener_socket,F_SETFL, O_NONBLOCK);
1393
1394    // create a pollfd struct for use in the mainloop
1395    struct pollfd poll_fd;
1396    poll_fd.fd = master_listener_socket;
1397    poll_fd.events = POLLIN | POLLPRI; 
1398    poll_fd.revents = 0;
1399    /*
1400      #define POLLIN    0x001   // There is data to read.
1401      #define POLLPRI   0x002   // There is urgent data to read. 
1402      #define POLLOUT   0x004   // Writing now will not block. 
1403    */
1404   
1405    bool terminate = false;
1406    // check for thread terminate condition using get_state()
1407    state_t currstate= get_state();
1408    int poll_status= 0;
1409    const unsigned int number_poll_sockets= 1; 
1410    struct sockaddr_un peer_address;
1411    socklen_t peer_address_len;
1412    int conn_socket;
1413
1414    // check whether this thread is signaled for termination
1415    while(! (terminate= (currstate==STATE_ABORT || currstate==STATE_STOP) ) )
1416    {
1417      // wait on number_poll_sockets (main drm socket)
1418      // for the events specified above for sleep_time (in ms)
1419      poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
1420      if (poll_fd.revents & POLLERR) // Error condition
1421      {
1422        if (errno != EINTR) 
1423        {
1424          Log(ERROR_LOG,LOG_CRIT, tpparam.name, 
1425              "Poll caused error " << strerror(errno) << " - indicated by revents");
1426        }
1427        else
1428        {
1429          Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "poll(): " << strerror(errno));
1430        }
1431
1432      }
1433      if (poll_fd.revents & POLLHUP) // Hung up
1434      {
1435        Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll hung up");
1436        return;
1437      }
1438      if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
1439      {
1440        Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll Invalid request: fd not open");
1441        return;
1442      }
1443     
1444      switch (poll_status)
1445      {
1446        case -1:
1447          if (errno != EINTR)
1448          {
1449            Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll status indicates error: " << strerror(errno));
1450          }
1451          else
1452          {
1453            Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Poll status: " << strerror(errno));
1454          }
1455           
1456          break;
1457
1458        case 0:
1459#ifdef DEBUG_HARD
1460          Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, 
1461              "Listen Thread - Poll timed out after " << tpparam.sleep_time << " ms.");
1462#endif
1463          currstate= get_state();
1464          continue;
1465          break;
1466
1467        default:
1468#ifdef DEBUG_HARD
1469          Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
1470#endif
1471          break;
1472      } // end switch
1473
1474      // after a successful accept call,
1475      // accept stores the address information of the connecting party
1476      // in peer_address and the size of its address in addrlen
1477      peer_address_len= sizeof(peer_address);
1478      conn_socket = accept (master_listener_socket,
1479                            reinterpret_cast<struct sockaddr *>(&peer_address),
1480                            &peer_address_len);
1481      if (conn_socket == -1)
1482      {
1483        if (errno != EWOULDBLOCK && errno != EAGAIN)
1484        {
1485          Log(ERROR_LOG,LOG_EMERG, tpparam.name, "Accept at socket " << master_listener_socket
1486              << " failed, error: " << strerror(errno));
1487          return;
1488        }
1489      }
1490      else
1491      {
1492        // create a new assocdata-object for the new thread
1493        AssocDataUDS* peer_assoc = NULL;
1494        udsaddress addr(conn_socket);
1495
1496        Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "<<--Received connect--<< request from " << addr);
1497
1498        //struct sockaddr_un own_address;
1499        //socklen_t own_address_len= sizeof(own_address);
1500        //getsockname(conn_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
1501
1502        // AssocData will copy addr content into its own structure
1503        // allocated peer_assoc will be stored in connmap
1504        peer_assoc = new(nothrow) AssocDataUDS(conn_socket, addr, udsaddress(conn_socket));
1505
1506        bool insert_success= false;
1507        if (peer_assoc)
1508        {
1509          // start critical section
1510          lock(); // install_cleanup_thread_lock(TPoverUDS, this);
1511          insert_success= connmap.insert(peer_assoc);
1512          // end critical section
1513          unlock(); // uninstall_cleanup(1);
1514        }
1515       
1516       
1517        if (insert_success == false) // not inserted into connmap
1518        {
1519          Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot insert AssocData for socket " << conn_socket
1520              << ", " << addr << " into connection map, aborting connection...");
1521
1522          // abort connection, delete its AssocData
1523          close (conn_socket);
1524          if (peer_assoc) 
1525          { 
1526            delete peer_assoc;
1527            peer_assoc= 0;
1528          }
1529          return;
1530               
1531        } //end __else(connmap.insert());__
1532       
1533        // create a new thread for each new connection
1534        create_new_receiver_thread(peer_assoc);
1535      } // end __else (connsocket)__
1536     
1537      // get new thread state
1538      currstate= get_state();
1539
1540    } // end while(!terminate)
1541    return;
1542} // end listen_for_connections()   
1543
1544
1545TPoverUDS::~TPoverUDS()
1546{
1547  init= false;
1548
1549  Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,  "Destructor called");
1550
1551  QueueManager::instance()->unregister_queue(tpparam.source);
1552}
1553
1554/** TPoverUDS Thread main loop.
1555 * This loop checks for internal messages of either
1556 * a send() call to start a new receiver thread, or,
1557 * of a receiver_thread() that signals its own termination
1558 * for proper cleanup of control structures.
1559 * It also handles the following internal TPoverUDSMsg types:
1560 * - TPoverUDSMsg::stop - a particular receiver thread is terminated
1561 * - TPoverUDSMsg::start - a particular receiver thread is started
1562 * @param nr number of current thread instance
1563 */
1564void 
1565TPoverUDS::main_loop(uint32 nr)
1566{
1567  // get internal queue for messages from receiver_thread
1568  FastQueue* fq = get_fqueue();
1569  if (!fq) 
1570  {
1571    Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot find message queue");
1572    return;
1573  } // end if not fq
1574  // register queue for receiving internal messages from other modules
1575  QueueManager::instance()->register_queue(fq,tpparam.source);
1576
1577  // start master listener thread
1578  pthread_t master_listener_thread_ID;
1579  int pthread_status= pthread_create(&master_listener_thread_ID, 
1580                                     NULL, // NULL: default attributes: thread is joinable and has a
1581                                     //       default, non-realtime scheduling policy
1582                                     master_listener_thread_starter,
1583                                     this);
1584  if (pthread_status)
1585  {
1586    Log(ERROR_LOG,LOG_CRIT, tpparam.name, 
1587        "New master listener thread could not be created: " <<  strerror(pthread_status));
1588  }
1589  else
1590    Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Master listener thread started");
1591
1592
1593  // define max latency for thread reaction on termination/stop signal
1594  timespec wait_interval= { 0, 250000000L }; // 250ms
1595  message* internal_thread_msg = NULL;
1596  state_t currstate= get_state();
1597
1598  // check whether this thread is signaled for termination
1599  while( currstate!=STATE_ABORT && currstate!=STATE_STOP ) 
1600  {
1601    // poll internal message queue (blocking)
1602    if ( (internal_thread_msg= fq->dequeue_timedwait(wait_interval)) != 0 )
1603    {
1604      TPoverUDSMsg* internalmsg= dynamic_cast<TPoverUDSMsg*>(internal_thread_msg);
1605      if (internalmsg)
1606      {
1607        if (internalmsg->get_msgtype() == TPoverUDSMsg::stop)
1608        {
1609          // a receiver thread terminated and signaled for cleanup by master thread
1610          AssocDataUDS* assocd= const_cast<AssocDataUDS*>(internalmsg->get_peer_assoc());
1611          Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Got cleanup request for thread <" << assocd->thread_ID <<'>');
1612          lock();
1613          cleanup_receiver_thread( assocd );
1614          unlock();
1615        }
1616        else
1617        if (internalmsg->get_msgtype() == TPoverUDSMsg::start)
1618        {
1619          // start a new receiver thread
1620          create_new_receiver_thread( const_cast<AssocDataUDS*>(internalmsg->get_peer_assoc()) );
1621        }
1622        else
1623          Log(ERROR_LOG,LOG_CRIT, tpparam.name, "unexpected internal message:" << internalmsg->get_msgtype());
1624         
1625        delete internalmsg;
1626      }
1627      else
1628      {
1629        Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Dynamic_cast failed - received unexpected and unknown internal message source "
1630            << internal_thread_msg->get_source());
1631      }
1632    } // endif
1633
1634    // get thread state
1635    currstate= get_state();
1636  } // end while
1637
1638  if (currstate==STATE_STOP)
1639  {
1640    // start abort actions
1641    Log(INFO_LOG,LOG_NORMAL, tpparam.name, "Asked to abort, stopping all receiver threads");
1642  } // end if stopped
1643
1644  // do not accept any more messages
1645  fq->shutdown();
1646  // terminate all receiver and sender threads that are still active
1647  terminate_all_threads();
1648}
1649
1650} // end namespace protlib
1651///@}
Note: See TracBrowser for help on using the repository browser.