source: source/ariba/communication/modules/transport/protlib/tp_over_uds.cpp@ 5638

Last change on this file since 5638 was 5638, checked in by Christoph Mayer, 15 years ago

adress detection aufgeräumt, network info für bleutooth, data stream (hopeful crash fix), logging auf maemo nur warn, ...

File size: 50.7 KB
Line 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file tp_over_uds.cpp
3/// transport module for unix domain socket communication
4/// ----------------------------------------------------------
5/// $Id: tp_over_uds.cpp 2872 2008-02-18 10:58:03Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/src/tp_over_uds.cpp $
7// ===========================================================
8//
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29
30extern "C"
31{
32 //#define _SOCKADDR_LEN /* use BSD 4.4 sockets */
33#include <unistd.h> /* gethostname */
34#include <sys/types.h> /* network socket interface */
35#include <netinet/in.h> /* network socket interface */
36#include <netinet/tcp.h> /* for TCP Socket Option */
37#include <sys/socket.h>
38#include <arpa/inet.h> /* inet_addr */
39
40#include <fcntl.h>
41#include <sys/poll.h>
42#include <sys/un.h>
43}
44
45#include <iostream>
46#include <errno.h>
47#include <string>
48#include <sstream>
49
50#include "tp_over_uds.h"
51#include "threadsafe_db.h"
52#include "cleanuphandler.h"
53#include "setuid.h"
54#include "queuemanager.h"
55#include "logfile.h"
56
57#include <set>
58
// Result codes used internally while sending over a UNIX domain socket.
#define UDS_SUCCESS       0
#define UDS_SEND_FAILURE  1

