source: source/ariba/utility/transport/tcpip/protlib/tp_over_tcp.cpp@ 9637

Last change on this file since 9637 was 8609, checked in by Christoph Mayer, 14 years ago

-memleaks

File size: 55.9 KB
Line 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file tp_over_tcp.cpp
3/// TCP-based transport module (includes framing support)
4/// ----------------------------------------------------------
5/// $Id: tp_over_tcp.cpp 2872 2008-02-18 10:58:03Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/src/tp_over_tcp.cpp $
7// ===========================================================
8//
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29
30extern "C"
31{
32 //#define _SOCKADDR_LEN /* use BSD 4.4 sockets */
33#include <unistd.h> /* gethostname */
34#include <sys/types.h> /* network socket interface */
35#include <netinet/in.h> /* network socket interface */
36#include <netinet/tcp.h> /* for TCP Socket Option */
37#include <sys/socket.h>
38#include <arpa/inet.h> /* inet_addr */
39
40#include <fcntl.h>
41#include <sys/poll.h>
42}
43
44#include <iostream>
45#include <errno.h>
46#include <string>
47#include <sstream>
48
49#include "tp_over_tcp.h"
50#include "threadsafe_db.h"
51#include "cleanuphandler.h"
52#include "setuid.h"
53#include "queuemanager.h"
54#include "logfile.h"
55
56#include <set>
57
58#define TCP_SUCCESS 0
59#define TCP_SEND_FAILURE 1
60
61const unsigned int max_listen_queue_size= 10;
62
63#define IPV6_ADDR_INT32_SMP 0x0000ffff
64
65namespace protlib {
66
/** Build an IPv4 socket address from an IPv6 socket address.
 *
 * The target structure is cleared first; afterwards the port is taken over
 * unchanged (network byte order) and the IPv4 address is taken from the last
 * four bytes (bytes 12..15) of the IPv6 address.
 *
 * @param sin  destination IPv4 socket address (completely overwritten)
 * @param sin6 source IPv6 socket address
 */
void v6_to_v4(struct sockaddr_in *sin, struct sockaddr_in6 *sin6) {
  memset(sin, 0, sizeof(struct sockaddr_in));

  const unsigned char *const embedded_v4 = sin6->sin6_addr.s6_addr + 12;
  memcpy(&sin->sin_addr, embedded_v4, sizeof(struct in_addr));

  sin->sin_port = sin6->sin6_port;
  sin->sin_family = AF_INET;
}
73
/** Convert a sockaddr_in into a sockaddr_in6 holding the IPv4-mapped IPv6
 *  address (::ffff:a.b.c.d).
 *
 * The target structure is cleared first. The port is taken over unchanged
 * (network byte order). The address bytes are written individually so that
 * the result does not depend on host byte order and no misaligned or
 * type-punned 32-bit stores into s6_addr are needed.
 *
 * @param sin  source IPv4 socket address
 * @param sin6 destination IPv6 socket address (completely overwritten)
 */
void v4_to_v6(struct sockaddr_in *sin, struct sockaddr_in6 *sin6) {
  memset(sin6, 0, sizeof(*sin6));
  sin6->sin6_family = AF_INET6;
  sin6->sin6_port = sin->sin_port;

  // bytes 0..9 stay zero (cleared above); bytes 10 and 11 carry the 0xffff
  // marker that identifies an IPv4-mapped address
  sin6->sin6_addr.s6_addr[10] = 0xff;
  sin6->sin6_addr.s6_addr[11] = 0xff;

  // bytes 12..15 carry the IPv4 address, already in network byte order
  memcpy(&sin6->sin6_addr.s6_addr[12], &sin->sin_addr, sizeof(sin->sin_addr));
}
84
85using namespace log;
86
87/** @defgroup tptcp TP over TCP
88 * @ingroup network
89 * @{
90 */
91
92char in6_addrstr[INET6_ADDRSTRLEN+1];
93
94
95/******* class TPoverTCP *******/
96
97
98/** get_connection_to() checks for already existing connections.
99 * If a connection exists, it returns "AssocData"
100 * and saves it in "connmap" for further use
101 * If no connection exists, a new connection to "addr"
102 * is created.
103 */
104AssocData*
105TPoverTCP::get_connection_to(const appladdress& addr)
106{
107 // get timeout
108 struct timespec ts;
109 get_time_of_day(ts);
110 ts.tv_nsec+= tpparam.sleep_time * 1000000L;
111 if (ts.tv_nsec>=1000000000L)
112 {
113 ts.tv_sec += ts.tv_nsec / 1000000000L;
114 ts.tv_nsec= ts.tv_nsec % 1000000000L;
115 }
116
117 // create a new AssocData pointer, initialize it to NULL
118 AssocData* assoc= NULL;
119 int new_socket;
120 // loop until timeout is exceeded: TODO: check applicability of loop
121 do
122 {
123 // check for existing connections to addr
124 // start critical section
125 lock(); // install_cleanup_thread_lock(TPoverTCP, this);
126 assoc= connmap.lookup(addr);
127 // end critical section
128 unlock(); // uninstall_cleanup(1);
129 if (assoc)
130 {
131 // If not shut down then use it, else abort, wait and check again.
132 if (!assoc->shutdown)
133 {
134 return assoc;
135 }
136 else
137 {
138 // TODO: connection is already in shutdown mode. What now?
139 ERRCLog(tpparam.name,"socket exists, but is already in mode shutdown");
140
141 return 0;
142 }
143 } //end __if (assoc)__
144 else
145 {
146 Log(DEBUG_LOG,LOG_UNIMP,tpparam.name,"No existing connection to "
147 << addr.get_ip_str() << " port #" << addr.get_port() << " found, creating a new one.");
148 }
149
150 // no connection found, create a new one
151 new_socket = socket( v4_mode ? AF_INET : AF_INET6, SOCK_STREAM, IPPROTO_TCP);
152 if (new_socket == -1)
153 {
154 ERRCLog(tpparam.name,"Couldn't create a new socket: " << strerror(errno));
155
156 return 0;
157 }
158
159 // Disable Nagle Algorithm, set (TCP_NODELAY)
160 int nodelayflag= 1;
161 int status= setsockopt(new_socket,
162 IPPROTO_TCP,
163 TCP_NODELAY,
164 &nodelayflag,
165 sizeof(nodelayflag));
166 if (status)
167 {
168 ERRLog(tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
169 }
170
171 // Reuse ports, even if they are busy
172 int socketreuseflag= 1;
173 status= setsockopt(new_socket,
174 SOL_SOCKET,
175 SO_REUSEADDR,
176 (const char *) &socketreuseflag,
177 sizeof(socketreuseflag));
178 if (status)
179 {
180 ERRLog(tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
181 }
182
183 struct sockaddr_in6 dest_address;
184 dest_address.sin6_flowinfo= 0;
185 dest_address.sin6_scope_id= 0;
186 addr.get_sockaddr(dest_address);
187
188 // connect the socket to the destination address
189 int connect_status = 0;
190 if (v4_mode) {
191 struct sockaddr_in dest_address_v4;
192 v6_to_v4( &dest_address_v4, &dest_address );
193 connect_status = connect(new_socket,
194 reinterpret_cast<const struct sockaddr*>(&dest_address_v4),
195 sizeof(dest_address));
196 } else {
197 connect_status = connect(new_socket,
198 reinterpret_cast<const struct sockaddr*>(&dest_address),
199 sizeof(dest_address));
200 }
201
202 // connects to the listening_port of the peer's masterthread
203 if (connect_status != 0)
204 {
205 ERRLog(tpparam.name,"Connect to " << addr.get_ip_str() << " port #" << addr.get_port()
206 << " failed: [" << color[red] << strerror(errno) << color[off] << "]");
207
208 return 0; // error: couldn't connect
209 }
210
211
212 struct sockaddr_in6 own_address;
213 if (v4_mode) {
214 struct sockaddr_in own_address_v4;
215 socklen_t own_address_len_v4 = sizeof(own_address_v4);
216 getsockname(new_socket, reinterpret_cast<struct sockaddr*>(&own_address_v4), &own_address_len_v4);
217 v4_to_v6(&own_address_v4, &own_address);
218 } else {
219 socklen_t own_address_len= sizeof(own_address);
220 getsockname(new_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
221 }
222
223 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,">>--Connect-->> to " << addr.get_ip_str() << " port #" << addr.get_port()
224 << " from " << inet_ntop(AF_INET6,&own_address.sin6_addr,in6_addrstr,INET6_ADDRSTRLEN)
225 << " port #" << ntohs(own_address.sin6_port));
226
227
228 // create new AssocData record (will copy addr)
229 assoc = new(nothrow) AssocData(new_socket, addr, appladdress(own_address,IPPROTO_TCP));
230
231 // if assoc could be successfully created, insert it into ConnectionMap
232 if (assoc)
233 {
234 bool insert_success= false;
235 // start critical section
236 lock(); // install_cleanup_thread_lock(TPoverTCP, this);
237 // insert AssocData into connection map
238 insert_success= connmap.insert(assoc);
239 // end critical section
240 unlock(); // uninstall_cleanup(1);
241
242 if (insert_success == true)
243 {
244#ifdef _DEBUG
245 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Connected to " << addr.get_ip_str() << ", port #" << addr.get_port()
246 << " via socket " << new_socket);
247
248
249#endif
250
251 // notify master thread to start a receiver thread (i.e. send selfmessage)
252 TPoverTCPMsg* newmsg= new(nothrow)TPoverTCPMsg(assoc, tpparam.source, TPoverTCPMsg::start);
253 if (newmsg)
254 {
255 bool ret = newmsg->send_to(tpparam.source);
256 if(!ret) delete newmsg;
257 return assoc;
258 }
259 else
260 ERRCLog(tpparam.name,"get_connection_to: could not get memory for internal msg");
261 }
262 else
263 {
264 // delete data and abort
265 ERRCLog(tpparam.name, "Cannot insert AssocData for socket " << new_socket << ", "<< addr.get_ip_str()
266 <<", port #" << addr.get_port() << " into connection map, aborting connection");
267
268 // abort connection, delete its AssocData
269 close (new_socket);
270 if (assoc)
271 {
272 delete assoc;
273 assoc= 0;
274 }
275 return assoc;
276 } // end else connmap.insert
277
278 } // end "if (assoc)"
279 }
280 while (wait_cond(ts)!=ETIMEDOUT);
281
282 return assoc;
283} //end get_connection_to
284
285
286/** terminates a signaling association/connection
287 * - if no connection exists, generate a warning
288 * - otherwise: generate internal msg to related receiver thread
289 */
290void
291TPoverTCP::terminate(const address& in_addr)
292{
293#ifndef _NO_LOGGING
294 const char *const thisproc="terminate() - ";
295#endif
296
297 appladdress* addr = NULL;
298 addr = dynamic_cast<appladdress*>(in_addr.copy());
299
300 if (!addr) return;
301
302 // Create a new AssocData-pointer
303 AssocData* assoc = NULL;
304
305 // check for existing connections to addr
306
307 // start critical section:
308 // please note if receiver_thread terminated already, the assoc data is not
309 // available anymore therefore we need a lock around cleanup_receiver_thread()
310
311 lock(); // install_cleanup_thread_lock(TPoverTCP, this);
312 assoc= connmap.lookup(*addr);
313 if (assoc)
314 {
315 EVLog(tpparam.name,thisproc<<"got request to shutdown connection for peer " << addr);
316 // If not shut down then use it, else abort, wait and check again.
317 if (!assoc->shutdown)
318 {
319 if (assoc->socketfd)
320 {
321 // disallow sending
322 if (shutdown(assoc->socketfd,SHUT_WR))
323 {
324 ERRLog(tpparam.name,thisproc<<"shutdown (write) on socket for peer " << addr << " returned error:" << strerror(errno));
325 }
326 else
327 {
328 EVLog(tpparam.name,thisproc<<"initiated closing of connection for peer " << addr << ". Shutdown (write) on socket "<< assoc->socketfd );
329 }
330 }
331 assoc->shutdown= true;
332 }
333 else
334 EVLog(tpparam.name,thisproc<<"connection for peer " << addr << " is already in mode shutdown");
335
336 }
337 else
338 WLog(tpparam.name,thisproc<<"could not find a connection for peer " << *addr);
339
340 stop_receiver_thread(assoc);
341
342 // end critical section
343 unlock(); // uninstall_cleanup(1);
344
345 if (addr) delete addr;
346}
347
348
349/** generates and internal TPoverTCP message to send a NetMsg to the network
350 * - it is necessary to let a thread do this, because the caller
351 * may get blocked if the connect() or send() call hangs for a while
352 * - the sending thread will call TPoverTCP::tcpsend()
353 * - if no connection exists, creates a new one (unless use_existing_connection is true)
354 * @note the netmsg is deleted by the send() method when it is not used anymore
355 */
356void
357TPoverTCP::send(NetMsg* netmsg, const address& in_addr, bool use_existing_connection)
358{
359 if (netmsg == NULL) {
360 ERRCLog(tpparam.name,"send() - called without valid NetMsg buffer (NULL)");
361 return;
362 }
363
364 appladdress* addr = NULL;
365 addr= dynamic_cast<appladdress*>(in_addr.copy());
366
367 if (!addr)
368 {
369 ERRCLog(tpparam.name,"send() - given destination address is not of expected type (appladdress), has type " << (int) in_addr.get_type());
370 return;
371 }
372
373 // lock due to sendermap access
374 lock();
375
376 // check for existing sender thread
377 sender_thread_queuemap_t::const_iterator it= senderthread_queuemap.find(*addr);
378
379 FastQueue* destqueue= 0;
380
381 if (it == senderthread_queuemap.end())
382 { // no sender thread found so far
383
384 // if we already have an existing connection it is save to create a sender thread
385 // since get_connection_to() will not be invoked, so an existing connection will
386 // be used
387 const AssocData* assoc = connmap.lookup(*addr);
388
389 //DLog(tpparam.name,"send() - use_existing_connection:" << (use_existing_connection ? "true" : "false") << " assoc:" << assoc);
390
391 if (use_existing_connection==false || (assoc && assoc->shutdown==false && assoc->socketfd>0))
392 { // it is allowed to create a new connection for this thread
393 // create a new queue for sender thread
394 FastQueue* sender_thread_queue= new FastQueue;
395 create_new_sender_thread(sender_thread_queue);
396 // remember queue for later use
397
398 //pair<sender_thread_queuemap_t::iterator, bool> tmpinsiterator=
399 senderthread_queuemap.insert( pair<appladdress,FastQueue*> (*addr,sender_thread_queue) );
400
401 destqueue= sender_thread_queue;
402 }
403 }
404 else
405 { // we have a sender thread, use stored queue from map
406 destqueue= it->second;
407 }
408
409 unlock();
410
411 // send a send_data message to it (if we have a destination queue)
412 if (destqueue)
413 {
414 // both parameters will be freed after message was sent!
415
416 appladdress* apl=new appladdress(*addr);
417 TPoverTCPMsg* internalmsg= new TPoverTCPMsg(netmsg,apl);
418 if (internalmsg)
419 {
420 // send the internal message to the sender thread queue
421 bool sent = internalmsg->send(tpparam.source,destqueue);
422 if (!sent) {
423 delete internalmsg->get_appladdr();
424 delete internalmsg;
425 internalmsg = NULL;
426 }
427 }
428 }
429 else
430 {
431 if (!use_existing_connection)
432 WLog(tpparam.name,"send() - found entry for address, but no active sender thread available for peer addr:" << *addr << " - dropping data");
433 else
434 {
435 DLog(tpparam.name,"no active sender thread found for peer " << *addr << " - but policy forbids to set up a new connection, will drop data");
436
437 }
438 // cannot send data, so we must drop it
439 delete netmsg;
440 }
441
442 if (addr) delete addr;
443}
444
445/** sends a NetMsg to the network.
446 *
447 * @param netmsg message to send
448 * @param addr transport endpoint address
449 *
450 * @note if no connection exists, creates a new one
451 * @note both parameters are deleted after the message was sent
452 */
453void
454TPoverTCP::tcpsend(NetMsg* netmsg, appladdress* addr)
455{
456#ifndef _NO_LOGGING
457 const char *const thisproc="sender - ";
458#endif
459
460 // set result initially to success, set it to failure
461 // in places where these failures occur
462 int result = TCP_SUCCESS;
463 int saved_errno= 0;
464 int ret= 0;
465
466 // Create a new AssocData-pointer
467 AssocData* assoc = NULL;
468
469 // tp.cpp checks for initialisation of tp and correctness of
470 // msgsize, protocol and ip,
471 // throws an error if something is not right
472 if (addr) {
473 addr->convert_to_ipv6();
474 check_send_args(*netmsg,*addr);
475 }
476 else
477 {
478 ERRCLog(tpparam.name, thisproc << "address pointer is NULL");
479 result= TCP_SEND_FAILURE;
480
481 delete netmsg;
482 delete addr;
483
484 throw TPErrorInternal();
485 }
486
487
488 // check for existing connections,
489 // if a connection exists, return its AssocData
490 // and save it in assoc for further use
491 // if no connection exists, create a new one (in get_connection_to()).
492 // Connection is inserted into connection map that must be done
493 // with exclusive access
494 assoc= get_connection_to(*addr);
495
496 if (assoc==NULL || assoc->socketfd<=0)
497 {
498 ERRCLog(tpparam.name, color[red] << thisproc << "no valid assoc/socket data - dropping packet");
499
500 delete netmsg;
501 delete addr;
502 return;
503 }
504
505 if (assoc->shutdown)
506 {
507 Log(WARNING_LOG, LOG_ALERT, tpparam.name, thisproc << "should send message although connection already half closed");
508 delete netmsg;
509 delete addr;
510
511 throw TPErrorSendFailed();
512 }
513
514 uint32 msgsize= netmsg->get_size();
515#ifdef DEBUG_HARD
516 cerr << thisproc << "message size=" << netmsg->get_size() << endl;
517#endif
518
519 const uint32 retry_send_max = 3;
520 uint32 retry_count = 0;
521 // send all the data contained in netmsg to the socket
522 // which belongs to the address "addr"
523 for (uint32 bytes_sent= 0;
524 bytes_sent < msgsize;
525 bytes_sent+= ret)
526 {
527
528#ifdef _DEBUG_HARD
529 for (uint32 i=0;i<msgsize;i++)
530 {
531 cout << "send_buf: " << i << " : ";
532 if ( isalnum(*(netmsg->get_buffer()+i)) )
533 cout << "'" << *(netmsg->get_buffer()+i) << "' (0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec << ")" ;
534 else
535 cout << "0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec;
536 cout << endl;
537 }
538
539 cout << endl;
540 cout << "bytes_sent: " << bytes_sent << endl;
541 cout << "Message size: " << msgsize << endl;
542 cout << "Send-Socket: " << assoc->socketfd << endl;
543 cout << "pointer-Offset. " << netmsg->get_pos() << endl;
544 cout << "vor send " << endl;
545#endif
546
547 retry_count= 0;
548 do
549 {
550 // socket send
551 ret= ::send(assoc->socketfd,
552 netmsg->get_buffer() + bytes_sent,
553 msgsize - bytes_sent,
554 MSG_NOSIGNAL);
555
556 // send_buf + bytes_sent
557
558 // Deal with temporary internal errors like EAGAIN, resource temporary unavailable etc.
559 // retry sending
560 if (ret < 0)
561 { // internal error during send occured
562 saved_errno= errno;
563 switch(saved_errno)
564 {
565 case EAGAIN: // same as EWOULDBLOCK
566 case EINTR: // man page says: A signal occurred before any data was transmitted.
567 case ENOBUFS: // The output queue for a network interface was full.
568 retry_count++;
569 ERRLog(tpparam.name,"Temporary failure while calling send(): " << strerror(saved_errno) << ", errno: " << saved_errno
570 << " - retry sending, retry #" << retry_count);
571 // pace down a little bit, sleep for a while
572 sleep(1);
573 break;
574
575 // everything else should not lead to repetition
576 default:
577 retry_count= retry_send_max;
578 break;
579 } // end switch
580 } // endif
581 else // leave while
582 break;
583 }
584 while(retry_count < retry_send_max);
585
586 if (ret < 0)
587 { // unrecoverable error occured
588
589 result= TCP_SEND_FAILURE;
590 break;
591 } // end if (ret < 0)
592 else
593 {
594 if (debug_pdu)
595 {
596 ostringstream hexdump;
597 netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_sent);
598 DLog(tpparam.name,"PDU debugging enabled - Sent:" << hexdump.str());
599 }
600 }
601
602 // continue with send next data block in next for() iteration
603 } // end for
604
605 // *** note: netmsg is deleted here ***
606 delete netmsg;
607
608 // Throwing an exception within a critical section does not
609 // unlock the mutex.
610
611 if (result != TCP_SUCCESS)
612 {
613 ERRLog(tpparam.name, thisproc << "TCP error, returns " << ret << ", error : " << strerror(errno));
614 delete addr;
615
616 throw TPErrorSendFailed(saved_errno);
617
618 }
619 else
620 EVLog(tpparam.name, thisproc << ">>----Sent---->> message (" << msgsize << " bytes) using socket " << assoc->socketfd << " to " << *addr);
621
622 if (!assoc) {
623 // no connection
624
625 ERRLog(tpparam.name, thisproc << "cannot get connection to " << addr->get_ip_str()
626 << ", port #" << addr->get_port());
627
628 delete addr;
629
630 throw TPErrorUnreachable(); // should be no assoc found
631 } // end "if (!assoc)"
632
633 // *** delete address ***
634 delete addr;
635}
636
637/* this thread waits for an internal message that either:
638 * - requests transmission of a packet
639 * - requests to stop this thread
640 * @param argp points to the thread queue for internal messages
641 */
642void
643TPoverTCP::sender_thread(void *argp)
644{
645#ifndef _NO_LOGGING
646 const char *const methodname="senderthread - ";
647#endif
648
649 message* internal_thread_msg = NULL;
650
651 EVLog(tpparam.name, methodname << "starting as thread <" << pthread_self() << ">");
652
653 FastQueue* fq= reinterpret_cast<FastQueue*>(argp);
654 if (!fq)
655 {
656 ERRLog(tpparam.name, methodname << "thread <" << pthread_self() << "> no valid pointer to msg queue. Stop.");
657 return;
658 }
659
660 bool terminate= false;
661 TPoverTCPMsg* internalmsg= 0;
662 while (terminate==false && (internal_thread_msg= fq->dequeue()) != 0 )
663 {
664 internalmsg= dynamic_cast<TPoverTCPMsg*>(internal_thread_msg);
665
666 if (internalmsg == 0)
667 {
668 ERRLog(tpparam.name, methodname << "received not an TPoverTCPMsg but a" << internal_thread_msg->get_type_name());
669 }
670 else
671 if (internalmsg->get_msgtype() == TPoverTCPMsg::send_data)
672 {
673 // create a connection if none exists and send the netmsg
674 if (internalmsg->get_netmsg() && internalmsg->get_appladdr())
675 {
676 try
677 {
678 tcpsend(internalmsg->get_netmsg(),internalmsg->get_appladdr());
679 } // end try
680 catch(TPErrorSendFailed& err)
681 {
682 ERRLog(tpparam.name, methodname << "TCP send call failed - " << err.what()
683 << " cause: (" << err.get_reason() << ") " << strerror(err.get_reason()) );
684 } // end catch
685 catch(TPError& err)
686 {
687 ERRLog(tpparam.name, methodname << "TCP send call failed - reason: " << err.what());
688 } // end catch
689 catch(...)
690 {
691 ERRLog(tpparam.name, methodname << "TCP send call failed - unknown exception");
692 }
693 }
694 else
695 {
696 ERRLog(tpparam.name, methodname << "problem with passed arguments references, they point to 0");
697 }
698 }
699 else
700 if (internalmsg->get_msgtype() == TPoverTCPMsg::stop)
701 {
702 terminate= true;
703 }
704
705 delete internalmsg;
706 } // end while
707
708 EVLog(tpparam.name, methodname << "<" << pthread_self() << "> terminated connection.");
709}
710
711
712/** receiver thread listens at a TCP socket for incoming PDUs
713 * and passes complete PDUs to the coordinator. Incomplete
714 * PDUs due to aborted connections or buffer overflows are discarded.
715 * @param argp - assoc data and flags sig_terminate and terminated
716 *
717 * @note this is a static member function, so you cannot use class variables
718 */
719void
720TPoverTCP::receiver_thread(void *argp)
721{
722#ifndef _NO_LOGGING
723 const char *const methodname="receiver - ";
724#endif
725
726 receiver_thread_arg_t *receiver_thread_argp= static_cast<receiver_thread_arg_t *>(argp);
727 const appladdress* peer_addr = NULL;
728 const appladdress* own_addr = NULL;
729 uint32 bytes_received = 0;
730 TPMsg* tpmsg= NULL;
731
732 // argument parsing - start
733 if (receiver_thread_argp == 0)
734 {
735 ERRCLog(tpparam.name, methodname << "No arguments given at start of receiver thread <" << pthread_self() << ">, exiting.");
736
737 return;
738 }
739 else
740 {
741 // change status to running, i.e., not terminated
742 receiver_thread_argp->terminated= false;
743
744#ifdef _DEBUG
745 DLog(tpparam.name, methodname << "New receiver thread <" << pthread_self() << "> started. ");
746#endif
747 }
748
749 int conn_socket= 0;
750 if (receiver_thread_argp->peer_assoc)
751 {
752 // get socket descriptor from arg
753 conn_socket = receiver_thread_argp->peer_assoc->socketfd;
754 // get pointer to peer address of socket (source/sender address of peer) from arg
755 peer_addr= &receiver_thread_argp->peer_assoc->peer;
756 own_addr= &receiver_thread_argp->peer_assoc->ownaddr;
757 }
758 else
759 {
760 ERRCLog(tpparam.name, methodname << "No peer assoc available - pointer is NULL");
761
762 return;
763 }
764
765 if (peer_addr == 0)
766 {
767 ERRCLog(tpparam.name, methodname << "No peer address available for socket " << conn_socket << ", exiting.");
768
769 return;
770 }
771 // argument parsing - end
772#ifdef _DEBUG
773 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname <<
774 "Preparing to wait for data at socket "
775 << conn_socket << " from " << receiver_thread_argp->peer_assoc->peer);
776#endif
777
778 int ret= 0;
779 uint32 msgcontentlength= 0;
780 bool msgcontentlength_known= false;
781 bool pdu_complete= false; // when to terminate inner loop
782
783 /* maybe use this to create a new pdu,
784 /// constructor
785 contextlistpdu(type_t t, subtype_t st, uint32 fc, uint32 numobj = 0);
786 */
787
788 // activate O_NON_BLOCK for recv on socket conn_socket
789 // CAVEAT: this also implies non-blocking send()!
790 //fcntl(conn_socket,F_SETFL, O_NONBLOCK);
791
792 // set options for polling
793 const unsigned int number_poll_sockets= 1;
794 struct pollfd poll_fd;
795 // have to set structure before poll call
796 poll_fd.fd = conn_socket;
797 poll_fd.events = POLLIN | POLLPRI;
798 poll_fd.revents = 0;
799
800 int poll_status;
801 bool recv_error= false;
802
803 NetMsg* netmsg= 0;
804 NetMsg* remainbuf= 0;
805 size_t buffer_bytes_left= 0;
806 size_t trailingbytes= 0;
807 bool skiprecv= false;
808 // loop until we receive a terminate signal (read-only var for this thread)
809 // or get an error from socket read
810 while( receiver_thread_argp->sig_terminate == false )
811 {
812 // Read next PDU from socket or process trailing bytes in remainbuf
813 ret= 0;
814 msgcontentlength= 0;
815 msgcontentlength_known= false;
816 pdu_complete= false;
817 netmsg= 0;
818
819 // there are trailing bytes left from the last receive call
820 if (remainbuf != 0)
821 {
822 netmsg= remainbuf;
823 remainbuf= 0;
824 buffer_bytes_left= netmsg->get_size()-trailingbytes;
825 bytes_received= trailingbytes;
826 trailingbytes= 0;
827 skiprecv= true;
828 }
829 else // no trailing bytes, create a new buffer
830 if ( (netmsg= new NetMsg(NetMsg::max_size)) != 0 )
831 {
832 buffer_bytes_left= netmsg->get_size();
833 bytes_received= 0;
834 skiprecv= false;
835 }
836 else
837 { // buffer allocation failed
838 bytes_received= 0;
839 buffer_bytes_left= 0;
840 recv_error= true;
841 }
842
843 // loops until PDU is complete
844 // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
845 while (!pdu_complete &&
846 !recv_error &&
847 !receiver_thread_argp->sig_terminate)
848 {
849 if (!skiprecv)
850 {
851 // read from TCP socket or return after sleep_time
852 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
853
854 if (receiver_thread_argp->sig_terminate)
855 {
856 Log(EVENT_LOG,LOG_UNIMP,tpparam.name,methodname << "Thread <" << pthread_self() << "> found terminate signal after poll");
857 // disallow sending
858 AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
859 if (myassoc->shutdown == false)
860 {
861 myassoc->shutdown= true;
862 if (shutdown(myassoc->socketfd,SHUT_WR))
863 {
864 if ( errno != ENOTCONN )
865 Log(ERROR_LOG,LOG_UNIMP,tpparam.name,methodname <<"shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
866 }
867 }
868 // try to read do a last read from the TCP socket or return after sleep_time
869 if (poll_status == 0)
870 {
871 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
872 }
873 }
874
875 if (poll_fd.revents & POLLERR) // Error condition
876 {
877 if (errno == 0 || errno == EINTR)
878 {
879 EVLog(tpparam.name, methodname << "poll(): " << strerror(errno));
880 }
881 else
882 {
883 ERRCLog(tpparam.name, methodname << "Poll indicates error: " << strerror(errno));
884 recv_error= true;
885 }
886 }
887
888 if (poll_fd.revents & POLLHUP) // Hung up
889 {
890 Log(EVENT_LOG,LOG_CRIT, tpparam.name, methodname << "Poll hung up");
891 recv_error= true;
892 }
893
894 if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
895 {
896 EVLog(tpparam.name, methodname << "Poll Invalid request: fd not open");
897 recv_error= true;
898 }
899
900 // check status (return value) of poll call
901 switch (poll_status)
902 {
903 case -1:
904 if (errno == 0 || errno == EINTR)
905 {
906 EVLog(tpparam.name, methodname << "Poll status: " << strerror(errno));
907 }
908 else
909 {
910 ERRCLog(tpparam.name, methodname << "Poll status indicates error: " << strerror(errno) << "- aborting");
911 recv_error= true;
912 }
913
914 continue; // next while iteration
915 break;
916
917 case 0:
918#ifdef DEBUG_HARD
919 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll timed out after " << tpparam.sleep_time << " ms.");
920#endif
921 continue; // next while iteration
922 break;
923
924 default:
925#ifdef DEBUG_HARD
926 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
927#endif
928 break;
929 } // end switch
930
931
932 /// receive data from socket buffer (recv will not block)
933 ret = recv(conn_socket,
934 netmsg->get_buffer() + bytes_received,
935 buffer_bytes_left,
936 MSG_DONTWAIT);
937
938 if ( ret < 0 )
939 {
940 delete netmsg;
941 if (errno!=EAGAIN && errno!=EWOULDBLOCK)
942 {
943 ERRCLog(tpparam.name, methodname << "Receive at socket " << conn_socket << " failed, error: " << strerror(errno));
944 recv_error= true;
945 continue;
946 }
947 else
948 { // errno==EAGAIN || errno==EWOULDBLOCK
949 // just nothing to read from socket, continue w/ next poll
950 continue;
951 }
952 }
953 else
954 {
955 if (ret == 0)
956 {
957 // this means that EOF is reached,
958 // other side has closed connection
959 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Other side (" << *peer_addr << ") closed connection for socket " << conn_socket);
960 // disallow sending
961 AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
962 if (myassoc->shutdown == false)
963 {
964 myassoc->shutdown= true;
965 if (shutdown(myassoc->socketfd,SHUT_WR))
966 {
967 if ( errno != ENOTCONN )
968 Log(ERROR_LOG,LOG_UNIMP,tpparam.name, methodname << "shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
969 }
970 }
971 // not a real error, but we must quit the receive loop
972 recv_error= true;
973 }
974 else
975 {
976
977 Log(EVENT_LOG,LOG_UNIMP, tpparam.name, methodname << "<<--Received--<< packet (" << ret << " bytes) at socket " << conn_socket << " from " << *peer_addr);
978 // track number of received bytes
979 bytes_received+= ret;
980 buffer_bytes_left-= ret;
981 }
982 }
983 } // end if do not skip recv() statement
984
985 if (buffer_bytes_left < 0) ///< buffer space exhausted now
986 {
987 recv_error= true;
988 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "during receive buffer space exhausted");
989 }
990
991 if (!msgcontentlength_known) ///< common header not parsed
992 {
993 // enough bytes read to parse common header?
994 if (bytes_received >= common_header_length)
995 {
996 // get message content length in number of bytes
997 if (getmsglength(*netmsg, msgcontentlength))
998 msgcontentlength_known= true;
999 else
1000 {
1001 ERRCLog(tpparam.name, methodname << "Not a valid protocol header - discarding received packet. received size " << msgcontentlength);
1002
1003 ostringstream hexdumpstr;
1004 netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
1005 DLog(tpparam.name,"dumping received bytes:" << hexdumpstr.str());
1006
1007 // reset all counters
1008 msgcontentlength= 0;
1009 msgcontentlength_known= false;
1010 bytes_received= 0;
1011 pdu_complete= false;
1012 continue;
1013 }
1014 }
1015 } // endif common header not parsed
1016
1017 // check whether we have read the whole Protocol PDU
1018 DLog(tpparam.name, "bytes_received-common_header_length=" << bytes_received-common_header_length << " msgcontentlength: " << msgcontentlength);
1019 if (msgcontentlength_known)
1020 {
1021 if (bytes_received-common_header_length >= msgcontentlength )
1022 {
1023 pdu_complete= true;
1024 // truncate buffer exactly at common_header_length+msgcontentlength==PDU size, trailing stuff is handled separately
1025 netmsg->truncate(common_header_length+msgcontentlength);
1026
1027 // trailing bytes are copied into new buffer
1028 if (bytes_received-common_header_length > msgcontentlength)
1029 {
1030 WLog(tpparam.name,"trailing bytes - received more bytes ("<<bytes_received<<") than expected for PDU (" << common_header_length+msgcontentlength << ")");
1031 remainbuf= new NetMsg(NetMsg::max_size);
1032 trailingbytes= (bytes_received-common_header_length) - msgcontentlength;
1033 bytes_received= common_header_length+msgcontentlength;
1034 memcpy(remainbuf->get_buffer(),netmsg->get_buffer()+common_header_length+msgcontentlength, trailingbytes);
1035 }
1036 }
1037 else
1038 { // not enough bytes in the buffer must continue to read from network
1039 skiprecv= false;
1040 }
1041 } // endif msgcontentlength_known
1042 } // end while (!pdu_complete && !recv_error && !signalled for termination)
1043 // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
1044
1045 // if other side closed the connection, we should still be able to deliver the remaining data
1046 if (ret == 0)
1047 {
1048 recv_error= false;
1049 }
1050
1051 // deliver only complete PDUs to signaling module
1052 if (!recv_error && pdu_complete)
1053 {
1054 // create TPMsg and send it to the signaling thread
1055 tpmsg = new(nothrow) TPMsg(netmsg, peer_addr->copy(), own_addr->copy());
1056 if (tpmsg)
1057 {
1058 DLog(tpparam.name, methodname << "receipt of PDU now complete, sending msg#" << tpmsg->get_id()
1059 << " to module " << message::get_qaddr_name(tpparam.dest));
1060 }
1061
1062 debug_pdu=false;
1063
1064 if (debug_pdu)
1065 {
1066 ostringstream hexdump;
1067 netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_received);
1068 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"PDU debugging enabled - Received:" << hexdump.str());
1069 }
1070
1071 // send the message if it was successfully created
1072 // bool message::send_to(qaddr_t dest, bool exp = false);
1073 if (!tpmsg
1074 || (!tpmsg->get_peeraddress())
1075 || (!tpmsg->send(message::qaddr_tp_over_tcp, tpparam.dest)))
1076 {
1077 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Cannot allocate/send TPMsg");
1078 if (tpmsg) delete tpmsg;
1079 } // end if tpmsg not allocated or not addr or not sent
1080
1081
1082 } // end if !recv_error
1083 else
1084 { // error during receive or PDU incomplete
1085 if (bytes_received>0)
1086 {
1087 Log(WARNING_LOG,LOG_NORMAL, tpparam.name, methodname << "Attention! " << (recv_error? "Receive error, " : "") << (pdu_complete ? "PDU complete" : "PDU incomplete") << "received bytes: " << bytes_received);
1088 }
1089
1090 if (!pdu_complete && bytes_received>0 && bytes_received<common_header_length)
1091 {
1092 ostringstream hexdumpstr;
1093 netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
1094 Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"Message too short to be a valid protocol header - dumping received bytes:" << hexdumpstr.str());
1095 }
1096 // leave the outer loop
1097 /**********************/
1098 break;
1099 /**********************/
1100 } // end else
1101
1102 } // end while (thread not signalled for termination)
1103
1104 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self()
1105 << "> shutting down and closing socket " << receiver_thread_argp->peer_assoc->peer);
1106
1107 // shutdown socket
1108 if (shutdown(conn_socket, SHUT_RD))
1109 {
1110 if ( errno != ENOTCONN )
1111 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> shutdown (read) on socket failed, reason: " << strerror(errno));
1112 }
1113
1114 // close socket
1115 close(conn_socket);
1116
1117 receiver_thread_argp->terminated= true;
1118 //delete netmsg;
1119
1120 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> terminated");
1121
1122#ifdef _DEBUG
1123 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Signaling main loop for cleanup");
1124#endif
1125 // notify master thread for invocation of cleanup procedure
1126 TPoverTCPMsg* newmsg= new(nothrow)TPoverTCPMsg(receiver_thread_argp->peer_assoc);
1127 // send message to main loop thread
1128 newmsg->send_to(tpparam.source);
1129
1130}
1131
1132
1133/** this signals a terminate to a thread and wait for the thread to stop
1134 * @note it is not safe to access any thread related data after this method returned,
1135 * because the receiver thread will initiate a cleanup_receiver_thread() method
1136 * which may erase all relevant thread data.
1137 */
1138void
1139TPoverTCP::stop_receiver_thread(AssocData* peer_assoc)
1140{
1141 // All operations on recv_thread_argmap and connmap require an already acquired lock
1142 // after this procedure peer_assoc may be invalid because it was erased
1143
1144 // start critical section
1145
1146 if (peer_assoc == 0)
1147 return;
1148
1149 pthread_t thread_id= peer_assoc->thread_ID;
1150
1151 // try to clean up receiver_thread_arg
1152 recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1153 receiver_thread_arg_t* recv_thread_argp=
1154 (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1155 if (recv_thread_argp)
1156 {
1157 if (!recv_thread_argp->terminated)
1158 {
1159 // thread signaled termination, but is not?
1160 Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> signaled for termination");
1161
1162 // signal thread for its termination
1163 recv_thread_argp->sig_terminate= true;
1164 // wait for thread to join after termination
1165 pthread_join(thread_id, 0);
1166 // the dying thread will signal main loop to call this method, but this time we should enter the else branch
1167 return;
1168 }
1169 }
1170 else
1171 Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> not found");
1172
1173}
1174
1175
1176/** cleans up left over structures (assoc,receiver_thread_arg) from already terminated receiver_thread
1177 * usually called by the master_thread after the receiver_thread terminated
1178 * @note clean_up_receiver_thread() should be only called when an outer lock ensures that peer_assoc
1179 * is still valid
1180 */
1181void
1182TPoverTCP::cleanup_receiver_thread(AssocData* peer_assoc)
1183{
1184 // All operations on recv_thread_argmap and connmap require an already acquired lock
1185 // after this procedure peer_assoc may be invalid because it was erased
1186
1187 // start critical section
1188
1189 if (peer_assoc == 0)
1190 return;
1191
1192 pthread_t thread_id= peer_assoc->thread_ID;
1193
1194 // try to clean up receiver_thread_arg
1195 recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1196 receiver_thread_arg_t* recv_thread_argp=
1197 (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1198 if (recv_thread_argp)
1199 {
1200 if (!recv_thread_argp->terminated)
1201 {
1202 // thread signaled termination, but is not?
1203 Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> not terminated yet?!");
1204 return;
1205 }
1206 else
1207 { // if thread is already terminated
1208 Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> is terminated");
1209
1210 // delete it from receiver map
1211 recv_thread_argmap.erase(recv_thread_arg_iter);
1212
1213 // then delete receiver arg structure
1214 delete recv_thread_argp;
1215 }
1216 }
1217
1218 // delete entry from connection map
1219
1220 // cleanup sender thread
1221 // no need to lock explicitly, because caller of cleanup_receiver_thread() must already locked
1222 terminate_sender_thread(peer_assoc);
1223
1224 // delete the AssocData structure from the connection map
1225 // also frees allocated AssocData structure
1226 connmap.erase(peer_assoc);
1227
1228 // end critical section
1229
1230 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Cleanup receiver thread <" << thread_id << ">. Done.");
1231}
1232
1233
1234/* sends a stop message to the sender thread that belongs to the peer address given in assoc
1235 * @note terminate_receiver_thread() should be only called when an outer lock ensures that assoc
1236 * is still valid, a lock is also required, because senderthread_queuemap is changed
1237 */
1238void
1239TPoverTCP::terminate_sender_thread(const AssocData* assoc)
1240{
1241 if (assoc == 0)
1242 {
1243 Log(ERROR_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - assoc data == NULL");
1244 return;
1245 }
1246
1247 sender_thread_queuemap_t::iterator it= senderthread_queuemap.find(assoc->peer);
1248
1249 if (it != senderthread_queuemap.end())
1250 { // we have a sender thread: send a stop message to it
1251 FastQueue* destqueue= it->second;
1252 if (destqueue)
1253 {
1254 TPoverTCPMsg* internalmsg= new TPoverTCPMsg(assoc,tpparam.source,TPoverTCPMsg::stop);
1255 if (internalmsg)
1256 {
1257 // send the internal message to the sender thread queue
1258 internalmsg->send(tpparam.source,destqueue);
1259 }
1260 }
1261 else
1262 {
1263 Log(WARNING_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - found entry for address, but no sender thread. addr:" << assoc->peer);
1264 }
1265 // erase entry from map
1266 senderthread_queuemap.erase(it);
1267 }
1268}
1269
1270/* terminate all active threads
1271 * note: locking should not be necessary here because this message is called as last method from
1272 * main_loop()
1273 */
1274void
1275TPoverTCP::terminate_all_threads()
1276{
1277 AssocData* assoc= 0;
1278 receiver_thread_arg_t* terminate_argp;
1279
1280 for (recv_thread_argmap_t::iterator terminate_iterator= recv_thread_argmap.begin();
1281 terminate_iterator != recv_thread_argmap.end();
1282 terminate_iterator++)
1283 {
1284 if ( (terminate_argp= terminate_iterator->second) != 0)
1285 {
1286 // we need a non const pointer to erase it later on
1287 assoc= const_cast<AssocData*>(terminate_argp->peer_assoc);
1288 // check whether thread is still alive
1289 if (terminate_argp->terminated == false)
1290 {
1291 terminate_argp->sig_terminate= true;
1292 // then wait for its termination
1293 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,
1294 "Signaled receiver thread <" << terminate_iterator->first << "> for termination");
1295
1296 pthread_join(terminate_iterator->first, 0);
1297
1298 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Thread <" << terminate_iterator->first << "> is terminated");
1299 }
1300 else
1301 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,
1302 "Receiver thread <" << terminate_iterator->first << "> already terminated");
1303
1304 // cleanup all remaining argument structures of terminated threads
1305 delete terminate_argp;
1306
1307 // terminate any related sender thread that is still running
1308 terminate_sender_thread(assoc);
1309
1310 connmap.erase(assoc);
1311 // delete assoc is not necessary, because connmap.erase() will do the job
1312 }
1313 } // end for
1314}
1315
1316
1317/**
1318 * sender thread starter:
1319 * just a static starter method to allow starting the
1320 * actual sender_thread() method.
1321 *
1322 * @param argp - pointer to the current TPoverTCP object instance and receiver_thread_arg_t struct
1323 */
1324void*
1325TPoverTCP::sender_thread_starter(void *argp)
1326{
1327 sender_thread_start_arg_t *sargp= static_cast<sender_thread_start_arg_t *>(argp);
1328
1329 //cout << "invoked sender_thread_Starter" << endl;
1330
1331 // invoke sender thread method
1332 if (sargp != 0 && sargp->instance != 0)
1333 {
1334 // call receiver_thread member function on object instance
1335 sargp->instance->sender_thread(sargp->sender_thread_queue);
1336
1337 //cout << "Before deletion of sarg" << endl;
1338
1339 // no longer needed
1340 delete sargp;
1341 }
1342 else
1343 {
1344 Log(ERROR_LOG,LOG_CRIT,"sender_thread_starter","while starting sender_thread: 0 pointer to arg or object");
1345 }
1346 return 0;
1347}
1348
1349
1350
1351
1352/**
1353 * receiver thread starter:
1354 * just a static starter method to allow starting the
1355 * actual receiver_thread() method.
1356 *
1357 * @param argp - pointer to the current TPoverTCP object instance and receiver_thread_arg_t struct
1358 */
1359void*
1360TPoverTCP::receiver_thread_starter(void *argp)
1361{
1362 receiver_thread_start_arg_t *rargp= static_cast<receiver_thread_start_arg_t *>(argp);
1363 // invoke receiver thread method
1364 if (rargp != 0 && rargp->instance != 0)
1365 {
1366 // call receiver_thread member function on object instance
1367 rargp->instance->receiver_thread(rargp->rtargp);
1368
1369 // no longer needed
1370 delete rargp;
1371 }
1372 else
1373 {
1374 Log(ERROR_LOG,LOG_CRIT,"receiver_thread_starter","while starting receiver_thread: 0 pointer to arg or object");
1375 }
1376 return 0;
1377}
1378
1379
1380void
1381TPoverTCP::create_new_sender_thread(FastQueue* senderfqueue)
1382{
1383 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new sender thread...");
1384
1385 pthread_t senderthreadid;
1386 // create new thread; (arg == 0) is handled by thread, too
1387 int pthread_status= pthread_create(&senderthreadid,
1388 NULL, // NULL: default attributes: thread is joinable and has a
1389 // default, non-realtime scheduling policy
1390 TPoverTCP::sender_thread_starter,
1391 new sender_thread_start_arg_t(this,senderfqueue));
1392 if (pthread_status)
1393 {
1394 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " << strerror(pthread_status));
1395
1396 delete senderfqueue;
1397 }
1398}
1399
1400
1401void
1402TPoverTCP::create_new_receiver_thread(AssocData* peer_assoc)
1403{
1404 receiver_thread_arg_t* argp=
1405 new(nothrow) receiver_thread_arg(peer_assoc);
1406
1407 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new receiver thread...");
1408
1409 // create new thread; (arg == 0) is handled by thread, too
1410 int pthread_status= pthread_create(&peer_assoc->thread_ID,
1411 NULL, // NULL: default attributes: thread is joinable and has a
1412 // default, non-realtime scheduling policy
1413 receiver_thread_starter,
1414 new(nothrow) receiver_thread_start_arg_t(this,argp));
1415 if (pthread_status)
1416 {
1417 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " << strerror(pthread_status));
1418
1419 delete argp;
1420 }
1421 else
1422 {
1423 lock(); // install_cleanup_thread_lock(TPoverTCP, this);
1424
1425 // remember pointer to thread arg structure
1426 // thread arg structure should be destroyed after thread termination only
1427 pair<recv_thread_argmap_t::iterator, bool> tmpinsiterator=
1428 recv_thread_argmap.insert( pair<pthread_t,receiver_thread_arg_t*> (peer_assoc->thread_ID,argp) );
1429 if (tmpinsiterator.second == false)
1430 {
1431 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Thread argument could not be inserted into hashmap");
1432 }
1433 unlock(); // uninstall_cleanup(1);
1434 }
1435}
1436
1437
1438/**
1439 * master listener thread starter:
1440 * just a static starter method to allow starting the
1441 * actual master_listener_thread() method.
1442 *
1443 * @param argp - pointer to the current TPoverTCP object instance
1444 */
1445void*
1446TPoverTCP::master_listener_thread_starter(void *argp)
1447{
1448 // invoke listener thread method
1449 if (argp != 0)
1450 {
1451 (static_cast<TPoverTCP*>(argp))->master_listener_thread();
1452 }
1453 return 0;
1454}
1455
1456
1457
1458/**
1459 * master listener thread: waits for incoming connections at the well-known tcp port
1460 * when a connection request is received this thread spawns a receiver_thread for
1461 * receiving packets from the peer at the new socket.
1462 */
1463void
1464TPoverTCP::master_listener_thread()
1465{
1466 // create a new address-structure for the listening masterthread
1467 struct sockaddr_in6 own_address_v6;
1468 own_address_v6.sin6_family = AF_INET6;
1469 own_address_v6.sin6_flowinfo= 0;
1470 own_address_v6.sin6_port = htons(tpparam.port); // use port number in param structure
1471 // accept incoming connections on all interfaces
1472 own_address_v6.sin6_addr = in6addr_any;
1473 own_address_v6.sin6_scope_id= 0;
1474
1475 // create a new address-structure for the listening masterthread
1476 struct sockaddr_in own_address_v4;
1477 own_address_v4.sin_family = AF_INET;
1478 own_address_v4.sin_port = htons(tpparam.port); // use port number in param structure
1479 // accept incoming connections on all interfaces
1480 own_address_v4.sin_addr.s_addr = INADDR_ANY;
1481
1482 // create a listening socket
1483 int master_listener_socket= socket( AF_INET6, SOCK_STREAM, IPPROTO_TCP);
1484 if (master_listener_socket!=-1) v4_mode = false;
1485 if (master_listener_socket == -1) {
1486 master_listener_socket= socket( AF_INET, SOCK_STREAM, IPPROTO_TCP);
1487 if (master_listener_socket!=-1) v4_mode = true;
1488 }
1489 if (master_listener_socket == -1)
1490 {
1491 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Could not create a new socket, error: " << strerror(errno));
1492 return;
1493 }
1494
1495 // Disable Nagle Algorithm, set (TCP_NODELAY)
1496 int nodelayflag= 1;
1497 int status= setsockopt(master_listener_socket,
1498 IPPROTO_TCP,
1499 TCP_NODELAY,
1500 &nodelayflag,
1501 sizeof(nodelayflag));
1502 if (status)
1503 {
1504 Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
1505 }
1506
1507 // Reuse ports, even if they are busy
1508 int socketreuseflag= 1;
1509 status= setsockopt(master_listener_socket,
1510 SOL_SOCKET,
1511 SO_REUSEADDR,
1512 (const char *) &socketreuseflag,
1513 sizeof(socketreuseflag));
1514 if (status)
1515 {
1516 Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
1517 }
1518
1519
1520 // bind the newly created socket to a specific address
1521 int bind_status = bind(master_listener_socket, v4_mode ?
1522 reinterpret_cast<struct sockaddr *>(&own_address_v4) :
1523 reinterpret_cast<struct sockaddr *>(&own_address_v6),
1524 v4_mode ? sizeof(own_address_v4) : sizeof(own_address_v6));
1525 if (bind_status)
1526 {
1527 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Binding to "
1528 << (v4_mode ? inet_ntop(AF_INET, &own_address_v4.sin_addr, in_addrstr, INET_ADDRSTRLEN) :
1529 inet_ntop(AF_INET6, &own_address_v6.sin6_addr, in6_addrstr, INET6_ADDRSTRLEN))
1530 << " port " << tpparam.port << " failed, error: " << strerror(errno));
1531 return;
1532 }
1533
1534 // listen at the socket,
1535 // queuesize for pending connections= max_listen_queue_size
1536 int listen_status = listen(master_listener_socket, max_listen_queue_size);
1537 if (listen_status)
1538 {
1539 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Listen at socket " << master_listener_socket
1540 << " failed, error: " << strerror(errno));
1541 return;
1542 }
1543 else
1544 {
1545 Log(INFO_LOG,LOG_NORMAL, tpparam.name, color[green] << "Listening at port #" << tpparam.port << color[off]);
1546 }
1547
1548 // activate O_NON_BLOCK for accept (accept does not block)
1549 fcntl(master_listener_socket,F_SETFL, O_NONBLOCK);
1550
1551 // create a pollfd struct for use in the mainloop
1552 struct pollfd poll_fd;
1553 poll_fd.fd = master_listener_socket;
1554 poll_fd.events = POLLIN | POLLPRI;
1555 poll_fd.revents = 0;
1556 /*
1557 #define POLLIN 0x001 // There is data to read.
1558 #define POLLPRI 0x002 // There is urgent data to read.
1559 #define POLLOUT 0x004 // Writing now will not block.
1560 */
1561
1562 bool terminate = false;
1563 // check for thread terminate condition using get_state()
1564 state_t currstate= get_state();
1565 int poll_status= 0;
1566 const unsigned int number_poll_sockets= 1;
1567 struct sockaddr_in6 peer_address;
1568 socklen_t peer_address_len;
1569 int conn_socket;
1570
1571 // check whether this thread is signaled for termination
1572 while(! (terminate= (currstate==STATE_ABORT || currstate==STATE_STOP) ) )
1573 {
1574 // wait on number_poll_sockets (main drm socket)
1575 // for the events specified above for sleep_time (in ms)
1576 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
1577 if (poll_fd.revents & POLLERR) // Error condition
1578 {
1579 if (errno != EINTR)
1580 {
1581 Log(ERROR_LOG,LOG_CRIT, tpparam.name,
1582 "Poll caused error " << strerror(errno) << " - indicated by revents");
1583 }
1584 else
1585 {
1586 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "poll(): " << strerror(errno));
1587 }
1588
1589 }
1590 if (poll_fd.revents & POLLHUP) // Hung up
1591 {
1592 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll hung up");
1593 return;
1594 }
1595 if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
1596 {
1597 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll Invalid request: fd not open");
1598 return;
1599 }
1600
1601 switch (poll_status)
1602 {
1603 case -1:
1604 if (errno != EINTR)
1605 {
1606 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll status indicates error: " << strerror(errno));
1607 }
1608 else
1609 {
1610 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Poll status: " << strerror(errno));
1611 }
1612
1613 break;
1614
1615 case 0:
1616#ifdef DEBUG_HARD
1617 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,
1618 "Listen Thread - Poll timed out after " << tpparam.sleep_time << " ms.");
1619#endif
1620 currstate= get_state();
1621 continue;
1622 break;
1623
1624 default:
1625#ifdef DEBUG_HARD
1626 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
1627#endif
1628 break;
1629 } // end switch
1630
1631 // after a successful accept call,
1632 // accept stores the address information of the connecting party
1633 // in peer_address and the size of its address in addrlen
1634 peer_address_len= sizeof(peer_address);
1635 conn_socket = accept (master_listener_socket,
1636 reinterpret_cast<struct sockaddr *>(&peer_address),
1637 &peer_address_len);
1638 if (conn_socket == -1)
1639 {
1640 if (errno != EWOULDBLOCK && errno != EAGAIN)
1641 {
1642 Log(ERROR_LOG,LOG_EMERG, tpparam.name, "Accept at socket " << master_listener_socket
1643 << " failed, error: " << strerror(errno));
1644 return;
1645 }
1646 }
1647 else
1648 {
1649 // create a new assocdata-object for the new thread
1650 AssocData* peer_assoc = NULL;
1651 appladdress addr(peer_address, IPPROTO_TCP);
1652
1653 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "<<--Received connect--<< request from " << addr.get_ip_str()
1654 << " port #" << addr.get_port());
1655
1656 struct sockaddr_in6 own_address;
1657 if (v4_mode) {
1658 struct sockaddr_in own_address_v4;
1659 socklen_t own_address_len_v4 = sizeof(own_address_v4);
1660 getsockname(conn_socket, reinterpret_cast<struct sockaddr*>(&own_address_v4), &own_address_len_v4);
1661 v4_to_v6(&own_address_v4, &own_address);
1662 } else {
1663 socklen_t own_address_len= sizeof(own_address);
1664 getsockname(conn_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
1665 }
1666
1667 // AssocData will copy addr content into its own structure
1668 // allocated peer_assoc will be stored in connmap
1669 peer_assoc = new(nothrow) AssocData(conn_socket, addr, appladdress(own_address,IPPROTO_TCP));
1670
1671 bool insert_success= false;
1672 if (peer_assoc)
1673 {
1674 // start critical section
1675 lock(); // install_cleanup_thread_lock(TPoverTCP, this);
1676 insert_success= connmap.insert(peer_assoc);
1677 // end critical section
1678 unlock(); // uninstall_cleanup(1);
1679 }
1680
1681
1682 if (insert_success == false) // not inserted into connmap
1683 {
1684 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot insert AssocData for socket " << conn_socket
1685 << ", " << addr.get_ip_str() << ", port #"
1686 << addr.get_port() << " into connection map, aborting connection...");
1687
1688 // abort connection, delete its AssocData
1689 close (conn_socket);
1690 if (peer_assoc)
1691 {
1692 delete peer_assoc;
1693 peer_assoc= 0;
1694 }
1695 return;
1696
1697 } //end __else(connmap.insert());__
1698
1699 // create a new thread for each new connection
1700 create_new_receiver_thread(peer_assoc);
1701 } // end __else (connsocket)__
1702
1703 // get new thread state
1704 currstate= get_state();
1705
1706 } // end while(!terminate)
1707 return;
1708} // end listen_for_connections()
1709
1710
1711TPoverTCP::~TPoverTCP()
1712{
1713 init= false;
1714 this->connmap.clear();
1715
1716 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Destructor called");
1717
1718 QueueManager::instance()->unregister_queue(tpparam.source);
1719}
1720
1721/** TPoverTCP Thread main loop.
1722 * This loop checks for internal messages of either
1723 * a send() call to start a new receiver thread, or,
1724 * of a receiver_thread() that signals its own termination
1725 * for proper cleanup of control structures.
1726 * It also handles the following internal TPoverTCPMsg types:
1727 * - TPoverTCPMsg::stop - a particular receiver thread is terminated
1728 * - TPoverTCPMsg::start - a particular receiver thread is started
1729 * @param nr number of current thread instance
1730 */
1731void
1732TPoverTCP::main_loop(uint32 nr)
1733{
1734
1735 // get internal queue for messages from receiver_thread
1736 FastQueue* fq = get_fqueue();
1737 if (!fq)
1738 {
1739 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot find message queue");
1740 return;
1741 } // end if not fq
1742 // register queue for receiving internal messages from other modules
1743 QueueManager::instance()->register_queue(fq,tpparam.source);
1744
1745 // start master listener thread
1746 pthread_t master_listener_thread_ID;
1747 int pthread_status= pthread_create(&master_listener_thread_ID,
1748 NULL, // NULL: default attributes: thread is joinable and has a
1749 // default, non-realtime scheduling policy
1750 master_listener_thread_starter,
1751 this);
1752 if (pthread_status)
1753 {
1754 Log(ERROR_LOG,LOG_CRIT, tpparam.name,
1755 "New master listener thread could not be created: " << strerror(pthread_status));
1756 }
1757 else
1758 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Master listener thread started");
1759
1760
1761 // define max latency for thread reaction on termination/stop signal
1762 timespec wait_interval= { 0, 250000000L }; // 250ms
1763 message* internal_thread_msg = NULL;
1764 state_t currstate= get_state();
1765
1766 // check whether this thread is signaled for termination
1767 while( currstate!=STATE_ABORT && currstate!=STATE_STOP )
1768 {
1769 // poll internal message queue (blocking)
1770 if ( (internal_thread_msg= fq->dequeue_timedwait(wait_interval)) != 0 )
1771 {
1772 TPoverTCPMsg* internalmsg= dynamic_cast<TPoverTCPMsg*>(internal_thread_msg);
1773 if (internalmsg)
1774 {
1775 if (internalmsg->get_msgtype() == TPoverTCPMsg::stop)
1776 {
1777 // a receiver thread terminated and signaled for cleanup by master thread
1778 AssocData* assocd= const_cast<AssocData*>(internalmsg->get_peer_assoc());
1779 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Got cleanup request for thread <" << assocd->thread_ID <<'>');
1780 lock();
1781 cleanup_receiver_thread( assocd );
1782 unlock();
1783 }
1784 else
1785 if (internalmsg->get_msgtype() == TPoverTCPMsg::start)
1786 {
1787 // start a new receiver thread
1788 create_new_receiver_thread( const_cast<AssocData*>(internalmsg->get_peer_assoc()) );
1789 }
1790 else
1791 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "unexpected internal message:" << internalmsg->get_msgtype());
1792
1793 delete internalmsg;
1794 }
1795 else
1796 {
1797 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Dynamic_cast failed - received unexpected and unknown internal message source "
1798 << internal_thread_msg->get_source());
1799 }
1800 } // endif
1801
1802 // get thread state
1803 currstate= get_state();
1804 } // end while
1805
1806 if (currstate==STATE_STOP)
1807 {
1808 // start abort actions
1809 Log(INFO_LOG,LOG_NORMAL, tpparam.name, "Asked to abort, stopping all receiver threads");
1810 } // end if stopped
1811
1812 // do not accept any more messages
1813 fq->shutdown();
1814 // terminate all receiver and sender threads that are still active
1815 terminate_all_threads();
1816}
1817
1818} // end namespace protlib
1819///@}
Note: See TracBrowser for help on using the repository browser.