source: source/ariba/communication/modules/transport/protlib/tp_over_uds.cpp@ 4608

Last change on this file since 4608 was 4608, checked in by Christoph Mayer, 15 years ago

unspec identifier handling

File size: 50.7 KB
Line 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file tp_over_uds.cpp
3/// transport module for unix domain socket communication
4/// ----------------------------------------------------------
5/// $Id: tp_over_uds.cpp 2872 2008-02-18 10:58:03Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/src/tp_over_uds.cpp $
7// ===========================================================
8//
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29
30extern "C"
31{
32 //#define _SOCKADDR_LEN /* use BSD 4.4 sockets */
33#include <unistd.h> /* gethostname */
34#include <sys/types.h> /* network socket interface */
35#include <netinet/in.h> /* network socket interface */
36#include <netinet/tcp.h> /* for TCP Socket Option */
37#include <sys/socket.h>
38#include <arpa/inet.h> /* inet_addr */
39
40#include <fcntl.h>
41#include <sys/poll.h>
42#include <sys/un.h>
43}
44
45#include <iostream>
46#include <errno.h>
47#include <string>
48#include <sstream>
49
50#include "tp_over_uds.h"
51#include "threadsafe_db.h"
52#include "cleanuphandler.h"
53#include "setuid.h"
54#include "queuemanager.h"
55#include "logfile.h"
56
57#include <set>
58
// Local result codes used by TPoverUDS::udssend() to record whether the
// send loop completed (UDS_SUCCESS) or a ::send() call failed (UDS_SEND_FAILURE).
#define UDS_SUCCESS 0
#define UDS_SEND_FAILURE 1