// Named like a listen() backlog limit; not referenced in the visible part of this file.
const unsigned int max_listen_queue_size = 10;
63
64namespace protlib {
65
66using namespace log;
67
68/** @defgroup tpuds TP over UDS
69 * @ingroup network
70 * @{
71 */
72
73
74/******* class TPoverUDS *******/
75
76
77/** get_connection_to() checks for already existing connections.
78 * If a connection exists, it returns "AssocData"
79 * and saves it in "connmap" for further use
80 * If no connection exists, a new connection to "addr"
81 * is created.
82 */
83AssocDataUDS*
84TPoverUDS::get_connection_to(udsaddress& addr)
85{
86 // get timeout
87 struct timespec ts;
88 get_time_of_day(ts);
89 ts.tv_nsec+= tpparam.sleep_time * 1000000L;
90 if (ts.tv_nsec>=1000000000L)
91 {
92 ts.tv_sec += ts.tv_nsec / 1000000000L;
93 ts.tv_nsec= ts.tv_nsec % 1000000000L;
94 }
95
96 // create a new AssocData pointer, initialize it to NULL
97 AssocDataUDS* assoc= NULL;
98 int new_socket;
99 // loop until timeout is exceeded: TODO: check applicability of loop
100 do
101 {
102 // check for existing connections to addr
103 // start critical section
104 lock(); // install_cleanup_thread_lock(TPoverUDS, this);
105 assoc= connmap.lookup(addr);
106 // end critical section
107 unlock(); // uninstall_cleanup(1);
108 if (assoc)
109 {
110 // If not shut down then use it, else abort, wait and check again.
111 if (!assoc->shutdown)
112 {
113 return assoc;
114 }
115 else
116 {
117 // TODO: connection is already in shutdown mode. What now?
118 Log(ERROR_LOG,LOG_CRIT,tpparam.name,"socket exists, but is already in mode shutdown");
119
120 return 0;
121 }
122 } //end __if (assoc)__
123 else
124 {
125 Log(DEBUG_LOG,LOG_UNIMP,tpparam.name,"No existing connection to "
126 << addr << " found, creating a new one.");
127 }
128
129 // no connection found, create a new one
130 new_socket = socket(AF_UNIX, SOCK_STREAM, 0);
131 if (new_socket == -1)
132 {
133 Log(ERROR_LOG,LOG_CRIT,tpparam.name,"Couldn't create a new socket: " << strerror(errno));
134
135 return 0;
136 }
137
138 // Reuse ports, even if they are busy
139 int socketreuseflag= 1;
140 int status= setsockopt(new_socket,
141 SOL_SOCKET,
142 SO_REUSEADDR,
143 (const char *) &socketreuseflag,
144 sizeof(socketreuseflag));
145 if (status)
146 {
147 Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
148 }
149
150 struct sockaddr_un dest_address;
151 dest_address.sun_family= AF_UNIX;
152 strcpy(dest_address.sun_path, addr.get_udssocket().c_str());
153
154 // connect the socket to the destination address
155 int connect_status = connect(new_socket,
156 reinterpret_cast<const struct sockaddr*>(&dest_address),
157 sizeof(dest_address));
158
159 // connects to the listening_port of the peer's masterthread
160 if (connect_status != 0)
161 {
162 Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"Connect to " << addr.get_udssocket() << "failed: [" << color[red] << strerror(errno) << color[off] << "]");
163
164 throw TPErrorConnectSetupFail();
165 }
166
167
168 //struct sockaddr_un own_address;
169 //socklen_t own_address_len= sizeof(own_address);
170 //getsockname(new_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
171
172 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,">>--Connect-->> to " << addr << " from " << udsaddress(new_socket));
173
174 // create new AssocData record (will copy addr)
175 assoc = new(nothrow) AssocDataUDS(new_socket, addr, udsaddress(new_socket));
176
177 // if assoc could be successfully created, insert it into ConnectionMap
178 if (assoc)
179 {
180 bool insert_success= false;
181 // start critical section
182 lock(); // install_cleanup_thread_lock(TPoverUDS, this);
183 // insert AssocData into connection map
184 insert_success= connmap.insert(assoc);
185 // end critical section
186 unlock(); // uninstall_cleanup(1);
187
188 if (insert_success == true)
189 {
190#ifdef _DEBUG
191 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Connected to " << addr << " via socket " << new_socket);
192
193
194#endif
195
196 // notify master thread to start a receiver thread (i.e. send selfmessage)
197 TPoverUDSMsg* newmsg= new(nothrow)TPoverUDSMsg(assoc, tpparam.source, TPoverUDSMsg::start);
198 if (newmsg)
199 {
200 newmsg->send_to(tpparam.source);
201 return assoc;
202 }
203 else
204 Log(ERROR_LOG,LOG_CRIT,tpparam.name,"get_connection_to: could not get memory for internal msg");
205 }
206 else
207 {
208 // delete data and abort
209 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot insert AssocData for socket " << new_socket << ", "<< addr << " into connection map, aborting connection");
210
211 // abort connection, delete its AssocData
212 close (new_socket);
213 if (assoc)
214 {
215 delete assoc;
216 assoc= 0;
217 }
218 return assoc;
219 } // end else connmap.insert
220
221 } // end "if (assoc)"
222 }
223 while (wait_cond(ts)!=ETIMEDOUT);
224
225 return assoc;
226} //end get_connection_to
227
228
229/** terminates a signaling association/connection
230 * - if no connection exists, generate a warning
231 * - otherwise: generate internal msg to related receiver thread
232 */
233void
234TPoverUDS::terminate(const address& in_addr)
235{
236
237 udsaddress* addr = NULL;
238 addr = dynamic_cast<udsaddress*>(in_addr.copy());
239
240#ifndef _NO_LOGGING
241 const char *const thisproc="terminate() - ";
242#endif
243
244 // Create a new AssocData-pointer
245 AssocDataUDS* assoc = NULL;
246
247 // check for existing connections to addr
248
249 // start critical section:
250 // please note if receiver_thread terminated already, the assoc data is not
251 // available anymore therefore we need a lock around cleanup_receiver_thread()
252
253 lock(); // install_cleanup_thread_lock(TPoverUDS, this);
254 assoc= connmap.lookup(*addr);
255 if (assoc)
256 {
257 Log(EVENT_LOG,LOG_NORMAL,tpparam.name,thisproc<<"got request to shutdown connection for peer " << addr);
258 // If not shut down then use it, else abort, wait and check again.
259 if (!assoc->shutdown)
260 {
261 if (assoc->socketfd)
262 {
263 // disallow sending
264 if (shutdown(assoc->socketfd,SHUT_WR))
265 {
266 Log(ERROR_LOG,LOG_UNIMP,tpparam.name,thisproc<<"shutdown (write) on socket for peer " << addr << " returned error:" << strerror(errno));
267 }
268 else
269 {
270 Log(EVENT_LOG,LOG_NORMAL,tpparam.name,thisproc<<"initiated closing of connection for peer " << addr << ". Shutdown (write) on socket "<< assoc->socketfd );
271 }
272 }
273 assoc->shutdown= true;
274 }
275 else
276 Log(EVENT_LOG,LOG_NORMAL,tpparam.name,thisproc<<"connection for peer " << addr << " is already in mode shutdown");
277
278 }
279 else
280 Log(WARNING_LOG,LOG_NORMAL,tpparam.name,thisproc<<"could not find a connection for peer " << addr);
281
282 stop_receiver_thread(assoc);
283
284 // end critical section
285 unlock(); // uninstall_cleanup(1);
286
287 if (addr) delete addr;
288
289}
290
291
292/** generates and internal TPoverUDS message to send a NetMsg to the network
293 *
294 * - it is necessary to let a thread do this, because the caller
295 * may get blocked if the connect() or send() call hangs for a while
296 * - the sending thread will call TPoverUDS::udssend()
297 * - if no connection exists, creates a new one
298 *
299 * @note the netmsg is deleted by the send() method when it is not used anymore
300 */
301void
302TPoverUDS::send(NetMsg* netmsg, const address& in_addr, bool use_existing_connection)
303{
304 if (netmsg == NULL) {
305 ERRCLog(tpparam.name,"send() - called without valid NetMsg buffer (NULL)");
306 return;
307 }
308
309 udsaddress* addr = NULL;
310 addr = dynamic_cast<udsaddress*>(in_addr.copy());
311
312 if (!addr)
313 {
314 ERRCLog(tpparam.name,"send() - given destination address is not of expected type (udsaddress), has type: " << (int) in_addr.get_type());
315 return;
316 }
317
318
319 // lock due to sendermap access
320 lock();
321
322
323 // check for existing sender thread
324 sender_thread_queuemap_t::const_iterator it= senderthread_queuemap.find(*addr);
325
326 FastQueue* destqueue= 0;
327
328 if (it == senderthread_queuemap.end())
329 { // no sender thread so far
330
331 if (!use_existing_connection)
332 { // it is allowed to create a new connection for this thread
333
334 // create a new queue for sender thread
335 FastQueue* sender_thread_queue= new FastQueue;
336 create_new_sender_thread(sender_thread_queue);
337 // remember queue for later use
338
339 //pair<sender_thread_queuemap_t::iterator, bool> tmpinsiterator=
340 senderthread_queuemap.insert( pair<udsaddress,FastQueue*> (*addr,sender_thread_queue) );
341
342 destqueue= sender_thread_queue;
343 }
344 }
345 else
346 { // we have a sender thread, use stored queue from map
347 destqueue= it->second;
348 }
349
350 unlock();
351
352 // send a send_data message to it (if we have a destination queue)
353 if (destqueue)
354 {
355 // both parameters will be freed after message was sent!
356
357 //DLog(tpparam.name, "Sending self-message for socket " << addr->get_udssocket().c_str());
358
359 TPoverUDSMsg* internalmsg= new TPoverUDSMsg(netmsg,addr->copy());
360 if (internalmsg)
361 {
362 //DLog(tpparam.name, "Address in internal message: " << internalmsg->get_appladdr()->get_udssocket());
363 // send the internal message to the sender thread queue
364 internalmsg->send(tpparam.source,destqueue);
365 }
366 }
367 else
368 {
369 if (!use_existing_connection)
370 WLog(tpparam.name,"send() - found entry for address, but no active sender thread available for peer addr:" << *addr << " - dropping data");
371 else
372 DLog(tpparam.name,"no active sender thread found " << *addr << " - but policy forbids to set up a new connection, will drop data");
373
374 // cannot send data, so we must drop it
375 delete netmsg;
376 }
377
378 if (addr) delete addr;
379}
380
381/** sends a NetMsg to the network.
382 *
383 * @param netmsg - message to send
384 * @param addr - transport endpoint address
385 *
386 * @note if no connection exists, creates a new one
387 * @note both parameters are deleted after the message was sent
388 */
389void
390TPoverUDS::udssend(NetMsg* netmsg, udsaddress* addr)
391{
392#ifndef _NO_LOGGING
393 const char *const thisproc="sender - ";
394#endif
395
396 // set result initially to success, set it to failure
397 // in places where these failures occur
398 int result = UDS_SUCCESS;
399 int ret= 0;
400
401 // Create a new AssocData-pointer
402 AssocDataUDS* assoc = NULL;
403
404 // tp.cpp checks for initialisation of tp and correctness of
405 // msgsize, protocol and ip,
406 // throws an error if something is not right
407 if (addr) {
408 if (!(addr->get_udssocket().size() || addr->get_socknum())) {
409 ERRCLog(tpparam.name, "No UNIX Domain Socket Path / Socket Number in address, will not process it: " << addr->get_udssocket());
410 return;
411 }
412 } else {
413 Log(ERROR_LOG,LOG_CRIT, tpparam.name, thisproc << "address pointer is NULL");
414 result= UDS_SEND_FAILURE;
415 throw TPErrorInternal();
416 }
417
418
419
420 // check for existing connections,
421 // if a connection exists, return its AssocData
422 // and save it in assoc for further use
423 // if no connection exists, create a new one (in get_connection_to()).
424 // Connection is inserted into connection map that must be done
425 // with exclusive access
426 assoc= get_connection_to(*addr);
427
428 if (assoc==NULL || assoc->socketfd<=0)
429 {
430 Log(ERROR_LOG,LOG_CRIT, tpparam.name, thisproc << "no valid assoc/socket data");
431
432 delete netmsg;
433 delete addr;
434 return;
435 }
436 if (assoc->shutdown)
437 {
438 Log(WARNING_LOG, LOG_ALERT, tpparam.name, thisproc << "should send message although connection already half closed");
439 delete netmsg;
440 delete addr;
441
442 throw TPErrorSendFailed();
443 }
444
445 uint32 msgsize= netmsg->get_size();
446#ifdef DEBUG_HARD
447 cerr << thisproc << "message size=" << netmsg->get_size() << endl;
448#endif
449
450 // send all the data contained in netmsg to the socket
451 // which belongs to the address "addr"
452 for (uint32 bytes_sent= 0;
453 bytes_sent < msgsize;
454 bytes_sent+= ret)
455 {
456
457#ifdef _DEBUG_HARD
458 for (uint32 i=0;i<msgsize;i++)
459 {
460 cout << "send_buf: " << i << " : ";
461 if ( isalnum(*(netmsg->get_buffer()+i)) )
462 cout << "'" << *(netmsg->get_buffer()+i) << "' (0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec << ")" ;
463 else
464 cout << "0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec;
465 cout << endl;
466 }
467
468 cout << endl;
469 cout << "bytes_sent: " << bytes_sent << endl;
470 cout << "Message size: " << msgsize << endl;
471 cout << "Send-Socket: " << assoc->socketfd << endl;
472 cout << "pointer-Offset. " << netmsg->get_pos() << endl;
473 cout << "vor send " << endl;
474#endif
475
476 // socket send
477 ret= ::send(assoc->socketfd,
478 netmsg->get_buffer() + bytes_sent,
479 msgsize - bytes_sent,
480 MSG_NOSIGNAL);
481
482 //send_buf + bytes_sent
483
484 if (debug_pdu)
485 {
486 ostringstream hexdump;
487 netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_sent);
488 Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"PDU debugging enabled - Sent:" << hexdump.str());
489 }
490
491 if (ret < 0)
492 {
493 result= UDS_SEND_FAILURE;
494 break;
495 } // end if (ret < 0)
496 } // end for
497
498
499 // *** note: netmsg is deleted here ***
500 delete netmsg;
501
502 // Throwing an exception within a critical section does not
503 // unlock the mutex.
504
505 if (result != UDS_SUCCESS)
506 {
507 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, thisproc << "Unix Domain Socket error, returns " << ret << ", error : " << strerror(errno));
508 delete addr;
509
510 throw TPErrorSendFailed();
511
512 // There is no special errorcode for sending failed
513 // in tp.h, there are only these:
514 /*
515 ERROR_BAD_ADDRESS,
516 ERROR_BAD_NETMSG,
517 ERROR_NOT_INIT,
518 ERROR_UNREACHABLE,
519 ERROR_INTERNAL,
520 ERROR_PAYLOAD
521 */
522 }
523 else
524 Log(EVENT_LOG,LOG_NORMAL,tpparam.name, thisproc << ">>----Sent---->> message (" << msgsize << " bytes) using socket " << assoc->socketfd << " to " << *addr);
525
526 if (!assoc) {
527 // no connection
528
529 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, thisproc << "cannot get connection to " << *addr);
530
531 delete addr;
532
533 throw TPErrorUnreachable(); // should be no assoc found
534 } // end "if (!assoc)"
535
536 // *** delete address ***
537 delete addr;
538}
539
540/* this thread waits for an internal message that either:
541 * - requests transmission of a packet
542 * - requests to stop this thread
543 * @param argp points to the thread queue for internal messages
544 */
545void
546TPoverUDS::sender_thread(void *argp)
547{
548#ifndef _NO_LOGGING
549 const char *const methodname="senderthread - ";
550#endif
551
552 message* internal_thread_msg = NULL;
553
554 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, methodname << "starting as thread <" << pthread_self() << ">");
555
556 FastQueue* fq= reinterpret_cast<FastQueue*>(argp);
557 if (!fq)
558 {
559 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "thread <" << pthread_self() << "> no valid pointer to msg queue. Stop.");
560 return;
561 }
562
563 bool terminate= false;
564 TPoverUDSMsg* internalmsg= 0;
565 while (terminate==false && (internal_thread_msg= fq->dequeue()) != 0 )
566 {
567 internalmsg= dynamic_cast<TPoverUDSMsg*>(internal_thread_msg);
568
569 if (internalmsg == 0)
570 {
571 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "received not an TPoverUDSMsg but a" << internal_thread_msg->get_type_name());
572 }
573 else
574 if (internalmsg->get_msgtype() == TPoverUDSMsg::send_data)
575 {
576
577 //DLog(tpparam.name, "Got a send request for socket " << *(internalmsg->get_appladdr()));
578
579 // create a connection if none exists and send the netmsg
580 if (internalmsg->get_netmsg() && internalmsg->get_udsaddr())
581 {
582 udssend(internalmsg->get_netmsg(),internalmsg->get_udsaddr());
583 }
584 else
585 {
586 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "problem with passed arguments references, they point to 0");
587 }
588 }
589 else
590 if (internalmsg->get_msgtype() == TPoverUDSMsg::stop)
591 {
592 terminate= true;
593 }
594 } // end while
595
596 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, methodname << "<" << pthread_self() << "> terminated connection.");
597}
598
599
600/** receiver thread listens at a TCP socket for incoming PDUs
601 * and passes complete PDUs to the coordinator. Incomplete
602 * PDUs due to aborted connections or buffer overflows are discarded.
603 * @param argp - assoc data and flags sig_terminate and terminated
604 *
605 * @note this is a static member function, so you cannot use class variables
606 */
607void
608TPoverUDS::receiver_thread(void *argp)
609{
610#ifndef _NO_LOGGING
611 const char *const methodname="receiver - ";
612#endif
613 receiver_thread_arg_t *receiver_thread_argp= static_cast<receiver_thread_arg_t *>(argp);
614 const udsaddress* peer_addr = NULL;
615 const udsaddress* own_addr = NULL;
616 uint32 bytes_received = 0;
617 TPMsg* tpmsg= NULL;
618
619 // argument parsing - start
620 if (receiver_thread_argp == 0)
621 {
622 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "No arguments given at start of receiver thread <" << pthread_self() << ">, exiting.");
623
624 return;
625 }
626 else
627 {
628 // change status to running, i.e., not terminated
629 receiver_thread_argp->terminated= false;
630
631#ifdef _DEBUG
632 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "New receiver thread <" << pthread_self() << "> started. ");
633#endif
634 }
635
636 int conn_socket= 0;
637 if (receiver_thread_argp->peer_assoc)
638 {
639 // get socket descriptor from arg
640 conn_socket = receiver_thread_argp->peer_assoc->socketfd;
641 // get pointer to peer address of socket (source/sender address of peer) from arg
642 peer_addr= &receiver_thread_argp->peer_assoc->peer;
643 own_addr= &receiver_thread_argp->peer_assoc->ownaddr;
644 }
645 else
646 {
647 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "No peer assoc available - pointer is NULL");
648
649 return;
650 }
651
652 if (peer_addr == 0)
653 {
654 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "No peer address available for socket " << conn_socket << ", exiting.");
655
656 return;
657 }
658 // argument parsing - end
659#ifdef _DEBUG
660 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname <<
661 "Preparing to wait for data at socket "
662 << conn_socket << " from " << receiver_thread_argp->peer_assoc->peer);
663#endif
664
665 int ret= 0;
666 uint32 msgcontentlength= 0;
667 bool msgcontentlength_known= false;
668 bool pdu_complete= false; // when to terminate inner loop
669
670 /* maybe use this to create a new pdu,
671 /// constructor
672 contextlistpdu(type_t t, subtype_t st, uint32 fc, uint32 numobj = 0);
673 */
674
675 // activate O_NON_BLOCK for recv on socket conn_socket
676 fcntl(conn_socket,F_SETFL, O_NONBLOCK);
677
678 // set options for polling
679 const unsigned int number_poll_sockets= 1;
680 struct pollfd poll_fd;
681 // have to set structure before poll call
682 poll_fd.fd = conn_socket;
683 poll_fd.events = POLLIN | POLLPRI;
684
685 int poll_status;
686 bool recv_error= false;
687
688 NetMsg* netmsg= 0;
689 NetMsg* remainbuf= 0;
690 size_t buffer_bytes_left= 0;
691 size_t trailingbytes= 0;
692 bool skiprecv= false;
693 // loop until we receive a terminate signal (read-only var for this thread)
694 // or get an error from socket read
695 while( receiver_thread_argp->sig_terminate == false )
696 {
697 // Read next PDU from socket or process trailing bytes in remainbuf
698 ret= 0;
699 msgcontentlength= 0;
700 msgcontentlength_known= false;
701 pdu_complete= false;
702 netmsg= 0;
703
704 // there are trailing bytes left from the last receive call
705 if (remainbuf != 0)
706 {
707 netmsg= remainbuf;
708 remainbuf= 0;
709 buffer_bytes_left= netmsg->get_size()-trailingbytes;
710 bytes_received= trailingbytes;
711 trailingbytes= 0;
712 skiprecv= true;
713 }
714 else // no trailing bytes, create a new buffer
715 if ( (netmsg= new NetMsg(NetMsg::max_size)) != 0 )
716 {
717 buffer_bytes_left= netmsg->get_size();
718 bytes_received= 0;
719 skiprecv= false;
720 }
721 else
722 { // buffer allocation failed
723 bytes_received= 0;
724 buffer_bytes_left= 0;
725 recv_error= true;
726 }
727
728 // loops until PDU is complete
729 // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
730 while (!pdu_complete &&
731 !recv_error &&
732 !receiver_thread_argp->sig_terminate)
733 {
734 if (!skiprecv)
735 {
736 // read from TCP socket or return after sleep_time
737 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
738
739 if (receiver_thread_argp->sig_terminate)
740 {
741 Log(EVENT_LOG,LOG_UNIMP,tpparam.name,methodname << "Thread <" << pthread_self() << "> found terminate signal after poll");
742 // disallow sending
743 AssocDataUDS* myassoc=const_cast<AssocDataUDS *>(receiver_thread_argp->peer_assoc);
744 if (myassoc->shutdown == false)
745 {
746 myassoc->shutdown= true;
747 if (shutdown(myassoc->socketfd,SHUT_WR))
748 {
749 if ( errno != ENOTCONN )
750 Log(ERROR_LOG,LOG_UNIMP,tpparam.name,methodname <<"shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
751 }
752 }
753 // try to read do a last read from the TCP socket or return after sleep_time
754 if (poll_status == 0)
755 {
756 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
757 }
758 }
759
760 if (poll_fd.revents & POLLERR) // Error condition
761 {
762 if (errno == 0 || errno == EINTR)
763 {
764 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, methodname << "poll(): " << strerror(errno));
765 }
766 else
767 {
768 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Poll indicates error: " << strerror(errno));
769 recv_error= true;
770 }
771 }
772
773 if (poll_fd.revents & POLLHUP) // Hung up
774 {
775 Log(EVENT_LOG,LOG_CRIT, tpparam.name, methodname << "Poll hung up");
776 recv_error= true;
777 }
778
779 if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
780 {
781 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Poll Invalid request: fd not open");
782 recv_error= true;
783 }
784
785 // check status (return value) of poll call
786 switch (poll_status)
787 {
788 case -1:
789 if (errno == 0 || errno == EINTR)
790 {
791 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, methodname << "Poll status: " << strerror(errno));
792 }
793 else
794 {
795 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Poll status indicates error: " << strerror(errno) << "- aborting");
796 recv_error= true;
797 }
798
799 continue; // next while iteration
800 break;
801
802 case 0:
803#ifdef DEBUG_HARD
804 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll timed out after " << tpparam.sleep_time << " ms.");
805#endif
806 continue; // next while iteration
807 break;
808
809 default:
810#ifdef DEBUG_HARD
811 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
812#endif
813 break;
814 } // end switch
815
816
817 /// receive data from socket buffer (recv will not block)
818 ret = recv(conn_socket,
819 netmsg->get_buffer() + bytes_received,
820 buffer_bytes_left,
821 0);
822
823 if ( ret < 0 )
824 {
825 if (errno!=EAGAIN && errno!=EWOULDBLOCK)
826 {
827 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Receive at socket " << conn_socket << " failed, error: " << strerror(errno));
828 recv_error= true;
829 continue;
830 }
831 else
832 { // errno==EAGAIN || errno==EWOULDBLOCK
833 // just nothing to read from socket, continue w/ next poll
834 continue;
835 }
836 }
837 else
838 {
839 if (ret == 0)
840 {
841 // this means that EOF is reached,
842 // other side has closed connection
843 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Other side (" << *peer_addr << ") closed connection for socket " << conn_socket);
844 // disallow sending
845 AssocDataUDS* myassoc=const_cast<AssocDataUDS *>(receiver_thread_argp->peer_assoc);
846 if (myassoc->shutdown == false)
847 {
848 myassoc->shutdown= true;
849 if (shutdown(myassoc->socketfd,SHUT_WR))
850 {
851 if ( errno != ENOTCONN )
852 Log(ERROR_LOG,LOG_UNIMP,tpparam.name, methodname << "shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
853 }
854 }
855 // not a real error, but we must quit the receive loop
856 recv_error= true;
857 }
858 else
859 {
860 Log(EVENT_LOG,LOG_UNIMP, tpparam.name, methodname << "<<--Received--<< packet (" << ret << " bytes) at socket " << conn_socket << " from " << *peer_addr);
861 // track number of received bytes
862
863 bytes_received+= ret;
864 buffer_bytes_left-= ret;
865 }
866 }
867 } // end if do not skip recv() statement
868
869 if (buffer_bytes_left < 0) ///< buffer space exhausted now
870 {
871 recv_error= true;
872 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "during receive buffer space exhausted");
873 }
874
875 if (!msgcontentlength_known) ///< common header not parsed
876 {
877 // enough bytes read to parse common header?
878 if (bytes_received >= common_header_length)
879 {
880 // get message content length in number of bytes
881 if (getmsglength(*netmsg, msgcontentlength)) msgcontentlength_known= true;
882 else
883 {
884 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "Not a valid protocol header - discarding received packet. received size " << msgcontentlength);
885
886 ostringstream hexdumpstr;
887 netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
888 Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"dumping received bytes:" << hexdumpstr.str());
889
890 // reset all counters
891 msgcontentlength= 0;
892 msgcontentlength_known= false;
893 bytes_received= 0;
894 pdu_complete= false;
895 continue;
896 }
897 }
898 } // endif common header not parsed
899
900 // check whether we have read the whole Protocol PDU
901 //DLog(tpparam.name, "bytes_received - common_header_length: " << bytes_received-common_header_length << "msgcontentlength: " << msgcontentlength << ">=");
902 if (msgcontentlength_known && bytes_received-common_header_length >= msgcontentlength )
903 {
904
905 pdu_complete= true;
906 if (bytes_received-common_header_length > msgcontentlength)
907 {
908 Log(WARNING_LOG,LOG_NORMAL,tpparam.name,"trailing bytes - received more bytes ("<<bytes_received<<") than expected for PDU (" << common_header_length+msgcontentlength << ")");
909 remainbuf= new NetMsg(NetMsg::max_size);
910 trailingbytes= (bytes_received-common_header_length) - msgcontentlength;
911 bytes_received= common_header_length+msgcontentlength;
912 memcpy(remainbuf->get_buffer(),netmsg->get_buffer()+common_header_length+msgcontentlength, trailingbytes);
913 }
914 }
915 } // end while (!pdu_complete && !recv_error && !signalled for termination)
916 // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
917
918 // if other side closed the connection, we should still be able to deliver the remaining data
919 if (ret == 0)
920 {
921 recv_error= false;
922 }
923
924 // deliver only complete PDUs to signaling module
925 if (!recv_error && pdu_complete)
926 {
927 // create TPMsg and send it to the signaling thread
928 tpmsg = new(nothrow) TPMsg(netmsg, peer_addr->copy(), own_addr->copy());
929
930 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "receipt of PDU now complete, sending to module " << message::get_qaddr_name(tpparam.dest));
931
932 debug_pdu=false;
933
934 if (debug_pdu)
935 {
936 ostringstream hexdump;
937 netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_received);
938 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"PDU debugging enabled - Received:" << hexdump.str());
939 }
940
941 // send the message if it was successfully created
942 // bool message::send_to(qaddr_t dest, bool exp = false);
943 if (!tpmsg
944 || (!tpmsg->get_peeraddress())
945 || (!tpmsg->send_to(tpparam.dest)))
946 {
947 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Cannot allocate/send TPMsg");
948 if (tpmsg) delete tpmsg;
949 } // end if tpmsg not allocated or not addr or not sent
950
951 } // end if !recv_error
952 else
953 { // error during receive or PDU incomplete
954 if (bytes_received>0)
955 {
956 Log(WARNING_LOG,LOG_NORMAL, tpparam.name, methodname << "Attention! " << (recv_error? "Receive error, " : "") << (pdu_complete ? "PDU complete" : "PDU incomplete") << "received bytes: " << bytes_received);
957 }
958
959 if (!pdu_complete && bytes_received>0 && bytes_received<common_header_length)
960 {
961 ostringstream hexdumpstr;
962 netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
963 Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"Message too short to be a valid protocol header - dumping received bytes:" << hexdumpstr.str());
964 }
965 // leave the outer loop
966 /**********************/
967 break;
968 /**********************/
969 } // end else
970
971 } // end while (thread not signalled for termination)
972
973 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self()
974 << "> shutting down and closing socket " << receiver_thread_argp->peer_assoc->peer);
975
976 // shutdown socket
977 if (shutdown(conn_socket, SHUT_RD))
978 {
979 if ( errno != ENOTCONN )
980 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> shutdown (read) on socket failed, reason: " << strerror(errno));
981 }
982
983 // close socket
984 close(conn_socket);
985
986 receiver_thread_argp->terminated= true;
987
988 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> terminated");
989
990#ifdef _DEBUG
991 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Signaling main loop for cleanup");
992#endif
993 // notify master thread for invocation of cleanup procedure
994 TPoverUDSMsg* newmsg= new(nothrow)TPoverUDSMsg(receiver_thread_argp->peer_assoc);
995 // send message to main loop thread
996 newmsg->send_to(tpparam.source);
997}
998
999
1000/** this signals a terminate to a thread and wait for the thread to stop
1001 * @note it is not safe to access any thread related data after this method returned,
1002 * because the receiver thread will initiate a cleanup_receiver_thread() method
1003 * which may erase all relevant thread data.
1004 */
1005void
1006TPoverUDS::stop_receiver_thread(AssocDataUDS* peer_assoc)
1007{
1008 // All operations on recv_thread_argmap and connmap require an already acquired lock
1009 // after this procedure peer_assoc may be invalid because it was erased
1010
1011 // start critical section
1012
1013 if (peer_assoc == 0)
1014 return;
1015
1016 pthread_t thread_id= peer_assoc->thread_ID;
1017
1018 // try to clean up receiver_thread_arg
1019 recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1020 receiver_thread_arg_t* recv_thread_argp=
1021 (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1022 if (recv_thread_argp)
1023 {
1024 if (!recv_thread_argp->terminated)
1025 {
1026 // thread signaled termination, but is not?
1027 Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> signaled for termination");
1028
1029 // signal thread for its termination
1030 recv_thread_argp->sig_terminate= true;
1031 // wait for thread to join after termination
1032 pthread_join(thread_id, 0);
1033 // the dying thread will signal main loop to call this method, but this time we should enter the else branch
1034 return;
1035 }
1036 }
1037 else
1038 Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> not found");
1039
1040}
1041
1042
1043/** cleans up left over structures (assoc,receiver_thread_arg) from already terminated receiver_thread
1044 * usually called by the master_thread after the receiver_thread terminated
1045 * @note clean_up_receiver_thread() should be only called when an outer lock ensures that peer_assoc
1046 * is still valid
1047 */
1048void
1049TPoverUDS::cleanup_receiver_thread(AssocDataUDS* peer_assoc)
1050{
1051 // All operations on recv_thread_argmap and connmap require an already acquired lock
1052 // after this procedure peer_assoc may be invalid because it was erased
1053
1054 // start critical section
1055
1056 if (peer_assoc == 0)
1057 return;
1058
1059 pthread_t thread_id= peer_assoc->thread_ID;
1060
1061 // try to clean up receiver_thread_arg
1062 recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1063 receiver_thread_arg_t* recv_thread_argp=
1064 (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1065 if (recv_thread_argp)
1066 {
1067 if (!recv_thread_argp->terminated)
1068 {
1069 // thread signaled termination, but is not?
1070 Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> not terminated yet?!");
1071 return;
1072 }
1073 else
1074 { // if thread is already terminated
1075 Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> is terminated");
1076
1077 // delete it from receiver map
1078 recv_thread_argmap.erase(recv_thread_arg_iter);
1079
1080 // then delete receiver arg structure
1081 delete recv_thread_argp;
1082 }
1083 }
1084
1085 // delete entry from connection map
1086
1087 // cleanup sender thread
1088 // no need to lock explicitly, because caller of cleanup_receiver_thread() must already locked
1089 terminate_sender_thread(peer_assoc);
1090
1091 // delete the AssocData structure from the connection map
1092 // also frees allocated AssocData structure
1093 connmap.erase(peer_assoc);
1094
1095 // end critical section
1096
1097 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Cleanup receiver thread <" << thread_id << ">. Done.");
1098}
1099
1100
1101/* sends a stop message to the sender thread that belongs to the peer address given in assoc
1102 * @note terminate_receiver_thread() should be only called when an outer lock ensures that assoc
1103 * is still valid, a lock is also required, because senderthread_queuemap is changed
1104 */
1105void
1106TPoverUDS::terminate_sender_thread(const AssocDataUDS* assoc)
1107{
1108 if (assoc == 0)
1109 {
1110 Log(ERROR_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - assoc data == NULL");
1111 return;
1112 }
1113
1114 sender_thread_queuemap_t::iterator it= senderthread_queuemap.find(assoc->peer);
1115
1116 if (it != senderthread_queuemap.end())
1117 { // we have a sender thread: send a stop message to it
1118 FastQueue* destqueue= it->second;
1119 if (destqueue)
1120 {
1121 TPoverUDSMsg* internalmsg= new TPoverUDSMsg(assoc,tpparam.source,TPoverUDSMsg::stop);
1122 if (internalmsg)
1123 {
1124 // send the internal message to the sender thread queue
1125 internalmsg->send(tpparam.source,destqueue);
1126 }
1127 }
1128 else
1129 {
1130 Log(WARNING_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - found entry for address, but no sender thread. addr:" << assoc->peer);
1131 }
1132 // erase entry from map
1133 senderthread_queuemap.erase(it);
1134 }
1135}
1136
1137/* terminate all active threads
1138 * note: locking should not be necessary here because this message is called as last method from
1139 * main_loop()
1140 */
1141void
1142TPoverUDS::terminate_all_threads()
1143{
1144 AssocDataUDS* assoc= 0;
1145 receiver_thread_arg_t* terminate_argp;
1146
1147 for (recv_thread_argmap_t::iterator terminate_iterator= recv_thread_argmap.begin();
1148 terminate_iterator != recv_thread_argmap.end();
1149 terminate_iterator++)
1150 {
1151 if ( (terminate_argp= terminate_iterator->second) != 0)
1152 {
1153 // we need a non const pointer to erase it later on
1154 assoc= const_cast<AssocDataUDS*>(terminate_argp->peer_assoc);
1155 // check whether thread is still alive
1156 if (terminate_argp->terminated == false)
1157 {
1158 terminate_argp->sig_terminate= true;
1159 // then wait for its termination
1160 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,
1161 "Signaled receiver thread <" << terminate_iterator->first << "> for termination");
1162
1163 pthread_join(terminate_iterator->first, 0);
1164
1165 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Thread <" << terminate_iterator->first << "> is terminated");
1166 }
1167 else
1168 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,
1169 "Receiver thread <" << terminate_iterator->first << "> already terminated");
1170
1171 // cleanup all remaining argument structures of terminated threads
1172 delete terminate_argp;
1173
1174 // terminate any related sender thread that is still running
1175 terminate_sender_thread(assoc);
1176
1177 connmap.erase(assoc);
1178 // delete assoc is not necessary, because connmap.erase() will do the job
1179 }
1180 } // end for
1181}
1182
1183
1184/**
1185 * sender thread starter:
1186 * just a static starter method to allow starting the
1187 * actual sender_thread() method.
1188 *
1189 * @param argp - pointer to the current TPoverUDS object instance and receiver_thread_arg_t struct
1190 */
1191void*
1192TPoverUDS::sender_thread_starter(void *argp)
1193{
1194 sender_thread_start_arg_t *sargp= static_cast<sender_thread_start_arg_t *>(argp);
1195
1196 //cout << "invoked sender_thread_Starter" << endl;
1197
1198 // invoke sender thread method
1199 if (sargp != 0 && sargp->instance != 0)
1200 {
1201 // call receiver_thread member function on object instance
1202 sargp->instance->sender_thread(sargp->sender_thread_queue);
1203
1204 //cout << "Before deletion of sarg" << endl;
1205
1206 // no longer needed
1207 delete sargp;
1208 }
1209 else
1210 {
1211 Log(ERROR_LOG,LOG_CRIT,"sender_thread_starter","while starting sender_thread: 0 pointer to arg or object");
1212 }
1213 return 0;
1214}
1215
1216
1217
1218
1219/**
1220 * receiver thread starter:
1221 * just a static starter method to allow starting the
1222 * actual receiver_thread() method.
1223 *
1224 * @param argp - pointer to the current TPoverUDS object instance and receiver_thread_arg_t struct
1225 */
1226void*
1227TPoverUDS::receiver_thread_starter(void *argp)
1228{
1229 receiver_thread_start_arg_t *rargp= static_cast<receiver_thread_start_arg_t *>(argp);
1230 // invoke receiver thread method
1231 if (rargp != 0 && rargp->instance != 0)
1232 {
1233 // call receiver_thread member function on object instance
1234 rargp->instance->receiver_thread(rargp->rtargp);
1235
1236 // no longer needed
1237 delete rargp;
1238 }
1239 else
1240 {
1241 Log(ERROR_LOG,LOG_CRIT,"receiver_thread_starter","while starting receiver_thread: 0 pointer to arg or object");
1242 }
1243 return 0;
1244}
1245
1246
1247void
1248TPoverUDS::create_new_sender_thread(FastQueue* senderfqueue)
1249{
1250 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new sender thread...");
1251
1252 pthread_t senderthreadid;
1253 // create new thread; (arg == 0) is handled by thread, too
1254 int pthread_status= pthread_create(&senderthreadid,
1255 NULL, // NULL: default attributes: thread is joinable and has a
1256 // default, non-realtime scheduling policy
1257 TPoverUDS::sender_thread_starter,
1258 new sender_thread_start_arg_t(this,senderfqueue));
1259 if (pthread_status)
1260 {
1261 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " << strerror(pthread_status));
1262
1263 delete senderfqueue;
1264 }
1265}
1266
1267
1268void
1269TPoverUDS::create_new_receiver_thread(AssocDataUDS* peer_assoc)
1270{
1271 receiver_thread_arg_t* argp=
1272 new(nothrow) receiver_thread_arg(peer_assoc);
1273
1274 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new receiver thread...");
1275
1276 // create new thread; (arg == 0) is handled by thread, too
1277 int pthread_status= pthread_create(&peer_assoc->thread_ID,
1278 NULL, // NULL: default attributes: thread is joinable and has a
1279 // default, non-realtime scheduling policy
1280 receiver_thread_starter,
1281 new(nothrow) receiver_thread_start_arg_t(this,argp));
1282 if (pthread_status)
1283 {
1284 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " << strerror(pthread_status));
1285
1286 delete argp;
1287 }
1288 else
1289 {
1290 lock(); // install_cleanup_thread_lock(TPoverUDS, this);
1291
1292 // remember pointer to thread arg structure
1293 // thread arg structure should be destroyed after thread termination only
1294 pair<recv_thread_argmap_t::iterator, bool> tmpinsiterator=
1295 recv_thread_argmap.insert( pair<pthread_t,receiver_thread_arg_t*> (peer_assoc->thread_ID,argp) );
1296 if (tmpinsiterator.second == false)
1297 {
1298 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Thread argument could not be inserted into hashmap");
1299 }
1300 unlock(); // uninstall_cleanup(1);
1301 }
1302}
1303
1304
1305/**
1306 * master listener thread starter:
1307 * just a static starter method to allow starting the
1308 * actual master_listener_thread() method.
1309 *
1310 * @param argp - pointer to the current TPoverUDS object instance
1311 */
1312void*
1313TPoverUDS::master_listener_thread_starter(void *argp)
1314{
1315 // invoke listener thread method
1316 if (argp != 0)
1317 {
1318 (static_cast<TPoverUDS*>(argp))->master_listener_thread();
1319 }
1320 return 0;
1321}
1322
1323
1324
1325/**
1326 * master listener thread: waits for incoming connections at the well-known tcp port
1327 * when a connection request is received this thread spawns a receiver_thread for
1328 * receiving packets from the peer at the new socket.
1329 */
1330void
1331TPoverUDS::master_listener_thread()
1332{
1333 //remove any existing socket files
1334 unlink(tpparam.udssocket.c_str());
1335
1336
1337 // create a new address-structure for the listening masterthread
1338 struct sockaddr_un own_address;
1339 own_address.sun_family = AF_UNIX;
1340 strcpy(own_address.sun_path, tpparam.udssocket.c_str());
1341 unlink(tpparam.udssocket.c_str());
1342 uint32 len = strlen(tpparam.udssocket.c_str()) + sizeof(own_address.sun_family);
1343
1344
1345 // create a listening socket
1346 int master_listener_socket= socket(AF_UNIX, SOCK_STREAM, 0);
1347 if (master_listener_socket == -1)
1348 {
1349 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Could not create a new socket, error: " << strerror(errno));
1350 return;
1351 }
1352
1353
1354 // Reuse ports, even if they are busy
1355 int socketreuseflag= 1;
1356 int status= setsockopt(master_listener_socket,
1357 SOL_SOCKET,
1358 SO_REUSEADDR,
1359 (const char *) &socketreuseflag,
1360 sizeof(socketreuseflag));
1361 if (status)
1362 {
1363 Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
1364 }
1365
1366
1367 // bind the newly created socket to a specific address
1368 int bind_status = bind(master_listener_socket,
1369 reinterpret_cast<struct sockaddr *>(&own_address),
1370 len);
1371 if (bind_status)
1372 {
1373 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Binding to " << tpparam.udssocket);
1374 return;
1375 }
1376
1377 // listen at the socket,
1378 // queuesize for pending connections= max_listen_queue_size
1379 int listen_status = listen(master_listener_socket, max_listen_queue_size);
1380 if (listen_status)
1381 {
1382 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Listen at socket " << master_listener_socket
1383 << " failed, error: " << strerror(errno));
1384 return;
1385 }
1386 else
1387 {
1388 Log(INFO_LOG,LOG_NORMAL, tpparam.name, color[green] << "Listening at " << tpparam.udssocket << color[off]);
1389 }
1390
1391 // activate O_NON_BLOCK for accept (accept does not block)
1392 fcntl(master_listener_socket,F_SETFL, O_NONBLOCK);
1393
1394 // create a pollfd struct for use in the mainloop
1395 struct pollfd poll_fd;
1396 poll_fd.fd = master_listener_socket;
1397 poll_fd.events = POLLIN | POLLPRI;
1398 poll_fd.revents = 0;
1399 /*
1400 #define POLLIN 0x001 // There is data to read.
1401 #define POLLPRI 0x002 // There is urgent data to read.
1402 #define POLLOUT 0x004 // Writing now will not block.
1403 */
1404
1405 bool terminate = false;
1406 // check for thread terminate condition using get_state()
1407 state_t currstate= get_state();
1408 int poll_status= 0;
1409 const unsigned int number_poll_sockets= 1;
1410 struct sockaddr_un peer_address;
1411 socklen_t peer_address_len;
1412 int conn_socket;
1413
1414 // check whether this thread is signaled for termination
1415 while(! (terminate= (currstate==STATE_ABORT || currstate==STATE_STOP) ) )
1416 {
1417 // wait on number_poll_sockets (main drm socket)
1418 // for the events specified above for sleep_time (in ms)
1419 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
1420 if (poll_fd.revents & POLLERR) // Error condition
1421 {
1422 if (errno != EINTR)
1423 {
1424 Log(ERROR_LOG,LOG_CRIT, tpparam.name,
1425 "Poll caused error " << strerror(errno) << " - indicated by revents");
1426 }
1427 else
1428 {
1429 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "poll(): " << strerror(errno));
1430 }
1431
1432 }
1433 if (poll_fd.revents & POLLHUP) // Hung up
1434 {
1435 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll hung up");
1436 return;
1437 }
1438 if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
1439 {
1440 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll Invalid request: fd not open");
1441 return;
1442 }
1443
1444 switch (poll_status)
1445 {
1446 case -1:
1447 if (errno != EINTR)
1448 {
1449 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll status indicates error: " << strerror(errno));
1450 }
1451 else
1452 {
1453 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Poll status: " << strerror(errno));
1454 }
1455
1456 break;
1457
1458 case 0:
1459#ifdef DEBUG_HARD
1460 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,
1461 "Listen Thread - Poll timed out after " << tpparam.sleep_time << " ms.");
1462#endif
1463 currstate= get_state();
1464 continue;
1465 break;
1466
1467 default:
1468#ifdef DEBUG_HARD
1469 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
1470#endif
1471 break;
1472 } // end switch
1473
1474 // after a successful accept call,
1475 // accept stores the address information of the connecting party
1476 // in peer_address and the size of its address in addrlen
1477 peer_address_len= sizeof(peer_address);
1478 conn_socket = accept (master_listener_socket,
1479 reinterpret_cast<struct sockaddr *>(&peer_address),
1480 &peer_address_len);
1481 if (conn_socket == -1)
1482 {
1483 if (errno != EWOULDBLOCK && errno != EAGAIN)
1484 {
1485 Log(ERROR_LOG,LOG_EMERG, tpparam.name, "Accept at socket " << master_listener_socket
1486 << " failed, error: " << strerror(errno));
1487 return;
1488 }
1489 }
1490 else
1491 {
1492 // create a new assocdata-object for the new thread
1493 AssocDataUDS* peer_assoc = NULL;
1494 udsaddress addr(conn_socket);
1495
1496 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "<<--Received connect--<< request from " << addr);
1497
1498 //struct sockaddr_un own_address;
1499 //socklen_t own_address_len= sizeof(own_address);
1500 //getsockname(conn_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
1501
1502 // AssocData will copy addr content into its own structure
1503 // allocated peer_assoc will be stored in connmap
1504 peer_assoc = new(nothrow) AssocDataUDS(conn_socket, addr, udsaddress(conn_socket));
1505
1506 bool insert_success= false;
1507 if (peer_assoc)
1508 {
1509 // start critical section
1510 lock(); // install_cleanup_thread_lock(TPoverUDS, this);
1511 insert_success= connmap.insert(peer_assoc);
1512 // end critical section
1513 unlock(); // uninstall_cleanup(1);
1514 }
1515
1516
1517 if (insert_success == false) // not inserted into connmap
1518 {
1519 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot insert AssocData for socket " << conn_socket
1520 << ", " << addr << " into connection map, aborting connection...");
1521
1522 // abort connection, delete its AssocData
1523 close (conn_socket);
1524 if (peer_assoc)
1525 {
1526 delete peer_assoc;
1527 peer_assoc= 0;
1528 }
1529 return;
1530
1531 } //end __else(connmap.insert());__
1532
1533 // create a new thread for each new connection
1534 create_new_receiver_thread(peer_assoc);
1535 } // end __else (connsocket)__
1536
1537 // get new thread state
1538 currstate= get_state();
1539
1540 } // end while(!terminate)
1541 return;
1542} // end listen_for_connections()
1543
1544
1545TPoverUDS::~TPoverUDS()
1546{
1547 init= false;
1548
1549 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Destructor called");
1550
1551 QueueManager::instance()->unregister_queue(tpparam.source);
1552}
1553
1554/** TPoverUDS Thread main loop.
1555 * This loop checks for internal messages of either
1556 * a send() call to start a new receiver thread, or,
1557 * of a receiver_thread() that signals its own termination
1558 * for proper cleanup of control structures.
1559 * It also handles the following internal TPoverUDSMsg types:
1560 * - TPoverUDSMsg::stop - a particular receiver thread is terminated
1561 * - TPoverUDSMsg::start - a particular receiver thread is started
1562 * @param nr number of current thread instance
1563 */
1564void
1565TPoverUDS::main_loop(uint32 nr)
1566{
1567 // get internal queue for messages from receiver_thread
1568 FastQueue* fq = get_fqueue();
1569 if (!fq)
1570 {
1571 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot find message queue");
1572 return;
1573 } // end if not fq
1574 // register queue for receiving internal messages from other modules
1575 QueueManager::instance()->register_queue(fq,tpparam.source);
1576
1577 // start master listener thread
1578 pthread_t master_listener_thread_ID;
1579 int pthread_status= pthread_create(&master_listener_thread_ID,
1580 NULL, // NULL: default attributes: thread is joinable and has a
1581 // default, non-realtime scheduling policy
1582 master_listener_thread_starter,
1583 this);
1584 if (pthread_status)
1585 {
1586 Log(ERROR_LOG,LOG_CRIT, tpparam.name,
1587 "New master listener thread could not be created: " << strerror(pthread_status));
1588 }
1589 else
1590 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Master listener thread started");
1591
1592
1593 // define max latency for thread reaction on termination/stop signal
1594 timespec wait_interval= { 0, 250000000L }; // 250ms
1595 message* internal_thread_msg = NULL;
1596 state_t currstate= get_state();
1597
1598 // check whether this thread is signaled for termination
1599 while( currstate!=STATE_ABORT && currstate!=STATE_STOP )
1600 {
1601 // poll internal message queue (blocking)
1602 if ( (internal_thread_msg= fq->dequeue_timedwait(wait_interval)) != 0 )
1603 {
1604 TPoverUDSMsg* internalmsg= dynamic_cast<TPoverUDSMsg*>(internal_thread_msg);
1605 if (internalmsg)
1606 {
1607 if (internalmsg->get_msgtype() == TPoverUDSMsg::stop)
1608 {
1609 // a receiver thread terminated and signaled for cleanup by master thread
1610 AssocDataUDS* assocd= const_cast<AssocDataUDS*>(internalmsg->get_peer_assoc());
1611 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Got cleanup request for thread <" << assocd->thread_ID <<'>');
1612 lock();
1613 cleanup_receiver_thread( assocd );
1614 unlock();
1615 }
1616 else
1617 if (internalmsg->get_msgtype() == TPoverUDSMsg::start)
1618 {
1619 // start a new receiver thread
1620 create_new_receiver_thread( const_cast<AssocDataUDS*>(internalmsg->get_peer_assoc()) );
1621 }
1622 else
1623 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "unexpected internal message:" << internalmsg->get_msgtype());
1624
1625 delete internalmsg;
1626 }
1627 else
1628 {
1629 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Dynamic_cast failed - received unexpected and unknown internal message source "
1630 << internal_thread_msg->get_source());
1631 }
1632 } // endif
1633
1634 // get thread state
1635 currstate= get_state();
1636 } // end while
1637
1638 if (currstate==STATE_STOP)
1639 {
1640 // start abort actions
1641 Log(INFO_LOG,LOG_NORMAL, tpparam.name, "Asked to abort, stopping all receiver threads");
1642 } // end if stopped
1643
1644 // do not accept any more messages
1645 fq->shutdown();
1646 // terminate all receiver and sender threads that are still active
1647 terminate_all_threads();
1648}
1649
1650} // end namespace protlib
1651///@}
Note: See TracBrowser for help on using the repository browser.