// NOTE(review): not referenced in the part of the file visible here; presumably
// the backlog passed to listen() when the master thread sets up its socket - confirm.
const unsigned int max_listen_queue_size= 10;
63
64namespace protlib {
65
66using namespace log;
67
68/** @defgroup tpuds TP over UDS
69 * @ingroup network
70 * @{
71 */
72
73
74/******* class TPoverUDS *******/
75
76
77/** get_connection_to() checks for already existing connections.
78 * If a connection exists, it returns "AssocData"
79 * and saves it in "connmap" for further use
80 * If no connection exists, a new connection to "addr"
81 * is created.
82 */
83AssocDataUDS*
84TPoverUDS::get_connection_to(udsaddress& addr)
85{
86 // get timeout
87 struct timespec ts;
88 get_time_of_day(ts);
89 ts.tv_nsec+= tpparam.sleep_time * 1000000L;
90 if (ts.tv_nsec>=1000000000L)
91 {
92 ts.tv_sec += ts.tv_nsec / 1000000000L;
93 ts.tv_nsec= ts.tv_nsec % 1000000000L;
94 }
95
96 // create a new AssocData pointer, initialize it to NULL
97 AssocDataUDS* assoc= NULL;
98 int new_socket;
99 // loop until timeout is exceeded: TODO: check applicability of loop
100 do
101 {
102 // check for existing connections to addr
103 // start critical section
104 lock(); // install_cleanup_thread_lock(TPoverUDS, this);
105 assoc= connmap.lookup(addr);
106 // end critical section
107 unlock(); // uninstall_cleanup(1);
108 if (assoc)
109 {
110 // If not shut down then use it, else abort, wait and check again.
111 if (!assoc->shutdown)
112 {
113 return assoc;
114 }
115 else
116 {
117 // TODO: connection is already in shutdown mode. What now?
118 Log(ERROR_LOG,LOG_CRIT,tpparam.name,"socket exists, but is already in mode shutdown");
119
120 return 0;
121 }
122 } //end __if (assoc)__
123 else
124 {
125 Log(DEBUG_LOG,LOG_UNIMP,tpparam.name,"No existing connection to "
126 << addr << " found, creating a new one.");
127 }
128
129 // no connection found, create a new one
130 new_socket = socket(AF_UNIX, SOCK_STREAM, 0);
131 if (new_socket == -1)
132 {
133 Log(ERROR_LOG,LOG_CRIT,tpparam.name,"Couldn't create a new socket: " << strerror(errno));
134
135 return 0;
136 }
137
138 // Reuse ports, even if they are busy
139 int socketreuseflag= 1;
140 int status= setsockopt(new_socket,
141 SOL_SOCKET,
142 SO_REUSEADDR,
143 (const char *) &socketreuseflag,
144 sizeof(socketreuseflag));
145 if (status)
146 {
147 Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
148 }
149
150 struct sockaddr_un dest_address;
151 dest_address.sun_family= AF_UNIX;
152 strcpy(dest_address.sun_path, addr.get_udssocket().c_str());
153
154 // connect the socket to the destination address
155 int connect_status = connect(new_socket,
156 reinterpret_cast<const struct sockaddr*>(&dest_address),
157 sizeof(dest_address));
158
159 // connects to the listening_port of the peer's masterthread
160 if (connect_status != 0)
161 {
162 Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"Connect to " << addr.get_udssocket() << "failed: [" << color[red] << strerror(errno) << color[off] << "]");
163
164 throw TPErrorConnectSetupFail();
165 }
166
167
168 //struct sockaddr_un own_address;
169 //socklen_t own_address_len= sizeof(own_address);
170 //getsockname(new_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
171
172 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,">>--Connect-->> to " << addr << " from " << udsaddress(new_socket));
173
174 // create new AssocData record (will copy addr)
175 assoc = new(nothrow) AssocDataUDS(new_socket, addr, udsaddress(new_socket));
176
177 // if assoc could be successfully created, insert it into ConnectionMap
178 if (assoc)
179 {
180 bool insert_success= false;
181 // start critical section
182 lock(); // install_cleanup_thread_lock(TPoverUDS, this);
183 // insert AssocData into connection map
184 insert_success= connmap.insert(assoc);
185 // end critical section
186 unlock(); // uninstall_cleanup(1);
187
188 if (insert_success == true)
189 {
190#ifdef _DEBUG
191 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Connected to " << addr << " via socket " << new_socket);
192
193
194#endif
195
196 // notify master thread to start a receiver thread (i.e. send selfmessage)
197 TPoverUDSMsg* newmsg= new(nothrow)TPoverUDSMsg(assoc, tpparam.source, TPoverUDSMsg::start);
198 if (newmsg)
199 {
200 newmsg->send_to(tpparam.source);
201 return assoc;
202 }
203 else
204 Log(ERROR_LOG,LOG_CRIT,tpparam.name,"get_connection_to: could not get memory for internal msg");
205 }
206 else
207 {
208 // delete data and abort
209 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot insert AssocData for socket " << new_socket << ", "<< addr << " into connection map, aborting connection");
210
211 // abort connection, delete its AssocData
212 close (new_socket);
213 if (assoc)
214 {
215 delete assoc;
216 assoc= 0;
217 }
218 return assoc;
219 } // end else connmap.insert
220
221 } // end "if (assoc)"
222 }
223 while (wait_cond(ts)!=ETIMEDOUT);
224
225 return assoc;
226} //end get_connection_to
227
228
229/** terminates a signaling association/connection
230 * - if no connection exists, generate a warning
231 * - otherwise: generate internal msg to related receiver thread
232 */
233void
234TPoverUDS::terminate(const address& in_addr)
235{
236
237 udsaddress* addr = NULL;
238 addr = dynamic_cast<udsaddress*>(in_addr.copy());
239
240#ifndef _NO_LOGGING
241 const char *const thisproc="terminate() - ";
242#endif
243
244 // Create a new AssocData-pointer
245 AssocDataUDS* assoc = NULL;
246
247 // check for existing connections to addr
248
249 // start critical section:
250 // please note if receiver_thread terminated already, the assoc data is not
251 // available anymore therefore we need a lock around cleanup_receiver_thread()
252
253 lock(); // install_cleanup_thread_lock(TPoverUDS, this);
254 assoc= connmap.lookup(*addr);
255 if (assoc)
256 {
257 Log(EVENT_LOG,LOG_NORMAL,tpparam.name,thisproc<<"got request to shutdown connection for peer " << addr);
258 // If not shut down then use it, else abort, wait and check again.
259 if (!assoc->shutdown)
260 {
261 if (assoc->socketfd)
262 {
263 // disallow sending
264 if (shutdown(assoc->socketfd,SHUT_WR))
265 {
266 Log(ERROR_LOG,LOG_UNIMP,tpparam.name,thisproc<<"shutdown (write) on socket for peer " << addr << " returned error:" << strerror(errno));
267 }
268 else
269 {
270 Log(EVENT_LOG,LOG_NORMAL,tpparam.name,thisproc<<"initiated closing of connection for peer " << addr << ". Shutdown (write) on socket "<< assoc->socketfd );
271 }
272 }
273 assoc->shutdown= true;
274 }
275 else
276 Log(EVENT_LOG,LOG_NORMAL,tpparam.name,thisproc<<"connection for peer " << addr << " is already in mode shutdown");
277
278 }
279 else
280 Log(WARNING_LOG,LOG_NORMAL,tpparam.name,thisproc<<"could not find a connection for peer " << addr);
281
282 stop_receiver_thread(assoc);
283
284 // end critical section
285 unlock(); // uninstall_cleanup(1);
286
287 if (addr) delete addr;
288
289}
290
291
292/** generates and internal TPoverUDS message to send a NetMsg to the network
293 *
294 * - it is necessary to let a thread do this, because the caller
295 * may get blocked if the connect() or send() call hangs for a while
296 * - the sending thread will call TPoverUDS::udssend()
297 * - if no connection exists, creates a new one
298 *
299 * @note the netmsg is deleted by the send() method when it is not used anymore
300 */
301void
302TPoverUDS::send(NetMsg* netmsg, const address& in_addr, bool use_existing_connection)
303{
304 if (netmsg == NULL) {
305 ERRCLog(tpparam.name,"send() - called without valid NetMsg buffer (NULL)");
306 return;
307 }
308
309 udsaddress* addr = NULL;
310 addr = dynamic_cast<udsaddress*>(in_addr.copy());
311
312 if (!addr)
313 {
314 ERRCLog(tpparam.name,"send() - given destination address is not of expected type (udsaddress), has type: " << (int) in_addr.get_type());
315 return;
316 }
317
318
319 // lock due to sendermap access
320 lock();
321
322
323 // check for existing sender thread
324 sender_thread_queuemap_t::const_iterator it= senderthread_queuemap.find(*addr);
325
326 FastQueue* destqueue= 0;
327
328 if (it == senderthread_queuemap.end())
329 { // no sender thread so far
330
331 if (!use_existing_connection)
332 { // it is allowed to create a new connection for this thread
333
334 // create a new queue for sender thread
335 FastQueue* sender_thread_queue= new FastQueue;
336 create_new_sender_thread(sender_thread_queue);
337 // remember queue for later use
338
339 //pair<sender_thread_queuemap_t::iterator, bool> tmpinsiterator=
340 senderthread_queuemap.insert( pair<udsaddress,FastQueue*> (*addr,sender_thread_queue) );
341
342 destqueue= sender_thread_queue;
343 }
344 }
345 else
346 { // we have a sender thread, use stored queue from map
347 destqueue= it->second;
348 }
349
350 unlock();
351
352 // send a send_data message to it (if we have a destination queue)
353 if (destqueue)
354 {
355 // both parameters will be freed after message was sent!
356
357 //DLog(tpparam.name, "Sending self-message for socket " << addr->get_udssocket().c_str());
358
359 TPoverUDSMsg* internalmsg= new TPoverUDSMsg(netmsg,addr->copy());
360 if (internalmsg)
361 {
362 //DLog(tpparam.name, "Address in internal message: " << internalmsg->get_appladdr()->get_udssocket());
363 // send the internal message to the sender thread queue
364 internalmsg->send(tpparam.source,destqueue);
365 }
366 }
367 else
368 {
369 if (!use_existing_connection)
370 WLog(tpparam.name,"send() - found entry for address, but no active sender thread available for peer addr:" << *addr << " - dropping data");
371 else
372 DLog(tpparam.name,"no active sender thread found " << *addr << " - but policy forbids to set up a new connection, will drop data");
373
374 // cannot send data, so we must drop it
375 delete netmsg;
376 }
377
378 if (addr) delete addr;
379}
380
381/** sends a NetMsg to the network.
382 *
383 * @param netmsg - message to send
384 * @param addr - transport endpoint address
385 *
386 * @note if no connection exists, creates a new one
387 * @note both parameters are deleted after the message was sent
388 */
389void
390TPoverUDS::udssend(NetMsg* netmsg, udsaddress* addr)
391{
392#ifndef _NO_LOGGING
393 const char *const thisproc="sender - ";
394#endif
395
396 // set result initially to success, set it to failure
397 // in places where these failures occur
398 int result = UDS_SUCCESS;
399 int ret= 0;
400
401 // Create a new AssocData-pointer
402 AssocDataUDS* assoc = NULL;
403
404 // tp.cpp checks for initialisation of tp and correctness of
405 // msgsize, protocol and ip,
406 // throws an error if something is not right
407 if (addr) {
408 if (!(addr->get_udssocket().size() || addr->get_socknum())) {
409 ERRCLog(tpparam.name, "No UNIX Domain Socket Path / Socket Number in address, will not process it: " << addr->get_udssocket());
410 return;
411 }
412 } else {
413 Log(ERROR_LOG,LOG_CRIT, tpparam.name, thisproc << "address pointer is NULL");
414 result= UDS_SEND_FAILURE;
415 throw TPErrorInternal();
416 }
417
418
419
420 // check for existing connections,
421 // if a connection exists, return its AssocData
422 // and save it in assoc for further use
423 // if no connection exists, create a new one (in get_connection_to()).
424 // Connection is inserted into connection map that must be done
425 // with exclusive access
426 assoc= get_connection_to(*addr);
427
428 if (assoc==NULL || assoc->socketfd<=0)
429 {
430 Log(ERROR_LOG,LOG_CRIT, tpparam.name, thisproc << "no valid assoc/socket data");
431
432 delete netmsg;
433 delete addr;
434 return;
435 }
436 if (assoc->shutdown)
437 {
438 Log(WARNING_LOG, LOG_ALERT, tpparam.name, thisproc << "should send message although connection already half closed");
439 delete netmsg;
440 delete addr;
441
442 throw TPErrorSendFailed();
443 }
444
445 uint32 msgsize= netmsg->get_size();
446#ifdef DEBUG_HARD
447 cerr << thisproc << "message size=" << netmsg->get_size() << endl;
448#endif
449
450 // send all the data contained in netmsg to the socket
451 // which belongs to the address "addr"
452 for (uint32 bytes_sent= 0;
453 bytes_sent < msgsize;
454 bytes_sent+= ret)
455 {
456
457#ifdef _DEBUG_HARD
458 for (uint32 i=0;i<msgsize;i++)
459 {
460 cout << "send_buf: " << i << " : ";
461 if ( isalnum(*(netmsg->get_buffer()+i)) )
462 cout << "'" << *(netmsg->get_buffer()+i) << "' (0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec << ")" ;
463 else
464 cout << "0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec;
465 cout << endl;
466 }
467
468 cout << endl;
469 cout << "bytes_sent: " << bytes_sent << endl;
470 cout << "Message size: " << msgsize << endl;
471 cout << "Send-Socket: " << assoc->socketfd << endl;
472 cout << "pointer-Offset. " << netmsg->get_pos() << endl;
473 cout << "vor send " << endl;
474#endif
475
476 // socket send
477 ret= ::send(assoc->socketfd,
478 netmsg->get_buffer() + bytes_sent,
479 msgsize - bytes_sent,
480 MSG_NOSIGNAL);
481
482 //send_buf + bytes_sent
483
484 if (debug_pdu)
485 {
486 ostringstream hexdump;
487 netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_sent);
488 Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"PDU debugging enabled - Sent:" << hexdump.str());
489 }
490
491 if (ret < 0)
492 {
493 result= UDS_SEND_FAILURE;
494 break;
495 } // end if (ret < 0)
496 } // end for
497
498
499 // *** note: netmsg is deleted here ***
500 delete netmsg;
501
502 // Throwing an exception within a critical section does not
503 // unlock the mutex.
504
505 if (result != UDS_SUCCESS)
506 {
507 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, thisproc << "Unix Domain Socket error, returns " << ret << ", error : " << strerror(errno));
508 delete addr;
509
510 throw TPErrorSendFailed();
511
512 // There is no special errorcode for sending failed
513 // in tp.h, there are only these:
514 /*
515 ERROR_BAD_ADDRESS,
516 ERROR_BAD_NETMSG,
517 ERROR_NOT_INIT,
518 ERROR_UNREACHABLE,
519 ERROR_INTERNAL,
520 ERROR_PAYLOAD
521 */
522 }
523 else
524 Log(EVENT_LOG,LOG_NORMAL,tpparam.name, thisproc << ">>----Sent---->> message (" << msgsize << " bytes) using socket " << assoc->socketfd << " to " << *addr);
525
526 if (!assoc) {
527 // no connection
528
529 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, thisproc << "cannot get connection to " << *addr);
530
531 delete addr;
532
533 throw TPErrorUnreachable(); // should be no assoc found
534 } // end "if (!assoc)"
535
536 // *** delete address ***
537 delete addr;
538}
539
540/* this thread waits for an internal message that either:
541 * - requests transmission of a packet
542 * - requests to stop this thread
543 * @param argp points to the thread queue for internal messages
544 */
545void
546TPoverUDS::sender_thread(void *argp)
547{
548#ifndef _NO_LOGGING
549 const char *const methodname="senderthread - ";
550#endif
551
552 message* internal_thread_msg = NULL;
553
554 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, methodname << "starting as thread <" << pthread_self() << ">");
555
556 FastQueue* fq= reinterpret_cast<FastQueue*>(argp);
557 if (!fq)
558 {
559 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "thread <" << pthread_self() << "> no valid pointer to msg queue. Stop.");
560 return;
561 }
562
563 bool terminate= false;
564 TPoverUDSMsg* internalmsg= 0;
565 while (terminate==false && (internal_thread_msg= fq->dequeue()) != 0 )
566 {
567 internalmsg= dynamic_cast<TPoverUDSMsg*>(internal_thread_msg);
568
569 if (internalmsg == 0)
570 {
571 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "received not an TPoverUDSMsg but a" << internal_thread_msg->get_type_name());
572 }
573 else
574 if (internalmsg->get_msgtype() == TPoverUDSMsg::send_data)
575 {
576
577 //DLog(tpparam.name, "Got a send request for socket " << *(internalmsg->get_appladdr()));
578
579 // create a connection if none exists and send the netmsg
580 if (internalmsg->get_netmsg() && internalmsg->get_udsaddr())
581 {
582 udssend(internalmsg->get_netmsg(),internalmsg->get_udsaddr());
583 }
584 else
585 {
586 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "problem with passed arguments references, they point to 0");
587 }
588 }
589 else
590 if (internalmsg->get_msgtype() == TPoverUDSMsg::stop)
591 {
592 terminate= true;
593 }
594 } // end while
595
596 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, methodname << "<" << pthread_self() << "> terminated connection.");
597}
598
599
600/** receiver thread listens at a TCP socket for incoming PDUs
601 * and passes complete PDUs to the coordinator. Incomplete
602 * PDUs due to aborted connections or buffer overflows are discarded.
603 * @param argp - assoc data and flags sig_terminate and terminated
604 *
605 * @note this is a static member function, so you cannot use class variables
606 */
607void
608TPoverUDS::receiver_thread(void *argp)
609{
610#ifndef _NO_LOGGING
611 const char *const methodname="receiver - ";
612#endif
613 receiver_thread_arg_t *receiver_thread_argp= static_cast<receiver_thread_arg_t *>(argp);
614 const udsaddress* peer_addr = NULL;
615 const udsaddress* own_addr = NULL;
616 uint32 bytes_received = 0;
617 TPMsg* tpmsg= NULL;
618
619 // argument parsing - start
620 if (receiver_thread_argp == 0)
621 {
622 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "No arguments given at start of receiver thread <" << pthread_self() << ">, exiting.");
623
624 return;
625 }
626 else
627 {
628 // change status to running, i.e., not terminated
629 receiver_thread_argp->terminated= false;
630
631#ifdef _DEBUG
632 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "New receiver thread <" << pthread_self() << "> started. ");
633#endif
634 }
635
636 int conn_socket= 0;
637 if (receiver_thread_argp->peer_assoc)
638 {
639 // get socket descriptor from arg
640 conn_socket = receiver_thread_argp->peer_assoc->socketfd;
641 // get pointer to peer address of socket (source/sender address of peer) from arg
642 peer_addr= &receiver_thread_argp->peer_assoc->peer;
643 own_addr= &receiver_thread_argp->peer_assoc->ownaddr;
644 }
645 else
646 {
647 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "No peer assoc available - pointer is NULL");
648
649 return;
650 }
651
652 if (peer_addr == 0)
653 {
654 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "No peer address available for socket " << conn_socket << ", exiting.");
655
656 return;
657 }
658 // argument parsing - end
659#ifdef _DEBUG
660 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname <<
661 "Preparing to wait for data at socket "
662 << conn_socket << " from " << receiver_thread_argp->peer_assoc->peer);
663#endif
664
665 int ret= 0;
666 uint32 msgcontentlength= 0;
667 bool msgcontentlength_known= false;
668 bool pdu_complete= false; // when to terminate inner loop
669
670 /* maybe use this to create a new pdu,
671 /// constructor
672 contextlistpdu(type_t t, subtype_t st, uint32 fc, uint32 numobj = 0);
673 */
674
675 // activate O_NON_BLOCK for recv on socket conn_socket
676 fcntl(conn_socket,F_SETFL, O_NONBLOCK);
677
678 // set options for polling
679 const unsigned int number_poll_sockets= 1;
680 struct pollfd poll_fd;
681 // have to set structure before poll call
682 poll_fd.fd = conn_socket;
683 poll_fd.events = POLLIN | POLLPRI;
684
685 int poll_status;
686 bool recv_error= false;
687
688 NetMsg* netmsg= 0;
689 NetMsg* remainbuf= 0;
690 size_t buffer_bytes_left= 0;
691 size_t trailingbytes= 0;
692 bool skiprecv= false;
693 // loop until we receive a terminate signal (read-only var for this thread)
694 // or get an error from socket read
695 while( receiver_thread_argp->sig_terminate == false )
696 {
697 // Read next PDU from socket or process trailing bytes in remainbuf
698 ret= 0;
699 msgcontentlength= 0;
700 msgcontentlength_known= false;
701 pdu_complete= false;
702 netmsg= 0;
703
704 // there are trailing bytes left from the last receive call
705 if (remainbuf != 0)
706 {
707 netmsg= remainbuf;
708 remainbuf= 0;
709 buffer_bytes_left= netmsg->get_size()-trailingbytes;
710 bytes_received= trailingbytes;
711 trailingbytes= 0;
712 skiprecv= true;
713 }
714 else // no trailing bytes, create a new buffer
715 if ( (netmsg= new NetMsg(NetMsg::max_size)) != 0 )
716 {
717 buffer_bytes_left= netmsg->get_size();
718 bytes_received= 0;
719 skiprecv= false;
720 }
721 else
722 { // buffer allocation failed
723 bytes_received= 0;
724 buffer_bytes_left= 0;
725 recv_error= true;
726 }
727
728 // loops until PDU is complete
729 // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
730 while (!pdu_complete &&
731 !recv_error &&
732 !receiver_thread_argp->sig_terminate)
733 {
734 if (!skiprecv)
735 {
736 // read from TCP socket or return after sleep_time
737 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
738
739 if (receiver_thread_argp->sig_terminate)
740 {
741 Log(EVENT_LOG,LOG_UNIMP,tpparam.name,methodname << "Thread <" << pthread_self() << "> found terminate signal after poll");
742 // disallow sending
743 AssocDataUDS* myassoc=const_cast<AssocDataUDS *>(receiver_thread_argp->peer_assoc);
744 if (myassoc->shutdown == false)
745 {
746 myassoc->shutdown= true;
747 if (shutdown(myassoc->socketfd,SHUT_WR))
748 {
749 if ( errno != ENOTCONN )
750 Log(ERROR_LOG,LOG_UNIMP,tpparam.name,methodname <<"shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
751 }
752 }
753 // try to read do a last read from the TCP socket or return after sleep_time
754 if (poll_status == 0)
755 {
756 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
757 }
758 }
759
760 if (poll_fd.revents & POLLERR) // Error condition
761 {
762 if (errno == 0 || errno == EINTR)
763 {
764 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, methodname << "poll(): " << strerror(errno));
765 }
766 else
767 {
768 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Poll indicates error: " << strerror(errno));
769 recv_error= true;
770 }
771 }
772
773 if (poll_fd.revents & POLLHUP) // Hung up
774 {
775 Log(EVENT_LOG,LOG_CRIT, tpparam.name, methodname << "Poll hung up");
776 recv_error= true;
777 }
778
779 if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
780 {
781 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Poll Invalid request: fd not open");
782 recv_error= true;
783 }
784
785 // check status (return value) of poll call
786 switch (poll_status)
787 {
788 case -1:
789 if (errno == 0 || errno == EINTR)
790 {
791 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, methodname << "Poll status: " << strerror(errno));
792 }
793 else
794 {
795 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Poll status indicates error: " << strerror(errno) << "- aborting");
796 recv_error= true;
797 }
798
799 continue; // next while iteration
800 break;
801
802 case 0:
803#ifdef DEBUG_HARD
804 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll timed out after " << tpparam.sleep_time << " ms.");
805#endif
806 continue; // next while iteration
807 break;
808
809 default:
810#ifdef DEBUG_HARD
811 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
812#endif
813 break;
814 } // end switch
815
816
817 /// receive data from socket buffer (recv will not block)
818 ret = recv(conn_socket,
819 netmsg->get_buffer() + bytes_received,
820 buffer_bytes_left,
821 0);
822
823 if ( ret < 0 )
824 {
825 if (errno!=EAGAIN && errno!=EWOULDBLOCK)
826 {
827 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Receive at socket " << conn_socket << " failed, error: " << strerror(errno));
828 recv_error= true;
829 continue;
830 }
831 else
832 { // errno==EAGAIN || errno==EWOULDBLOCK
833 // just nothing to read from socket, continue w/ next poll
834 continue;
835 }
836 }
837 else
838 {
839 if (ret == 0)
840 {
841 // this means that EOF is reached,
842 // other side has closed connection
843 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Other side (" << *peer_addr << ") closed connection for socket " << conn_socket);
844 // disallow sending
845 AssocDataUDS* myassoc=const_cast<AssocDataUDS *>(receiver_thread_argp->peer_assoc);
846 if (myassoc->shutdown == false)
847 {
848 myassoc->shutdown= true;
849 if (shutdown(myassoc->socketfd,SHUT_WR))
850 {
851 if ( errno != ENOTCONN )
852 Log(ERROR_LOG,LOG_UNIMP,tpparam.name, methodname << "shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
853 }
854 }
855 // not a real error, but we must quit the receive loop
856 recv_error= true;
857 }
858 else
859 {
860 Log(EVENT_LOG,LOG_UNIMP, tpparam.name, methodname << "<<--Received--<< packet (" << ret << " bytes) at socket " << conn_socket << " from " << *peer_addr);
861 // track number of received bytes
862
863 bytes_received+= ret;
864 buffer_bytes_left-= ret;
865 }
866 }
867 } // end if do not skip recv() statement
868
869 if (buffer_bytes_left < 0) ///< buffer space exhausted now
870 {
871 recv_error= true;
872 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "during receive buffer space exhausted");
873 }
874
875 if (!msgcontentlength_known) ///< common header not parsed
876 {
877 // enough bytes read to parse common header?
878 if (bytes_received >= common_header_length)
879 {
880 // get message content length in number of bytes
881 if (getmsglength(*netmsg, msgcontentlength)) msgcontentlength_known= true;
882 else
883 {
884 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Not a valid protocol header - discarding received packet. received size " << msgcontentlength);
885
886 ostringstream hexdumpstr;
887 netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
888 Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"dumping received bytes:" << hexdumpstr.str());
889
890 // reset all counters
891 msgcontentlength= 0;
892 msgcontentlength_known= false;
893 bytes_received= 0;
894 pdu_complete= false;
895 continue;
896 }
897 }
898 } // endif common header not parsed
899
900 // check whether we have read the whole Protocol PDU
901 //DLog(tpparam.name, "bytes_received - common_header_length: " << bytes_received-common_header_length << "msgcontentlength: " << msgcontentlength << ">=");
902 if (msgcontentlength_known && bytes_received-common_header_length >= msgcontentlength )
903 {
904
905 pdu_complete= true;
906 if (bytes_received-common_header_length > msgcontentlength)
907 {
908 Log(WARNING_LOG,LOG_NORMAL,tpparam.name,"trailing bytes - received more bytes ("<<bytes_received<<") than expected for PDU (" << common_header_length+msgcontentlength << ")");
909 remainbuf= new NetMsg(NetMsg::max_size);
910 trailingbytes= (bytes_received-common_header_length) - msgcontentlength;
911 bytes_received= common_header_length+msgcontentlength;
912 memcpy(remainbuf->get_buffer(),netmsg->get_buffer()+common_header_length+msgcontentlength, trailingbytes);
913 }
914 }
915 } // end while (!pdu_complete && !recv_error && !signalled for termination)
916 // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
917
918 // if other side closed the connection, we should still be able to deliver the remaining data
919 if (ret == 0)
920 {
921 recv_error= false;
922 }
923
924 // deliver only complete PDUs to signaling module
925 if (!recv_error && pdu_complete)
926 {
927 // create TPMsg and send it to the signaling thread
928 tpmsg = new(nothrow) TPMsg(netmsg, peer_addr->copy(), own_addr->copy());
929
930 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "receipt of PDU now complete, sending to module " << message::get_qaddr_name(tpparam.dest));
931
932 debug_pdu=false;
933
934 if (debug_pdu)
935 {
936 ostringstream hexdump;
937 netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_received);
938 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"PDU debugging enabled - Received:" << hexdump.str());
939 }
940
941 // send the message if it was successfully created
942 // bool message::send_to(qaddr_t dest, bool exp = false);
943 if (!tpmsg
944 || (!tpmsg->get_peeraddress())
945 || (!tpmsg->send_to(tpparam.dest)))
946 {
947 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Cannot allocate/send TPMsg");
948 if (tpmsg) delete tpmsg;
949 } // end if tpmsg not allocated or not addr or not sent
950
951 } // end if !recv_error
952 else
953 { // error during receive or PDU incomplete
954 if (bytes_received>0)
955 {
956 Log(WARNING_LOG,LOG_NORMAL, tpparam.name, methodname << "Attention! " << (recv_error? "Receive error, " : "") << (pdu_complete ? "PDU complete" : "PDU incomplete") << "received bytes: " << bytes_received);
957 }
958
959 if (!pdu_complete && bytes_received>0 && bytes_received<common_header_length)
960 {
961 ostringstream hexdumpstr;
962 netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
963 Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"Message too short to be a valid protocol header - dumping received bytes:" << hexdumpstr.str());
964 }
965 // leave the outer loop
966 /**********************/
967 break;
968 /**********************/
969 } // end else
970
971 } // end while (thread not signalled for termination)
972
973 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self()
974 << "> shutting down and closing socket " << receiver_thread_argp->peer_assoc->peer);
975
976 // shutdown socket
977 if (shutdown(conn_socket, SHUT_RD))
978 {
979 if ( errno != ENOTCONN )
980 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> shutdown (read) on socket failed, reason: " << strerror(errno));
981 }
982
983 // close socket
984 close(conn_socket);
985
986 receiver_thread_argp->terminated= true;
987
988 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> terminated");
989
990#ifdef _DEBUG
991 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Signaling main loop for cleanup");
992#endif
993 // notify master thread for invocation of cleanup procedure
994 TPoverUDSMsg* newmsg= new(nothrow)TPoverUDSMsg(receiver_thread_argp->peer_assoc);
995 // send message to main loop thread
996 newmsg->send_to(tpparam.source);
997}
998
999
1000/** this signals a terminate to a thread and wait for the thread to stop
1001 * @note it is not safe to access any thread related data after this method returned,
1002 * because the receiver thread will initiate a cleanup_receiver_thread() method
1003 * which may erase all relevant thread data.
1004 */
1005void
1006TPoverUDS::stop_receiver_thread(AssocDataUDS* peer_assoc)
1007{
1008 // All operations on recv_thread_argmap and connmap require an already acquired lock
1009 // after this procedure peer_assoc may be invalid because it was erased
1010
1011 // start critical section
1012
1013 if (peer_assoc == 0)
1014 return;
1015
1016 pthread_t thread_id= peer_assoc->thread_ID;
1017
1018 // try to clean up receiver_thread_arg
1019 recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1020 receiver_thread_arg_t* recv_thread_argp=
1021 (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1022 if (recv_thread_argp)
1023 {
1024 if (!recv_thread_argp->terminated)
1025 {
1026 // thread signaled termination, but is not?
1027 Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> signaled for termination");
1028
1029 // signal thread for its termination
1030 recv_thread_argp->sig_terminate= true;
1031 // wait for thread to join after termination
1032 pthread_join(thread_id, 0);
1033 // the dying thread will signal main loop to call this method, but this time we should enter the else branch
1034 return;
1035 }
1036 }
1037 else
1038 Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> not found");
1039
1040}
1041
1042
1043/** cleans up left over structures (assoc,receiver_thread_arg) from already terminated receiver_thread
1044 * usually called by the master_thread after the receiver_thread terminated
1045 * @note clean_up_receiver_thread() should be only called when an outer lock ensures that peer_assoc
1046 * is still valid
1047 */
1048void
1049TPoverUDS::cleanup_receiver_thread(AssocDataUDS* peer_assoc)
1050{
1051 // All operations on recv_thread_argmap and connmap require an already acquired lock
1052 // after this procedure peer_assoc may be invalid because it was erased
1053
1054 // start critical section
1055
1056 if (peer_assoc == 0)
1057 return;
1058
1059 pthread_t thread_id= peer_assoc->thread_ID;
1060
1061 // try to clean up receiver_thread_arg
1062 recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1063 receiver_thread_arg_t* recv_thread_argp=
1064 (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1065 if (recv_thread_argp)
1066 {
1067 if (!recv_thread_argp->terminated)
1068 {
1069 // thread signaled termination, but is not?
1070 Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> not terminated yet?!");
1071 return;
1072 }
1073 else
1074 { // if thread is already terminated
1075 Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> is terminated");
1076
1077 // delete it from receiver map
1078 recv_thread_argmap.erase(recv_thread_arg_iter);
1079
1080 // then delete receiver arg structure
1081 delete recv_thread_argp;
1082 }
1083 }
1084
1085 // delete entry from connection map
1086
1087 // cleanup sender thread
1088 // no need to lock explicitly, because caller of cleanup_receiver_thread() must already locked
1089 terminate_sender_thread(peer_assoc);
1090
1091 // delete the AssocData structure from the connection map
1092 // also frees allocated AssocData structure
1093 connmap.erase(peer_assoc);
1094
1095 // end critical section
1096
1097 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Cleanup receiver thread <" << thread_id << ">. Done.");
1098}
1099
1100
1101/* sends a stop message to the sender thread that belongs to the peer address given in assoc
1102 * @note terminate_receiver_thread() should be only called when an outer lock ensures that assoc
1103 * is still valid, a lock is also required, because senderthread_queuemap is changed
1104 */
1105void
1106TPoverUDS::terminate_sender_thread(const AssocDataUDS* assoc)
1107{
1108 if (assoc == 0)
1109 {
1110 Log(ERROR_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - assoc data == NULL");
1111 return;
1112 }
1113
1114 sender_thread_queuemap_t::iterator it= senderthread_queuemap.find(assoc->peer);
1115
1116 if (it != senderthread_queuemap.end())
1117 { // we have a sender thread: send a stop message to it
1118 FastQueue* destqueue= it->second;
1119 if (destqueue)
1120 {
1121 TPoverUDSMsg* internalmsg= new TPoverUDSMsg(assoc,tpparam.source,TPoverUDSMsg::stop);
1122 if (internalmsg)
1123 {
1124 // send the internal message to the sender thread queue
1125 internalmsg->send(tpparam.source,destqueue);
1126 }
1127 }
1128 else
1129 {
1130 Log(WARNING_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - found entry for address, but no sender thread. addr:" << assoc->peer);
1131 }
1132 // erase entry from map
1133 senderthread_queuemap.erase(it);
1134 }
1135}
1136
1137/* terminate all active threads
1138 * note: locking should not be necessary here because this message is called as last method from
1139 * main_loop()
1140 */
1141void
1142TPoverUDS::terminate_all_threads()
1143{
1144 AssocDataUDS* assoc= 0;
1145 receiver_thread_arg_t* terminate_argp;
1146
1147 for (recv_thread_argmap_t::iterator terminate_iterator= recv_thread_argmap.begin();
1148 terminate_iterator != recv_thread_argmap.end();
1149 terminate_iterator++)
1150 {
1151 if ( (terminate_argp= terminate_iterator->second) != 0)
1152 {
1153 // we need a non const pointer to erase it later on
1154 assoc= const_cast<AssocDataUDS*>(terminate_argp->peer_assoc);
1155 // check whether thread is still alive
1156 if (terminate_argp->terminated == false)
1157 {
1158 terminate_argp->sig_terminate= true;
1159 // then wait for its termination
1160 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,
1161 "Signaled receiver thread <" << terminate_iterator->first << "> for termination");
1162
1163 pthread_join(terminate_iterator->first, 0);
1164
1165 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Thread <" << terminate_iterator->first << "> is terminated");
1166 }
1167 else
1168 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,
1169 "Receiver thread <" << terminate_iterator->first << "> already terminated");
1170
1171 // cleanup all remaining argument structures of terminated threads
1172 delete terminate_argp;
1173
1174 // terminate any related sender thread that is still running
1175 terminate_sender_thread(assoc);
1176
1177 connmap.erase(assoc);
1178 // delete assoc is not necessary, because connmap.erase() will do the job
1179 }
1180 } // end for
1181}
1182
1183
1184/**
1185 * sender thread starter:
1186 * just a static starter method to allow starting the
1187 * actual sender_thread() method.
1188 *
1189 * @param argp - pointer to the current TPoverUDS object instance and receiver_thread_arg_t struct
1190 */
1191void*
1192TPoverUDS::sender_thread_starter(void *argp)
1193{
1194 sender_thread_start_arg_t *sargp= static_cast<sender_thread_start_arg_t *>(argp);
1195
1196 //cout << "invoked sender_thread_Starter" << endl;
1197
1198 // invoke sender thread method
1199 if (sargp != 0 && sargp->instance != 0)
1200 {
1201 // call receiver_thread member function on object instance
1202 sargp->instance->sender_thread(sargp->sender_thread_queue);
1203
1204 //cout << "Before deletion of sarg" << endl;
1205
1206 // no longer needed
1207 delete sargp;
1208 }
1209 else
1210 {
1211 Log(ERROR_LOG,LOG_CRIT,"sender_thread_starter","while starting sender_thread: 0 pointer to arg or object");
1212 }
1213 return 0;
1214}
1215
1216
1217
1218
1219/**
1220 * receiver thread starter:
1221 * just a static starter method to allow starting the
1222 * actual receiver_thread() method.
1223 *
1224 * @param argp - pointer to the current TPoverUDS object instance and receiver_thread_arg_t struct
1225 */
1226void*
1227TPoverUDS::receiver_thread_starter(void *argp)
1228{
1229 receiver_thread_start_arg_t *rargp= static_cast<receiver_thread_start_arg_t *>(argp);
1230 // invoke receiver thread method
1231 if (rargp != 0 && rargp->instance != 0)
1232 {
1233 // call receiver_thread member function on object instance
1234 rargp->instance->receiver_thread(rargp->rtargp);
1235
1236 // no longer needed
1237 delete rargp;
1238 }
1239 else
1240 {
1241 Log(ERROR_LOG,LOG_CRIT,"receiver_thread_starter","while starting receiver_thread: 0 pointer to arg or object");
1242 }
1243 return 0;
1244}
1245
1246
1247void
1248TPoverUDS::create_new_sender_thread(FastQueue* senderfqueue)
1249{
1250 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new sender thread...");
1251
1252 pthread_t senderthreadid;
1253 // create new thread; (arg == 0) is handled by thread, too
1254 int pthread_status= pthread_create(&senderthreadid,
1255 NULL, // NULL: default attributes: thread is joinable and has a
1256 // default, non-realtime scheduling policy
1257 TPoverUDS::sender_thread_starter,
1258 new sender_thread_start_arg_t(this,senderfqueue));
1259 if (pthread_status)
1260 {
1261 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " << strerror(pthread_status));
1262
1263 delete senderfqueue;
1264 }
1265}
1266
1267
1268void
1269TPoverUDS::create_new_receiver_thread(AssocDataUDS* peer_assoc)
1270{
1271 receiver_thread_arg_t* argp=
1272 new(nothrow) receiver_thread_arg(peer_assoc);
1273
1274 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new receiver thread...");
1275
1276 // create new thread; (arg == 0) is handled by thread, too
1277 int pthread_status= pthread_create(&peer_assoc->thread_ID,
1278 NULL, // NULL: default attributes: thread is joinable and has a
1279 // default, non-realtime scheduling policy
1280 receiver_thread_starter,
1281 new(nothrow) receiver_thread_start_arg_t(this,argp));
1282 if (pthread_status)
1283 {
1284 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " << strerror(pthread_status));
1285
1286 delete argp;
1287 }
1288 else
1289 {
1290 lock(); // install_cleanup_thread_lock(TPoverUDS, this);
1291
1292 // remember pointer to thread arg structure
1293 // thread arg structure should be destroyed after thread termination only
1294 pair<recv_thread_argmap_t::iterator, bool> tmpinsiterator=
1295 recv_thread_argmap.insert( pair<pthread_t,receiver_thread_arg_t*> (peer_assoc->thread_ID,argp) );
1296 if (tmpinsiterator.second == false)
1297 {
1298 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Thread argument could not be inserted into hashmap");
1299 }
1300 unlock(); // uninstall_cleanup(1);
1301 }
1302}
1303
1304
1305/**
1306 * master listener thread starter:
1307 * just a static starter method to allow starting the
1308 * actual master_listener_thread() method.
1309 *
1310 * @param argp - pointer to the current TPoverUDS object instance
1311 */
1312void*
1313TPoverUDS::master_listener_thread_starter(void *argp)
1314{
1315 // invoke listener thread method
1316 if (argp != 0)
1317 {
1318 (static_cast<TPoverUDS*>(argp))->master_listener_thread();
1319 }
1320 return 0;
1321}
1322
1323
1324
1325/**
1326 * master listener thread: waits for incoming connections at the well-known tcp port
1327 * when a connection request is received this thread spawns a receiver_thread for
1328 * receiving packets from the peer at the new socket.
1329 */
1330void
1331TPoverUDS::master_listener_thread()
1332{
1333 //remove any existing socket files
1334 unlink(tpparam.udssocket.c_str());
1335
1336
1337 // create a new address-structure for the listening masterthread
1338 struct sockaddr_un own_address;
1339 own_address.sun_family = AF_UNIX;
1340 strcpy(own_address.sun_path, tpparam.udssocket.c_str());
1341 unlink(tpparam.udssocket.c_str());
1342 uint32 len = strlen(tpparam.udssocket.c_str()) + sizeof(own_address.sun_family);
1343
1344
1345 // create a listening socket
1346 int master_listener_socket= socket(AF_UNIX, SOCK_STREAM, 0);
1347 if (master_listener_socket == -1)
1348 {
1349 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Could not create a new socket, error: " << strerror(errno));
1350 return;
1351 }
1352
1353
1354 // Reuse ports, even if they are busy
1355 int socketreuseflag= 1;
1356 int status= setsockopt(master_listener_socket,
1357 SOL_SOCKET,
1358 SO_REUSEADDR,
1359 (const char *) &socketreuseflag,
1360 sizeof(socketreuseflag));
1361 if (status)
1362 {
1363 Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
1364 }
1365
1366
1367 // bind the newly created socket to a specific address
1368 int bind_status = bind(master_listener_socket,
1369 reinterpret_cast<struct sockaddr *>(&own_address),
1370 len);
1371 if (bind_status)
1372 {
1373 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Binding to " << tpparam.udssocket);
1374 return;
1375 }
1376
1377 // listen at the socket,
1378 // queuesize for pending connections= max_listen_queue_size
1379 int listen_status = listen(master_listener_socket, max_listen_queue_size);
1380 if (listen_status)
1381 {
1382 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Listen at socket " << master_listener_socket
1383 << " failed, error: " << strerror(errno));
1384 return;
1385 }
1386 else
1387 {
1388 Log(INFO_LOG,LOG_NORMAL, tpparam.name, color[green] << "Listening at " << tpparam.udssocket << color[off]);
1389 }
1390
1391 // activate O_NON_BLOCK for accept (accept does not block)
1392 fcntl(master_listener_socket,F_SETFL, O_NONBLOCK);
1393
1394 // create a pollfd struct for use in the mainloop
1395 struct pollfd poll_fd;
1396 poll_fd.fd = master_listener_socket;
1397 poll_fd.events = POLLIN | POLLPRI;
1398 poll_fd.revents = 0;
1399 /*
1400 #define POLLIN 0x001 // There is data to read.
1401 #define POLLPRI 0x002 // There is urgent data to read.
1402 #define POLLOUT 0x004 // Writing now will not block.
1403 */
1404
1405 bool terminate = false;
1406 // check for thread terminate condition using get_state()
1407 state_t currstate= get_state();
1408 int poll_status= 0;
1409 const unsigned int number_poll_sockets= 1;
1410 struct sockaddr_un peer_address;
1411 socklen_t peer_address_len;
1412 int conn_socket;
1413
1414 // check whether this thread is signaled for termination
1415 while(! (terminate= (currstate==STATE_ABORT || currstate==STATE_STOP) ) )
1416 {
1417 // wait on number_poll_sockets (main drm socket)
1418 // for the events specified above for sleep_time (in ms)
1419 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
1420 if (poll_fd.revents & POLLERR) // Error condition
1421 {
1422 if (errno != EINTR)
1423 {
1424 Log(ERROR_LOG,LOG_CRIT, tpparam.name,
1425 "Poll caused error " << strerror(errno) << " - indicated by revents");
1426 }
1427 else
1428 {
1429 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "poll(): " << strerror(errno));
1430 }
1431
1432 }
1433 if (poll_fd.revents & POLLHUP) // Hung up
1434 {
1435 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll hung up");
1436 return;
1437 }
1438 if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
1439 {
1440 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll Invalid request: fd not open");
1441 return;
1442 }
1443
1444 switch (poll_status)
1445 {
1446 case -1:
1447 if (errno != EINTR)
1448 {
1449 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll status indicates error: " << strerror(errno));
1450 }
1451 else
1452 {
1453 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Poll status: " << strerror(errno));
1454 }
1455
1456 break;
1457
1458 case 0:
1459#ifdef DEBUG_HARD
1460 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,
1461 "Listen Thread - Poll timed out after " << tpparam.sleep_time << " ms.");
1462#endif
1463 currstate= get_state();
1464 continue;
1465 break;
1466
1467 default:
1468#ifdef DEBUG_HARD
1469 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
1470#endif
1471 break;
1472 } // end switch
1473
1474 // after a successful accept call,
1475 // accept stores the address information of the connecting party
1476 // in peer_address and the size of its address in addrlen
1477 peer_address_len= sizeof(peer_address);
1478 conn_socket = accept (master_listener_socket,
1479 reinterpret_cast<struct sockaddr *>(&peer_address),
1480 &peer_address_len);
1481 if (conn_socket == -1)
1482 {
1483 if (errno != EWOULDBLOCK && errno != EAGAIN)
1484 {
1485 Log(ERROR_LOG,LOG_EMERG, tpparam.name, "Accept at socket " << master_listener_socket
1486 << " failed, error: " << strerror(errno));
1487 return;
1488 }
1489 }
1490 else
1491 {
1492 // create a new assocdata-object for the new thread
1493 AssocDataUDS* peer_assoc = NULL;
1494 udsaddress addr(conn_socket);
1495
1496 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "<<--Received connect--<< request from " << addr);
1497
1498 //struct sockaddr_un own_address;
1499 //socklen_t own_address_len= sizeof(own_address);
1500 //getsockname(conn_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
1501
1502 // AssocData will copy addr content into its own structure
1503 // allocated peer_assoc will be stored in connmap
1504 peer_assoc = new(nothrow) AssocDataUDS(conn_socket, addr, udsaddress(conn_socket));
1505
1506 bool insert_success= false;
1507 if (peer_assoc)
1508 {
1509 // start critical section
1510 lock(); // install_cleanup_thread_lock(TPoverUDS, this);
1511 insert_success= connmap.insert(peer_assoc);
1512 // end critical section
1513 unlock(); // uninstall_cleanup(1);
1514 }
1515
1516
1517 if (insert_success == false) // not inserted into connmap
1518 {
1519 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot insert AssocData for socket " << conn_socket
1520 << ", " << addr << " into connection map, aborting connection...");
1521
1522 // abort connection, delete its AssocData
1523 close (conn_socket);
1524 if (peer_assoc)
1525 {
1526 delete peer_assoc;
1527 peer_assoc= 0;
1528 }
1529 return;
1530
1531 } //end __else(connmap.insert());__
1532
1533 // create a new thread for each new connection
1534 create_new_receiver_thread(peer_assoc);
1535 } // end __else (connsocket)__
1536
1537 // get new thread state
1538 currstate= get_state();
1539
1540 } // end while(!terminate)
1541 return;
1542} // end listen_for_connections()
1543
1544
1545TPoverUDS::~TPoverUDS()
1546{
1547 init= false;
1548
1549 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Destructor called");
1550
1551 QueueManager::instance()->unregister_queue(tpparam.source);
1552}
1553
1554/** TPoverUDS Thread main loop.
1555 * This loop checks for internal messages of either
1556 * a send() call to start a new receiver thread, or,
1557 * of a receiver_thread() that signals its own termination
1558 * for proper cleanup of control structures.
1559 * It also handles the following internal TPoverUDSMsg types:
1560 * - TPoverUDSMsg::stop - a particular receiver thread is terminated
1561 * - TPoverUDSMsg::start - a particular receiver thread is started
1562 * @param nr number of current thread instance
1563 */
1564void
1565TPoverUDS::main_loop(uint32 nr)
1566{
1567 // get internal queue for messages from receiver_thread
1568 FastQueue* fq = get_fqueue();
1569 if (!fq)
1570 {
1571 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot find message queue");
1572 return;
1573 } // end if not fq
1574 // register queue for receiving internal messages from other modules
1575 QueueManager::instance()->register_queue(fq,tpparam.source);
1576
1577 // start master listener thread
1578 pthread_t master_listener_thread_ID;
1579 int pthread_status= pthread_create(&master_listener_thread_ID,
1580 NULL, // NULL: default attributes: thread is joinable and has a
1581 // default, non-realtime scheduling policy
1582 master_listener_thread_starter,
1583 this);
1584 if (pthread_status)
1585 {
1586 Log(ERROR_LOG,LOG_CRIT, tpparam.name,
1587 "New master listener thread could not be created: " << strerror(pthread_status));
1588 }
1589 else
1590 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Master listener thread started");
1591
1592
1593 // define max latency for thread reaction on termination/stop signal
1594 timespec wait_interval= { 0, 250000000L }; // 250ms
1595 message* internal_thread_msg = NULL;
1596 state_t currstate= get_state();
1597
1598 // check whether this thread is signaled for termination
1599 while( currstate!=STATE_ABORT && currstate!=STATE_STOP )
1600 {
1601 // poll internal message queue (blocking)
1602 if ( (internal_thread_msg= fq->dequeue_timedwait(wait_interval)) != 0 )
1603 {
1604 TPoverUDSMsg* internalmsg= dynamic_cast<TPoverUDSMsg*>(internal_thread_msg);
1605 if (internalmsg)
1606 {
1607 if (internalmsg->get_msgtype() == TPoverUDSMsg::stop)
1608 {
1609 // a receiver thread terminated and signaled for cleanup by master thread
1610 AssocDataUDS* assocd= const_cast<AssocDataUDS*>(internalmsg->get_peer_assoc());
1611 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Got cleanup request for thread <" << assocd->thread_ID <<'>');
1612 lock();
1613 cleanup_receiver_thread( assocd );
1614 unlock();
1615 }
1616 else
1617 if (internalmsg->get_msgtype() == TPoverUDSMsg::start)
1618 {
1619 // start a new receiver thread
1620 create_new_receiver_thread( const_cast<AssocDataUDS*>(internalmsg->get_peer_assoc()) );
1621 }
1622 else
1623 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "unexpected internal message:" << internalmsg->get_msgtype());
1624
1625 delete internalmsg;
1626 }
1627 else
1628 {
1629 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Dynamic_cast failed - received unexpected and unknown internal message source "
1630 << internal_thread_msg->get_source());
1631 }
1632 } // endif
1633
1634 // get thread state
1635 currstate= get_state();
1636 } // end while
1637
1638 if (currstate==STATE_STOP)
1639 {
1640 // start abort actions
1641 Log(INFO_LOG,LOG_NORMAL, tpparam.name, "Asked to abort, stopping all receiver threads");
1642 } // end if stopped
1643
1644 // do not accept any more messages
1645 fq->shutdown();
1646 // terminate all receiver and sender threads that are still active
1647 terminate_all_threads();
1648}
1649
1650} // end namespace protlib
1651///@}
Note: See TracBrowser for help on using the repository browser.