source: source/ariba/utility/transport/tcpip/protlib/tp_over_tcp.cpp@ 7038

Last change on this file since 7038 was 7038, checked in by mies, 14 years ago

updated protlib to work with v4 only

File size: 55.4 KB
Line 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file tp_over_tcp.cpp
3/// TCP-based transport module (includes framing support)
4/// ----------------------------------------------------------
5/// $Id: tp_over_tcp.cpp 2872 2008-02-18 10:58:03Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/src/tp_over_tcp.cpp $
7// ===========================================================
8//
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29
30extern "C"
31{
32 //#define _SOCKADDR_LEN /* use BSD 4.4 sockets */
33#include <unistd.h> /* gethostname */
34#include <sys/types.h> /* network socket interface */
35#include <netinet/in.h> /* network socket interface */
36#include <netinet/tcp.h> /* for TCP Socket Option */
37#include <sys/socket.h>
38#include <arpa/inet.h> /* inet_addr */
39
40#include <fcntl.h>
41#include <sys/poll.h>
42}
43
44#include <iostream>
45#include <errno.h>
46#include <string>
47#include <sstream>
48
49#include "tp_over_tcp.h"
50#include "threadsafe_db.h"
51#include "cleanuphandler.h"
52#include "setuid.h"
53#include "queuemanager.h"
54#include "logfile.h"
55
56#include <set>
57
58#define TCP_SUCCESS 0
59#define TCP_SEND_FAILURE 1
60
61const unsigned int max_listen_queue_size= 10;
62
63#define IPV6_ADDR_INT32_SMP 0x0000ffff
64
65namespace protlib {
66
/**
 * Build an IPv4 socket address from an IPv6 socket address.
 *
 * The port is copied unchanged and the IPv4 address is taken from the
 * last four bytes (s6_addr[12..15]) of the IPv6 address. No check is made
 * that the source really holds an IPv4-mapped address.
 *
 * @param sin  destination; completely overwritten
 * @param sin6 source IPv6 socket address
 */
void v6_to_v4(struct sockaddr_in *sin, struct sockaddr_in6 *sin6) {
  // start from an all-zero structure so that padding is well defined
  memset(sin, 0, sizeof(struct sockaddr_in));

  sin->sin_family = AF_INET;
  sin->sin_port = sin6->sin6_port;

  // the embedded IPv4 address occupies the trailing 4 bytes
  const unsigned char *embedded_v4 = sin6->sin6_addr.s6_addr + 12;
  memcpy(&sin->sin_addr, embedded_v4, sizeof(sin->sin_addr));
}
73
/**
 * Convert sockaddr_in to sockaddr_in6 in v4 mapped addr format
 * (::ffff:a.b.c.d).
 *
 * The IPv6 address is written byte by byte: bytes 0..9 are zero, bytes
 * 10..11 are 0xff and bytes 12..15 carry the IPv4 address. Writing single
 * bytes keeps the result independent of host byte order and avoids
 * unaligned / type-punned 32-bit stores into the address array.
 *
 * @param sin  source IPv4 socket address
 * @param sin6 destination; completely overwritten (flowinfo and scope id
 *             are zero afterwards)
 */
void v4_to_v6(struct sockaddr_in *sin, struct sockaddr_in6 *sin6) {
  memset(sin6, 0, sizeof(*sin6));
  sin6->sin6_family = AF_INET6;
  sin6->sin6_port = sin->sin_port;

  // bytes 0..9 are already zero after the memset above
  sin6->sin6_addr.s6_addr[10] = 0xff;
  sin6->sin6_addr.s6_addr[11] = 0xff;

  // s_addr is already in network byte order, so a plain byte copy is correct
  memcpy(&sin6->sin6_addr.s6_addr[12], &sin->sin_addr, sizeof(sin->sin_addr));
}
84
85using namespace log;
86
87/** @defgroup tptcp TP over TCP
88 * @ingroup network
89 * @{
90 */
91
92char in6_addrstr[INET6_ADDRSTRLEN+1];
93
94
95/******* class TPoverTCP *******/
96
97
98/** get_connection_to() checks for already existing connections.
99 * If a connection exists, it returns "AssocData"
100 * and saves it in "connmap" for further use
101 * If no connection exists, a new connection to "addr"
102 * is created.
103 */
104AssocData*
105TPoverTCP::get_connection_to(const appladdress& addr)
106{
107 // get timeout
108 struct timespec ts;
109 get_time_of_day(ts);
110 ts.tv_nsec+= tpparam.sleep_time * 1000000L;
111 if (ts.tv_nsec>=1000000000L)
112 {
113 ts.tv_sec += ts.tv_nsec / 1000000000L;
114 ts.tv_nsec= ts.tv_nsec % 1000000000L;
115 }
116
117 // create a new AssocData pointer, initialize it to NULL
118 AssocData* assoc= NULL;
119 int new_socket;
120 // loop until timeout is exceeded: TODO: check applicability of loop
121 do
122 {
123 // check for existing connections to addr
124 // start critical section
125 lock(); // install_cleanup_thread_lock(TPoverTCP, this);
126 assoc= connmap.lookup(addr);
127 // end critical section
128 unlock(); // uninstall_cleanup(1);
129 if (assoc)
130 {
131 // If not shut down then use it, else abort, wait and check again.
132 if (!assoc->shutdown)
133 {
134 return assoc;
135 }
136 else
137 {
138 // TODO: connection is already in shutdown mode. What now?
139 ERRCLog(tpparam.name,"socket exists, but is already in mode shutdown");
140
141 return 0;
142 }
143 } //end __if (assoc)__
144 else
145 {
146 Log(DEBUG_LOG,LOG_UNIMP,tpparam.name,"No existing connection to "
147 << addr.get_ip_str() << " port #" << addr.get_port() << " found, creating a new one.");
148 }
149
150 // no connection found, create a new one
151 new_socket = socket( v4_mode ? AF_INET : AF_INET6, SOCK_STREAM, IPPROTO_TCP);
152 if (new_socket == -1)
153 {
154 ERRCLog(tpparam.name,"Couldn't create a new socket: " << strerror(errno));
155
156 return 0;
157 }
158
159 // Disable Nagle Algorithm, set (TCP_NODELAY)
160 int nodelayflag= 1;
161 int status= setsockopt(new_socket,
162 IPPROTO_TCP,
163 TCP_NODELAY,
164 &nodelayflag,
165 sizeof(nodelayflag));
166 if (status)
167 {
168 ERRLog(tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
169 }
170
171 // Reuse ports, even if they are busy
172 int socketreuseflag= 1;
173 status= setsockopt(new_socket,
174 SOL_SOCKET,
175 SO_REUSEADDR,
176 (const char *) &socketreuseflag,
177 sizeof(socketreuseflag));
178 if (status)
179 {
180 ERRLog(tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
181 }
182
183 struct sockaddr_in6 dest_address;
184 dest_address.sin6_flowinfo= 0;
185 dest_address.sin6_scope_id= 0;
186 addr.get_sockaddr(dest_address);
187
188 // connect the socket to the destination address
189 int connect_status = 0;
190 if (v4_mode) {
191 struct sockaddr_in dest_address_v4;
192 v6_to_v4( &dest_address_v4, &dest_address );
193 connect_status = connect(new_socket,
194 reinterpret_cast<const struct sockaddr*>(&dest_address_v4),
195 sizeof(dest_address));
196 } else {
197 connect_status = connect(new_socket,
198 reinterpret_cast<const struct sockaddr*>(&dest_address),
199 sizeof(dest_address));
200 }
201
202 // connects to the listening_port of the peer's masterthread
203 if (connect_status != 0)
204 {
205 ERRLog(tpparam.name,"Connect to " << addr.get_ip_str() << " port #" << addr.get_port()
206 << " failed: [" << color[red] << strerror(errno) << color[off] << "]");
207
208 return 0; // error: couldn't connect
209 }
210
211
212 struct sockaddr_in6 own_address;
213 socklen_t own_address_len= sizeof(own_address);
214 getsockname(new_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
215
216 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,">>--Connect-->> to " << addr.get_ip_str() << " port #" << addr.get_port()
217 << " from " << inet_ntop(AF_INET6,&own_address.sin6_addr,in6_addrstr,INET6_ADDRSTRLEN)
218 << " port #" << ntohs(own_address.sin6_port));
219
220
221 // create new AssocData record (will copy addr)
222 assoc = new(nothrow) AssocData(new_socket, addr, appladdress(own_address,IPPROTO_TCP));
223
224 // if assoc could be successfully created, insert it into ConnectionMap
225 if (assoc)
226 {
227 bool insert_success= false;
228 // start critical section
229 lock(); // install_cleanup_thread_lock(TPoverTCP, this);
230 // insert AssocData into connection map
231 insert_success= connmap.insert(assoc);
232 // end critical section
233 unlock(); // uninstall_cleanup(1);
234
235 if (insert_success == true)
236 {
237#ifdef _DEBUG
238 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Connected to " << addr.get_ip_str() << ", port #" << addr.get_port()
239 << " via socket " << new_socket);
240
241
242#endif
243
244 // notify master thread to start a receiver thread (i.e. send selfmessage)
245 TPoverTCPMsg* newmsg= new(nothrow)TPoverTCPMsg(assoc, tpparam.source, TPoverTCPMsg::start);
246 if (newmsg)
247 {
248 newmsg->send_to(tpparam.source);
249 return assoc;
250 }
251 else
252 ERRCLog(tpparam.name,"get_connection_to: could not get memory for internal msg");
253 }
254 else
255 {
256 // delete data and abort
257 ERRCLog(tpparam.name, "Cannot insert AssocData for socket " << new_socket << ", "<< addr.get_ip_str()
258 <<", port #" << addr.get_port() << " into connection map, aborting connection");
259
260 // abort connection, delete its AssocData
261 close (new_socket);
262 if (assoc)
263 {
264 delete assoc;
265 assoc= 0;
266 }
267 return assoc;
268 } // end else connmap.insert
269
270 } // end "if (assoc)"
271 }
272 while (wait_cond(ts)!=ETIMEDOUT);
273
274 return assoc;
275} //end get_connection_to
276
277
278/** terminates a signaling association/connection
279 * - if no connection exists, generate a warning
280 * - otherwise: generate internal msg to related receiver thread
281 */
282void
283TPoverTCP::terminate(const address& in_addr)
284{
285#ifndef _NO_LOGGING
286 const char *const thisproc="terminate() - ";
287#endif
288
289 appladdress* addr = NULL;
290 addr = dynamic_cast<appladdress*>(in_addr.copy());
291
292 if (!addr) return;
293
294 // Create a new AssocData-pointer
295 AssocData* assoc = NULL;
296
297 // check for existing connections to addr
298
299 // start critical section:
300 // please note if receiver_thread terminated already, the assoc data is not
301 // available anymore therefore we need a lock around cleanup_receiver_thread()
302
303 lock(); // install_cleanup_thread_lock(TPoverTCP, this);
304 assoc= connmap.lookup(*addr);
305 if (assoc)
306 {
307 EVLog(tpparam.name,thisproc<<"got request to shutdown connection for peer " << addr);
308 // If not shut down then use it, else abort, wait and check again.
309 if (!assoc->shutdown)
310 {
311 if (assoc->socketfd)
312 {
313 // disallow sending
314 if (shutdown(assoc->socketfd,SHUT_WR))
315 {
316 ERRLog(tpparam.name,thisproc<<"shutdown (write) on socket for peer " << addr << " returned error:" << strerror(errno));
317 }
318 else
319 {
320 EVLog(tpparam.name,thisproc<<"initiated closing of connection for peer " << addr << ". Shutdown (write) on socket "<< assoc->socketfd );
321 }
322 }
323 assoc->shutdown= true;
324 }
325 else
326 EVLog(tpparam.name,thisproc<<"connection for peer " << addr << " is already in mode shutdown");
327
328 }
329 else
330 WLog(tpparam.name,thisproc<<"could not find a connection for peer " << *addr);
331
332 stop_receiver_thread(assoc);
333
334 // end critical section
335 unlock(); // uninstall_cleanup(1);
336
337 if (addr) delete addr;
338}
339
340
341/** generates and internal TPoverTCP message to send a NetMsg to the network
342 * - it is necessary to let a thread do this, because the caller
343 * may get blocked if the connect() or send() call hangs for a while
344 * - the sending thread will call TPoverTCP::tcpsend()
345 * - if no connection exists, creates a new one (unless use_existing_connection is true)
346 * @note the netmsg is deleted by the send() method when it is not used anymore
347 */
348void
349TPoverTCP::send(NetMsg* netmsg, const address& in_addr, bool use_existing_connection)
350{
351 if (netmsg == NULL) {
352 ERRCLog(tpparam.name,"send() - called without valid NetMsg buffer (NULL)");
353 return;
354 }
355
356 appladdress* addr = NULL;
357 addr= dynamic_cast<appladdress*>(in_addr.copy());
358
359 if (!addr)
360 {
361 ERRCLog(tpparam.name,"send() - given destination address is not of expected type (appladdress), has type " << (int) in_addr.get_type());
362 return;
363 }
364
365 // lock due to sendermap access
366 lock();
367
368 // check for existing sender thread
369 sender_thread_queuemap_t::const_iterator it= senderthread_queuemap.find(*addr);
370
371 FastQueue* destqueue= 0;
372
373 if (it == senderthread_queuemap.end())
374 { // no sender thread found so far
375
376 // if we already have an existing connection it is save to create a sender thread
377 // since get_connection_to() will not be invoked, so an existing connection will
378 // be used
379 const AssocData* assoc = connmap.lookup(*addr);
380
381 //DLog(tpparam.name,"send() - use_existing_connection:" << (use_existing_connection ? "true" : "false") << " assoc:" << assoc);
382
383 if (use_existing_connection==false || (assoc && assoc->shutdown==false && assoc->socketfd>0))
384 { // it is allowed to create a new connection for this thread
385 // create a new queue for sender thread
386 FastQueue* sender_thread_queue= new FastQueue;
387 create_new_sender_thread(sender_thread_queue);
388 // remember queue for later use
389
390 //pair<sender_thread_queuemap_t::iterator, bool> tmpinsiterator=
391 senderthread_queuemap.insert( pair<appladdress,FastQueue*> (*addr,sender_thread_queue) );
392
393 destqueue= sender_thread_queue;
394 }
395 }
396 else
397 { // we have a sender thread, use stored queue from map
398 destqueue= it->second;
399 }
400
401 unlock();
402
403 // send a send_data message to it (if we have a destination queue)
404 if (destqueue)
405 {
406 // both parameters will be freed after message was sent!
407
408 TPoverTCPMsg* internalmsg= new TPoverTCPMsg(netmsg,new appladdress(*addr));
409 if (internalmsg)
410 {
411 // send the internal message to the sender thread queue
412 internalmsg->send(tpparam.source,destqueue);
413 }
414 }
415 else
416 {
417 if (!use_existing_connection)
418 WLog(tpparam.name,"send() - found entry for address, but no active sender thread available for peer addr:" << *addr << " - dropping data");
419 else
420 {
421 DLog(tpparam.name,"no active sender thread found for peer " << *addr << " - but policy forbids to set up a new connection, will drop data");
422
423 }
424 // cannot send data, so we must drop it
425 delete netmsg;
426 }
427
428 if (addr) delete addr;
429}
430
431/** sends a NetMsg to the network.
432 *
433 * @param netmsg message to send
434 * @param addr transport endpoint address
435 *
436 * @note if no connection exists, creates a new one
437 * @note both parameters are deleted after the message was sent
438 */
439void
440TPoverTCP::tcpsend(NetMsg* netmsg, appladdress* addr)
441{
442#ifndef _NO_LOGGING
443 const char *const thisproc="sender - ";
444#endif
445
446 // set result initially to success, set it to failure
447 // in places where these failures occur
448 int result = TCP_SUCCESS;
449 int saved_errno= 0;
450 int ret= 0;
451
452 // Create a new AssocData-pointer
453 AssocData* assoc = NULL;
454
455 // tp.cpp checks for initialisation of tp and correctness of
456 // msgsize, protocol and ip,
457 // throws an error if something is not right
458 if (addr) {
459 addr->convert_to_ipv6();
460 check_send_args(*netmsg,*addr);
461 }
462 else
463 {
464 ERRCLog(tpparam.name, thisproc << "address pointer is NULL");
465 result= TCP_SEND_FAILURE;
466
467 delete netmsg;
468 delete addr;
469
470 throw TPErrorInternal();
471 }
472
473
474 // check for existing connections,
475 // if a connection exists, return its AssocData
476 // and save it in assoc for further use
477 // if no connection exists, create a new one (in get_connection_to()).
478 // Connection is inserted into connection map that must be done
479 // with exclusive access
480 assoc= get_connection_to(*addr);
481
482 if (assoc==NULL || assoc->socketfd<=0)
483 {
484 ERRCLog(tpparam.name, color[red] << thisproc << "no valid assoc/socket data - dropping packet");
485
486 delete netmsg;
487 delete addr;
488 return;
489 }
490
491 if (assoc->shutdown)
492 {
493 Log(WARNING_LOG, LOG_ALERT, tpparam.name, thisproc << "should send message although connection already half closed");
494 delete netmsg;
495 delete addr;
496
497 throw TPErrorSendFailed();
498 }
499
500 uint32 msgsize= netmsg->get_size();
501#ifdef DEBUG_HARD
502 cerr << thisproc << "message size=" << netmsg->get_size() << endl;
503#endif
504
505 const uint32 retry_send_max = 3;
506 uint32 retry_count = 0;
507 // send all the data contained in netmsg to the socket
508 // which belongs to the address "addr"
509 for (uint32 bytes_sent= 0;
510 bytes_sent < msgsize;
511 bytes_sent+= ret)
512 {
513
514#ifdef _DEBUG_HARD
515 for (uint32 i=0;i<msgsize;i++)
516 {
517 cout << "send_buf: " << i << " : ";
518 if ( isalnum(*(netmsg->get_buffer()+i)) )
519 cout << "'" << *(netmsg->get_buffer()+i) << "' (0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec << ")" ;
520 else
521 cout << "0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec;
522 cout << endl;
523 }
524
525 cout << endl;
526 cout << "bytes_sent: " << bytes_sent << endl;
527 cout << "Message size: " << msgsize << endl;
528 cout << "Send-Socket: " << assoc->socketfd << endl;
529 cout << "pointer-Offset. " << netmsg->get_pos() << endl;
530 cout << "vor send " << endl;
531#endif
532
533 retry_count= 0;
534 do
535 {
536 // socket send
537 ret= ::send(assoc->socketfd,
538 netmsg->get_buffer() + bytes_sent,
539 msgsize - bytes_sent,
540 MSG_NOSIGNAL);
541
542 // send_buf + bytes_sent
543
544 // Deal with temporary internal errors like EAGAIN, resource temporary unavailable etc.
545 // retry sending
546 if (ret < 0)
547 { // internal error during send occured
548 saved_errno= errno;
549 switch(saved_errno)
550 {
551 case EAGAIN: // same as EWOULDBLOCK
552 case EINTR: // man page says: A signal occurred before any data was transmitted.
553 case ENOBUFS: // The output queue for a network interface was full.
554 retry_count++;
555 ERRLog(tpparam.name,"Temporary failure while calling send(): " << strerror(saved_errno) << ", errno: " << saved_errno
556 << " - retry sending, retry #" << retry_count);
557 // pace down a little bit, sleep for a while
558 sleep(1);
559 break;
560
561 // everything else should not lead to repetition
562 default:
563 retry_count= retry_send_max;
564 break;
565 } // end switch
566 } // endif
567 else // leave while
568 break;
569 }
570 while(retry_count < retry_send_max);
571
572 if (ret < 0)
573 { // unrecoverable error occured
574
575 result= TCP_SEND_FAILURE;
576 break;
577 } // end if (ret < 0)
578 else
579 {
580 if (debug_pdu)
581 {
582 ostringstream hexdump;
583 netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_sent);
584 DLog(tpparam.name,"PDU debugging enabled - Sent:" << hexdump.str());
585 }
586 }
587
588 // continue with send next data block in next for() iteration
589 } // end for
590
591 // *** note: netmsg is deleted here ***
592 delete netmsg;
593
594 // Throwing an exception within a critical section does not
595 // unlock the mutex.
596
597 if (result != TCP_SUCCESS)
598 {
599 ERRLog(tpparam.name, thisproc << "TCP error, returns " << ret << ", error : " << strerror(errno));
600 delete addr;
601
602 throw TPErrorSendFailed(saved_errno);
603
604 }
605 else
606 EVLog(tpparam.name, thisproc << ">>----Sent---->> message (" << msgsize << " bytes) using socket " << assoc->socketfd << " to " << *addr);
607
608 if (!assoc) {
609 // no connection
610
611 ERRLog(tpparam.name, thisproc << "cannot get connection to " << addr->get_ip_str()
612 << ", port #" << addr->get_port());
613
614 delete addr;
615
616 throw TPErrorUnreachable(); // should be no assoc found
617 } // end "if (!assoc)"
618
619 // *** delete address ***
620 delete addr;
621}
622
623/* this thread waits for an internal message that either:
624 * - requests transmission of a packet
625 * - requests to stop this thread
626 * @param argp points to the thread queue for internal messages
627 */
628void
629TPoverTCP::sender_thread(void *argp)
630{
631#ifndef _NO_LOGGING
632 const char *const methodname="senderthread - ";
633#endif
634
635 message* internal_thread_msg = NULL;
636
637 EVLog(tpparam.name, methodname << "starting as thread <" << pthread_self() << ">");
638
639 FastQueue* fq= reinterpret_cast<FastQueue*>(argp);
640 if (!fq)
641 {
642 ERRLog(tpparam.name, methodname << "thread <" << pthread_self() << "> no valid pointer to msg queue. Stop.");
643 return;
644 }
645
646 bool terminate= false;
647 TPoverTCPMsg* internalmsg= 0;
648 while (terminate==false && (internal_thread_msg= fq->dequeue()) != 0 )
649 {
650 internalmsg= dynamic_cast<TPoverTCPMsg*>(internal_thread_msg);
651
652 if (internalmsg == 0)
653 {
654 ERRLog(tpparam.name, methodname << "received not an TPoverTCPMsg but a" << internal_thread_msg->get_type_name());
655 }
656 else
657 if (internalmsg->get_msgtype() == TPoverTCPMsg::send_data)
658 {
659 // create a connection if none exists and send the netmsg
660 if (internalmsg->get_netmsg() && internalmsg->get_appladdr())
661 {
662 try
663 {
664 tcpsend(internalmsg->get_netmsg(),internalmsg->get_appladdr());
665 } // end try
666 catch(TPErrorSendFailed& err)
667 {
668 ERRLog(tpparam.name, methodname << "TCP send call failed - " << err.what()
669 << " cause: (" << err.get_reason() << ") " << strerror(err.get_reason()) );
670 } // end catch
671 catch(TPError& err)
672 {
673 ERRLog(tpparam.name, methodname << "TCP send call failed - reason: " << err.what());
674 } // end catch
675 catch(...)
676 {
677 ERRLog(tpparam.name, methodname << "TCP send call failed - unknown exception");
678 }
679 }
680 else
681 {
682 ERRLog(tpparam.name, methodname << "problem with passed arguments references, they point to 0");
683 }
684 }
685 else
686 if (internalmsg->get_msgtype() == TPoverTCPMsg::stop)
687 {
688 terminate= true;
689 }
690 } // end while
691
692 EVLog(tpparam.name, methodname << "<" << pthread_self() << "> terminated connection.");
693}
694
695
696/** receiver thread listens at a TCP socket for incoming PDUs
697 * and passes complete PDUs to the coordinator. Incomplete
698 * PDUs due to aborted connections or buffer overflows are discarded.
699 * @param argp - assoc data and flags sig_terminate and terminated
700 *
701 * @note this is a static member function, so you cannot use class variables
702 */
703void
704TPoverTCP::receiver_thread(void *argp)
705{
706#ifndef _NO_LOGGING
707 const char *const methodname="receiver - ";
708#endif
709
710 receiver_thread_arg_t *receiver_thread_argp= static_cast<receiver_thread_arg_t *>(argp);
711 const appladdress* peer_addr = NULL;
712 const appladdress* own_addr = NULL;
713 uint32 bytes_received = 0;
714 TPMsg* tpmsg= NULL;
715
716 // argument parsing - start
717 if (receiver_thread_argp == 0)
718 {
719 ERRCLog(tpparam.name, methodname << "No arguments given at start of receiver thread <" << pthread_self() << ">, exiting.");
720
721 return;
722 }
723 else
724 {
725 // change status to running, i.e., not terminated
726 receiver_thread_argp->terminated= false;
727
728#ifdef _DEBUG
729 DLog(tpparam.name, methodname << "New receiver thread <" << pthread_self() << "> started. ");
730#endif
731 }
732
733 int conn_socket= 0;
734 if (receiver_thread_argp->peer_assoc)
735 {
736 // get socket descriptor from arg
737 conn_socket = receiver_thread_argp->peer_assoc->socketfd;
738 // get pointer to peer address of socket (source/sender address of peer) from arg
739 peer_addr= &receiver_thread_argp->peer_assoc->peer;
740 own_addr= &receiver_thread_argp->peer_assoc->ownaddr;
741 }
742 else
743 {
744 ERRCLog(tpparam.name, methodname << "No peer assoc available - pointer is NULL");
745
746 return;
747 }
748
749 if (peer_addr == 0)
750 {
751 ERRCLog(tpparam.name, methodname << "No peer address available for socket " << conn_socket << ", exiting.");
752
753 return;
754 }
755 // argument parsing - end
756#ifdef _DEBUG
757 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname <<
758 "Preparing to wait for data at socket "
759 << conn_socket << " from " << receiver_thread_argp->peer_assoc->peer);
760#endif
761
762 int ret= 0;
763 uint32 msgcontentlength= 0;
764 bool msgcontentlength_known= false;
765 bool pdu_complete= false; // when to terminate inner loop
766
767 /* maybe use this to create a new pdu,
768 /// constructor
769 contextlistpdu(type_t t, subtype_t st, uint32 fc, uint32 numobj = 0);
770 */
771
772 // activate O_NON_BLOCK for recv on socket conn_socket
773 // CAVEAT: this also implies non-blocking send()!
774 //fcntl(conn_socket,F_SETFL, O_NONBLOCK);
775
776 // set options for polling
777 const unsigned int number_poll_sockets= 1;
778 struct pollfd poll_fd;
779 // have to set structure before poll call
780 poll_fd.fd = conn_socket;
781 poll_fd.events = POLLIN | POLLPRI;
782 poll_fd.revents = 0;
783
784 int poll_status;
785 bool recv_error= false;
786
787 NetMsg* netmsg= 0;
788 NetMsg* remainbuf= 0;
789 size_t buffer_bytes_left= 0;
790 size_t trailingbytes= 0;
791 bool skiprecv= false;
792 // loop until we receive a terminate signal (read-only var for this thread)
793 // or get an error from socket read
794 while( receiver_thread_argp->sig_terminate == false )
795 {
796 // Read next PDU from socket or process trailing bytes in remainbuf
797 ret= 0;
798 msgcontentlength= 0;
799 msgcontentlength_known= false;
800 pdu_complete= false;
801 netmsg= 0;
802
803 // there are trailing bytes left from the last receive call
804 if (remainbuf != 0)
805 {
806 netmsg= remainbuf;
807 remainbuf= 0;
808 buffer_bytes_left= netmsg->get_size()-trailingbytes;
809 bytes_received= trailingbytes;
810 trailingbytes= 0;
811 skiprecv= true;
812 }
813 else // no trailing bytes, create a new buffer
814 if ( (netmsg= new NetMsg(NetMsg::max_size)) != 0 )
815 {
816 buffer_bytes_left= netmsg->get_size();
817 bytes_received= 0;
818 skiprecv= false;
819 }
820 else
821 { // buffer allocation failed
822 bytes_received= 0;
823 buffer_bytes_left= 0;
824 recv_error= true;
825 }
826
827 // loops until PDU is complete
828 // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
829 while (!pdu_complete &&
830 !recv_error &&
831 !receiver_thread_argp->sig_terminate)
832 {
833 if (!skiprecv)
834 {
835 // read from TCP socket or return after sleep_time
836 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
837
838 if (receiver_thread_argp->sig_terminate)
839 {
840 Log(EVENT_LOG,LOG_UNIMP,tpparam.name,methodname << "Thread <" << pthread_self() << "> found terminate signal after poll");
841 // disallow sending
842 AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
843 if (myassoc->shutdown == false)
844 {
845 myassoc->shutdown= true;
846 if (shutdown(myassoc->socketfd,SHUT_WR))
847 {
848 if ( errno != ENOTCONN )
849 Log(ERROR_LOG,LOG_UNIMP,tpparam.name,methodname <<"shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
850 }
851 }
852 // try to read do a last read from the TCP socket or return after sleep_time
853 if (poll_status == 0)
854 {
855 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
856 }
857 }
858
859 if (poll_fd.revents & POLLERR) // Error condition
860 {
861 if (errno == 0 || errno == EINTR)
862 {
863 EVLog(tpparam.name, methodname << "poll(): " << strerror(errno));
864 }
865 else
866 {
867 ERRCLog(tpparam.name, methodname << "Poll indicates error: " << strerror(errno));
868 recv_error= true;
869 }
870 }
871
872 if (poll_fd.revents & POLLHUP) // Hung up
873 {
874 Log(EVENT_LOG,LOG_CRIT, tpparam.name, methodname << "Poll hung up");
875 recv_error= true;
876 }
877
878 if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
879 {
880 EVLog(tpparam.name, methodname << "Poll Invalid request: fd not open");
881 recv_error= true;
882 }
883
884 // check status (return value) of poll call
885 switch (poll_status)
886 {
887 case -1:
888 if (errno == 0 || errno == EINTR)
889 {
890 EVLog(tpparam.name, methodname << "Poll status: " << strerror(errno));
891 }
892 else
893 {
894 ERRCLog(tpparam.name, methodname << "Poll status indicates error: " << strerror(errno) << "- aborting");
895 recv_error= true;
896 }
897
898 continue; // next while iteration
899 break;
900
901 case 0:
902#ifdef DEBUG_HARD
903 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll timed out after " << tpparam.sleep_time << " ms.");
904#endif
905 continue; // next while iteration
906 break;
907
908 default:
909#ifdef DEBUG_HARD
910 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
911#endif
912 break;
913 } // end switch
914
915
916 /// receive data from socket buffer (recv will not block)
917 ret = recv(conn_socket,
918 netmsg->get_buffer() + bytes_received,
919 buffer_bytes_left,
920 MSG_DONTWAIT);
921
922 if ( ret < 0 )
923 {
924 if (errno!=EAGAIN && errno!=EWOULDBLOCK)
925 {
926 ERRCLog(tpparam.name, methodname << "Receive at socket " << conn_socket << " failed, error: " << strerror(errno));
927 recv_error= true;
928 continue;
929 }
930 else
931 { // errno==EAGAIN || errno==EWOULDBLOCK
932 // just nothing to read from socket, continue w/ next poll
933 continue;
934 }
935 }
936 else
937 {
938 if (ret == 0)
939 {
940 // this means that EOF is reached,
941 // other side has closed connection
942 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Other side (" << *peer_addr << ") closed connection for socket " << conn_socket);
943 // disallow sending
944 AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
945 if (myassoc->shutdown == false)
946 {
947 myassoc->shutdown= true;
948 if (shutdown(myassoc->socketfd,SHUT_WR))
949 {
950 if ( errno != ENOTCONN )
951 Log(ERROR_LOG,LOG_UNIMP,tpparam.name, methodname << "shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
952 }
953 }
954 // not a real error, but we must quit the receive loop
955 recv_error= true;
956 }
957 else
958 {
959
960 Log(EVENT_LOG,LOG_UNIMP, tpparam.name, methodname << "<<--Received--<< packet (" << ret << " bytes) at socket " << conn_socket << " from " << *peer_addr);
961 // track number of received bytes
962 bytes_received+= ret;
963 buffer_bytes_left-= ret;
964 }
965 }
966 } // end if do not skip recv() statement
967
968 if (buffer_bytes_left < 0) ///< buffer space exhausted now
969 {
970 recv_error= true;
971 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "during receive buffer space exhausted");
972 }
973
974 if (!msgcontentlength_known) ///< common header not parsed
975 {
976 // enough bytes read to parse common header?
977 if (bytes_received >= common_header_length)
978 {
979 // get message content length in number of bytes
980 if (getmsglength(*netmsg, msgcontentlength))
981 msgcontentlength_known= true;
982 else
983 {
984 ERRCLog(tpparam.name, methodname << "Not a valid protocol header - discarding received packet. received size " << msgcontentlength);
985
986 ostringstream hexdumpstr;
987 netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
988 DLog(tpparam.name,"dumping received bytes:" << hexdumpstr.str());
989
990 // reset all counters
991 msgcontentlength= 0;
992 msgcontentlength_known= false;
993 bytes_received= 0;
994 pdu_complete= false;
995 continue;
996 }
997 }
998 } // endif common header not parsed
999
1000 // check whether we have read the whole Protocol PDU
1001 DLog(tpparam.name, "bytes_received-common_header_length=" << bytes_received-common_header_length << " msgcontentlength: " << msgcontentlength);
1002 if (msgcontentlength_known)
1003 {
1004 if (bytes_received-common_header_length >= msgcontentlength )
1005 {
1006 pdu_complete= true;
1007 // truncate buffer exactly at common_header_length+msgcontentlength==PDU size, trailing stuff is handled separately
1008 netmsg->truncate(common_header_length+msgcontentlength);
1009
1010 // trailing bytes are copied into new buffer
1011 if (bytes_received-common_header_length > msgcontentlength)
1012 {
1013 WLog(tpparam.name,"trailing bytes - received more bytes ("<<bytes_received<<") than expected for PDU (" << common_header_length+msgcontentlength << ")");
1014 remainbuf= new NetMsg(NetMsg::max_size);
1015 trailingbytes= (bytes_received-common_header_length) - msgcontentlength;
1016 bytes_received= common_header_length+msgcontentlength;
1017 memcpy(remainbuf->get_buffer(),netmsg->get_buffer()+common_header_length+msgcontentlength, trailingbytes);
1018 }
1019 }
1020 else
1021 { // not enough bytes in the buffer must continue to read from network
1022 skiprecv= false;
1023 }
1024 } // endif msgcontentlength_known
1025 } // end while (!pdu_complete && !recv_error && !signalled for termination)
1026 // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
1027
1028 // if other side closed the connection, we should still be able to deliver the remaining data
1029 if (ret == 0)
1030 {
1031 recv_error= false;
1032 }
1033
1034 // deliver only complete PDUs to signaling module
1035 if (!recv_error && pdu_complete)
1036 {
1037 // create TPMsg and send it to the signaling thread
1038 tpmsg = new(nothrow) TPMsg(netmsg, peer_addr->copy(), own_addr->copy());
1039 if (tpmsg)
1040 {
1041 DLog(tpparam.name, methodname << "receipt of PDU now complete, sending msg#" << tpmsg->get_id()
1042 << " to module " << message::get_qaddr_name(tpparam.dest));
1043 }
1044
1045 debug_pdu=false;
1046
1047 if (debug_pdu)
1048 {
1049 ostringstream hexdump;
1050 netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_received);
1051 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"PDU debugging enabled - Received:" << hexdump.str());
1052 }
1053
1054 // send the message if it was successfully created
1055 // bool message::send_to(qaddr_t dest, bool exp = false);
1056 if (!tpmsg
1057 || (!tpmsg->get_peeraddress())
1058 || (!tpmsg->send(message::qaddr_tp_over_tcp, tpparam.dest)))
1059 {
1060 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Cannot allocate/send TPMsg");
1061 if (tpmsg) delete tpmsg;
1062 } // end if tpmsg not allocated or not addr or not sent
1063
1064
1065 } // end if !recv_error
1066 else
1067 { // error during receive or PDU incomplete
1068 if (bytes_received>0)
1069 {
1070 Log(WARNING_LOG,LOG_NORMAL, tpparam.name, methodname << "Attention! " << (recv_error? "Receive error, " : "") << (pdu_complete ? "PDU complete" : "PDU incomplete") << "received bytes: " << bytes_received);
1071 }
1072
1073 if (!pdu_complete && bytes_received>0 && bytes_received<common_header_length)
1074 {
1075 ostringstream hexdumpstr;
1076 netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
1077 Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"Message too short to be a valid protocol header - dumping received bytes:" << hexdumpstr.str());
1078 }
1079 // leave the outer loop
1080 /**********************/
1081 break;
1082 /**********************/
1083 } // end else
1084
1085 } // end while (thread not signalled for termination)
1086
1087 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self()
1088 << "> shutting down and closing socket " << receiver_thread_argp->peer_assoc->peer);
1089
1090 // shutdown socket
1091 if (shutdown(conn_socket, SHUT_RD))
1092 {
1093 if ( errno != ENOTCONN )
1094 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> shutdown (read) on socket failed, reason: " << strerror(errno));
1095 }
1096
1097 // close socket
1098 close(conn_socket);
1099
1100 receiver_thread_argp->terminated= true;
1101
1102 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> terminated");
1103
1104#ifdef _DEBUG
1105 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Signaling main loop for cleanup");
1106#endif
1107 // notify master thread for invocation of cleanup procedure
1108 TPoverTCPMsg* newmsg= new(nothrow)TPoverTCPMsg(receiver_thread_argp->peer_assoc);
1109 // send message to main loop thread
1110 newmsg->send_to(tpparam.source);
1111
1112}
1113
1114
1115/** this signals a terminate to a thread and wait for the thread to stop
1116 * @note it is not safe to access any thread related data after this method returned,
1117 * because the receiver thread will initiate a cleanup_receiver_thread() method
1118 * which may erase all relevant thread data.
1119 */
1120void
1121TPoverTCP::stop_receiver_thread(AssocData* peer_assoc)
1122{
1123 // All operations on recv_thread_argmap and connmap require an already acquired lock
1124 // after this procedure peer_assoc may be invalid because it was erased
1125
1126 // start critical section
1127
1128 if (peer_assoc == 0)
1129 return;
1130
1131 pthread_t thread_id= peer_assoc->thread_ID;
1132
1133 // try to clean up receiver_thread_arg
1134 recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1135 receiver_thread_arg_t* recv_thread_argp=
1136 (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1137 if (recv_thread_argp)
1138 {
1139 if (!recv_thread_argp->terminated)
1140 {
1141 // thread signaled termination, but is not?
1142 Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> signaled for termination");
1143
1144 // signal thread for its termination
1145 recv_thread_argp->sig_terminate= true;
1146 // wait for thread to join after termination
1147 pthread_join(thread_id, 0);
1148 // the dying thread will signal main loop to call this method, but this time we should enter the else branch
1149 return;
1150 }
1151 }
1152 else
1153 Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> not found");
1154
1155}
1156
1157
1158/** cleans up left over structures (assoc,receiver_thread_arg) from already terminated receiver_thread
1159 * usually called by the master_thread after the receiver_thread terminated
1160 * @note clean_up_receiver_thread() should be only called when an outer lock ensures that peer_assoc
1161 * is still valid
1162 */
1163void
1164TPoverTCP::cleanup_receiver_thread(AssocData* peer_assoc)
1165{
1166 // All operations on recv_thread_argmap and connmap require an already acquired lock
1167 // after this procedure peer_assoc may be invalid because it was erased
1168
1169 // start critical section
1170
1171 if (peer_assoc == 0)
1172 return;
1173
1174 pthread_t thread_id= peer_assoc->thread_ID;
1175
1176 // try to clean up receiver_thread_arg
1177 recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1178 receiver_thread_arg_t* recv_thread_argp=
1179 (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1180 if (recv_thread_argp)
1181 {
1182 if (!recv_thread_argp->terminated)
1183 {
1184 // thread signaled termination, but is not?
1185 Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> not terminated yet?!");
1186 return;
1187 }
1188 else
1189 { // if thread is already terminated
1190 Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> is terminated");
1191
1192 // delete it from receiver map
1193 recv_thread_argmap.erase(recv_thread_arg_iter);
1194
1195 // then delete receiver arg structure
1196 delete recv_thread_argp;
1197 }
1198 }
1199
1200 // delete entry from connection map
1201
1202 // cleanup sender thread
1203 // no need to lock explicitly, because caller of cleanup_receiver_thread() must already locked
1204 terminate_sender_thread(peer_assoc);
1205
1206 // delete the AssocData structure from the connection map
1207 // also frees allocated AssocData structure
1208 connmap.erase(peer_assoc);
1209
1210 // end critical section
1211
1212 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Cleanup receiver thread <" << thread_id << ">. Done.");
1213}
1214
1215
1216/* sends a stop message to the sender thread that belongs to the peer address given in assoc
1217 * @note terminate_receiver_thread() should be only called when an outer lock ensures that assoc
1218 * is still valid, a lock is also required, because senderthread_queuemap is changed
1219 */
1220void
1221TPoverTCP::terminate_sender_thread(const AssocData* assoc)
1222{
1223 if (assoc == 0)
1224 {
1225 Log(ERROR_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - assoc data == NULL");
1226 return;
1227 }
1228
1229 sender_thread_queuemap_t::iterator it= senderthread_queuemap.find(assoc->peer);
1230
1231 if (it != senderthread_queuemap.end())
1232 { // we have a sender thread: send a stop message to it
1233 FastQueue* destqueue= it->second;
1234 if (destqueue)
1235 {
1236 TPoverTCPMsg* internalmsg= new TPoverTCPMsg(assoc,tpparam.source,TPoverTCPMsg::stop);
1237 if (internalmsg)
1238 {
1239 // send the internal message to the sender thread queue
1240 internalmsg->send(tpparam.source,destqueue);
1241 }
1242 }
1243 else
1244 {
1245 Log(WARNING_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - found entry for address, but no sender thread. addr:" << assoc->peer);
1246 }
1247 // erase entry from map
1248 senderthread_queuemap.erase(it);
1249 }
1250}
1251
1252/* terminate all active threads
1253 * note: locking should not be necessary here because this message is called as last method from
1254 * main_loop()
1255 */
1256void
1257TPoverTCP::terminate_all_threads()
1258{
1259 AssocData* assoc= 0;
1260 receiver_thread_arg_t* terminate_argp;
1261
1262 for (recv_thread_argmap_t::iterator terminate_iterator= recv_thread_argmap.begin();
1263 terminate_iterator != recv_thread_argmap.end();
1264 terminate_iterator++)
1265 {
1266 if ( (terminate_argp= terminate_iterator->second) != 0)
1267 {
1268 // we need a non const pointer to erase it later on
1269 assoc= const_cast<AssocData*>(terminate_argp->peer_assoc);
1270 // check whether thread is still alive
1271 if (terminate_argp->terminated == false)
1272 {
1273 terminate_argp->sig_terminate= true;
1274 // then wait for its termination
1275 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,
1276 "Signaled receiver thread <" << terminate_iterator->first << "> for termination");
1277
1278 pthread_join(terminate_iterator->first, 0);
1279
1280 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Thread <" << terminate_iterator->first << "> is terminated");
1281 }
1282 else
1283 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,
1284 "Receiver thread <" << terminate_iterator->first << "> already terminated");
1285
1286 // cleanup all remaining argument structures of terminated threads
1287 delete terminate_argp;
1288
1289 // terminate any related sender thread that is still running
1290 terminate_sender_thread(assoc);
1291
1292 connmap.erase(assoc);
1293 // delete assoc is not necessary, because connmap.erase() will do the job
1294 }
1295 } // end for
1296}
1297
1298
1299/**
1300 * sender thread starter:
1301 * just a static starter method to allow starting the
1302 * actual sender_thread() method.
1303 *
1304 * @param argp - pointer to the current TPoverTCP object instance and receiver_thread_arg_t struct
1305 */
1306void*
1307TPoverTCP::sender_thread_starter(void *argp)
1308{
1309 sender_thread_start_arg_t *sargp= static_cast<sender_thread_start_arg_t *>(argp);
1310
1311 //cout << "invoked sender_thread_Starter" << endl;
1312
1313 // invoke sender thread method
1314 if (sargp != 0 && sargp->instance != 0)
1315 {
1316 // call receiver_thread member function on object instance
1317 sargp->instance->sender_thread(sargp->sender_thread_queue);
1318
1319 //cout << "Before deletion of sarg" << endl;
1320
1321 // no longer needed
1322 delete sargp;
1323 }
1324 else
1325 {
1326 Log(ERROR_LOG,LOG_CRIT,"sender_thread_starter","while starting sender_thread: 0 pointer to arg or object");
1327 }
1328 return 0;
1329}
1330
1331
1332
1333
1334/**
1335 * receiver thread starter:
1336 * just a static starter method to allow starting the
1337 * actual receiver_thread() method.
1338 *
1339 * @param argp - pointer to the current TPoverTCP object instance and receiver_thread_arg_t struct
1340 */
1341void*
1342TPoverTCP::receiver_thread_starter(void *argp)
1343{
1344 receiver_thread_start_arg_t *rargp= static_cast<receiver_thread_start_arg_t *>(argp);
1345 // invoke receiver thread method
1346 if (rargp != 0 && rargp->instance != 0)
1347 {
1348 // call receiver_thread member function on object instance
1349 rargp->instance->receiver_thread(rargp->rtargp);
1350
1351 // no longer needed
1352 delete rargp;
1353 }
1354 else
1355 {
1356 Log(ERROR_LOG,LOG_CRIT,"receiver_thread_starter","while starting receiver_thread: 0 pointer to arg or object");
1357 }
1358 return 0;
1359}
1360
1361
1362void
1363TPoverTCP::create_new_sender_thread(FastQueue* senderfqueue)
1364{
1365 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new sender thread...");
1366
1367 pthread_t senderthreadid;
1368 // create new thread; (arg == 0) is handled by thread, too
1369 int pthread_status= pthread_create(&senderthreadid,
1370 NULL, // NULL: default attributes: thread is joinable and has a
1371 // default, non-realtime scheduling policy
1372 TPoverTCP::sender_thread_starter,
1373 new sender_thread_start_arg_t(this,senderfqueue));
1374 if (pthread_status)
1375 {
1376 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " << strerror(pthread_status));
1377
1378 delete senderfqueue;
1379 }
1380}
1381
1382
1383void
1384TPoverTCP::create_new_receiver_thread(AssocData* peer_assoc)
1385{
1386 receiver_thread_arg_t* argp=
1387 new(nothrow) receiver_thread_arg(peer_assoc);
1388
1389 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new receiver thread...");
1390
1391 // create new thread; (arg == 0) is handled by thread, too
1392 int pthread_status= pthread_create(&peer_assoc->thread_ID,
1393 NULL, // NULL: default attributes: thread is joinable and has a
1394 // default, non-realtime scheduling policy
1395 receiver_thread_starter,
1396 new(nothrow) receiver_thread_start_arg_t(this,argp));
1397 if (pthread_status)
1398 {
1399 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " << strerror(pthread_status));
1400
1401 delete argp;
1402 }
1403 else
1404 {
1405 lock(); // install_cleanup_thread_lock(TPoverTCP, this);
1406
1407 // remember pointer to thread arg structure
1408 // thread arg structure should be destroyed after thread termination only
1409 pair<recv_thread_argmap_t::iterator, bool> tmpinsiterator=
1410 recv_thread_argmap.insert( pair<pthread_t,receiver_thread_arg_t*> (peer_assoc->thread_ID,argp) );
1411 if (tmpinsiterator.second == false)
1412 {
1413 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Thread argument could not be inserted into hashmap");
1414 }
1415 unlock(); // uninstall_cleanup(1);
1416 }
1417}
1418
1419
1420/**
1421 * master listener thread starter:
1422 * just a static starter method to allow starting the
1423 * actual master_listener_thread() method.
1424 *
1425 * @param argp - pointer to the current TPoverTCP object instance
1426 */
1427void*
1428TPoverTCP::master_listener_thread_starter(void *argp)
1429{
1430 // invoke listener thread method
1431 if (argp != 0)
1432 {
1433 (static_cast<TPoverTCP*>(argp))->master_listener_thread();
1434 }
1435 return 0;
1436}
1437
1438
1439
1440/**
1441 * master listener thread: waits for incoming connections at the well-known tcp port
1442 * when a connection request is received this thread spawns a receiver_thread for
1443 * receiving packets from the peer at the new socket.
1444 */
1445void
1446TPoverTCP::master_listener_thread()
1447{
1448 // create a new address-structure for the listening masterthread
1449 struct sockaddr_in6 own_address_v6;
1450 own_address_v6.sin6_family = AF_INET6;
1451 own_address_v6.sin6_flowinfo= 0;
1452 own_address_v6.sin6_port = htons(tpparam.port); // use port number in param structure
1453 // accept incoming connections on all interfaces
1454 own_address_v6.sin6_addr = in6addr_any;
1455 own_address_v6.sin6_scope_id= 0;
1456
1457 // create a new address-structure for the listening masterthread
1458 struct sockaddr_in own_address_v4;
1459 own_address_v4.sin_family = AF_INET;
1460 own_address_v4.sin_port = htons(tpparam.port); // use port number in param structure
1461 // accept incoming connections on all interfaces
1462 own_address_v4.sin_addr.s_addr = INADDR_ANY;
1463
1464 // create a listening socket
1465 int master_listener_socket= socket( AF_INET6, SOCK_STREAM, IPPROTO_TCP);
1466 if (master_listener_socket!=-1) v4_mode = false;
1467 if (master_listener_socket == -1) {
1468 master_listener_socket= socket( AF_INET, SOCK_STREAM, IPPROTO_TCP);
1469 if (master_listener_socket!=-1) v4_mode = true;
1470 }
1471 if (master_listener_socket == -1)
1472 {
1473 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Could not create a new socket, error: " << strerror(errno));
1474 return;
1475 }
1476
1477 // Disable Nagle Algorithm, set (TCP_NODELAY)
1478 int nodelayflag= 1;
1479 int status= setsockopt(master_listener_socket,
1480 IPPROTO_TCP,
1481 TCP_NODELAY,
1482 &nodelayflag,
1483 sizeof(nodelayflag));
1484 if (status)
1485 {
1486 Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
1487 }
1488
1489 // Reuse ports, even if they are busy
1490 int socketreuseflag= 1;
1491 status= setsockopt(master_listener_socket,
1492 SOL_SOCKET,
1493 SO_REUSEADDR,
1494 (const char *) &socketreuseflag,
1495 sizeof(socketreuseflag));
1496 if (status)
1497 {
1498 Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
1499 }
1500
1501
1502 // bind the newly created socket to a specific address
1503 int bind_status = bind(master_listener_socket, v4_mode ?
1504 reinterpret_cast<struct sockaddr *>(&own_address_v4) :
1505 reinterpret_cast<struct sockaddr *>(&own_address_v6),
1506 v4_mode ? sizeof(own_address_v4) : sizeof(own_address_v6));
1507 if (bind_status)
1508 {
1509 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Binding to "
1510 << (v4_mode ? inet_ntop(AF_INET, &own_address_v4.sin_addr, in_addrstr, INET_ADDRSTRLEN) :
1511 inet_ntop(AF_INET6, &own_address_v6.sin6_addr, in6_addrstr, INET6_ADDRSTRLEN))
1512 << " port " << tpparam.port << " failed, error: " << strerror(errno));
1513 return;
1514 }
1515
1516 // listen at the socket,
1517 // queuesize for pending connections= max_listen_queue_size
1518 int listen_status = listen(master_listener_socket, max_listen_queue_size);
1519 if (listen_status)
1520 {
1521 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Listen at socket " << master_listener_socket
1522 << " failed, error: " << strerror(errno));
1523 return;
1524 }
1525 else
1526 {
1527 Log(INFO_LOG,LOG_NORMAL, tpparam.name, color[green] << "Listening at port #" << tpparam.port << color[off]);
1528 }
1529
1530 // activate O_NON_BLOCK for accept (accept does not block)
1531 fcntl(master_listener_socket,F_SETFL, O_NONBLOCK);
1532
1533 // create a pollfd struct for use in the mainloop
1534 struct pollfd poll_fd;
1535 poll_fd.fd = master_listener_socket;
1536 poll_fd.events = POLLIN | POLLPRI;
1537 poll_fd.revents = 0;
1538 /*
1539 #define POLLIN 0x001 // There is data to read.
1540 #define POLLPRI 0x002 // There is urgent data to read.
1541 #define POLLOUT 0x004 // Writing now will not block.
1542 */
1543
1544 bool terminate = false;
1545 // check for thread terminate condition using get_state()
1546 state_t currstate= get_state();
1547 int poll_status= 0;
1548 const unsigned int number_poll_sockets= 1;
1549 struct sockaddr_in6 peer_address;
1550 socklen_t peer_address_len;
1551 int conn_socket;
1552
1553 // check whether this thread is signaled for termination
1554 while(! (terminate= (currstate==STATE_ABORT || currstate==STATE_STOP) ) )
1555 {
1556 // wait on number_poll_sockets (main drm socket)
1557 // for the events specified above for sleep_time (in ms)
1558 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
1559 if (poll_fd.revents & POLLERR) // Error condition
1560 {
1561 if (errno != EINTR)
1562 {
1563 Log(ERROR_LOG,LOG_CRIT, tpparam.name,
1564 "Poll caused error " << strerror(errno) << " - indicated by revents");
1565 }
1566 else
1567 {
1568 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "poll(): " << strerror(errno));
1569 }
1570
1571 }
1572 if (poll_fd.revents & POLLHUP) // Hung up
1573 {
1574 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll hung up");
1575 return;
1576 }
1577 if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
1578 {
1579 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll Invalid request: fd not open");
1580 return;
1581 }
1582
1583 switch (poll_status)
1584 {
1585 case -1:
1586 if (errno != EINTR)
1587 {
1588 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll status indicates error: " << strerror(errno));
1589 }
1590 else
1591 {
1592 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Poll status: " << strerror(errno));
1593 }
1594
1595 break;
1596
1597 case 0:
1598#ifdef DEBUG_HARD
1599 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,
1600 "Listen Thread - Poll timed out after " << tpparam.sleep_time << " ms.");
1601#endif
1602 currstate= get_state();
1603 continue;
1604 break;
1605
1606 default:
1607#ifdef DEBUG_HARD
1608 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
1609#endif
1610 break;
1611 } // end switch
1612
1613 // after a successful accept call,
1614 // accept stores the address information of the connecting party
1615 // in peer_address and the size of its address in addrlen
1616 peer_address_len= sizeof(peer_address);
1617 conn_socket = accept (master_listener_socket,
1618 reinterpret_cast<struct sockaddr *>(&peer_address),
1619 &peer_address_len);
1620 if (conn_socket == -1)
1621 {
1622 if (errno != EWOULDBLOCK && errno != EAGAIN)
1623 {
1624 Log(ERROR_LOG,LOG_EMERG, tpparam.name, "Accept at socket " << master_listener_socket
1625 << " failed, error: " << strerror(errno));
1626 return;
1627 }
1628 }
1629 else
1630 {
1631 // create a new assocdata-object for the new thread
1632 AssocData* peer_assoc = NULL;
1633 appladdress addr(peer_address, IPPROTO_TCP);
1634
1635 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "<<--Received connect--<< request from " << addr.get_ip_str()
1636 << " port #" << addr.get_port());
1637
1638 struct sockaddr_in6 own_address;
1639 if (v4_mode) {
1640 struct sockaddr_in own_address_v4;
1641 socklen_t own_address_len_v4 = sizeof(own_address_v4);
1642 getsockname(conn_socket, reinterpret_cast<struct sockaddr*>(&own_address_v4), &own_address_len_v4);
1643 v4_to_v6(&own_address_v4, &own_address);
1644 } else {
1645 socklen_t own_address_len= sizeof(own_address);
1646 getsockname(conn_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
1647 }
1648
1649 // AssocData will copy addr content into its own structure
1650 // allocated peer_assoc will be stored in connmap
1651 peer_assoc = new(nothrow) AssocData(conn_socket, addr, appladdress(own_address,IPPROTO_TCP));
1652
1653 bool insert_success= false;
1654 if (peer_assoc)
1655 {
1656 // start critical section
1657 lock(); // install_cleanup_thread_lock(TPoverTCP, this);
1658 insert_success= connmap.insert(peer_assoc);
1659 // end critical section
1660 unlock(); // uninstall_cleanup(1);
1661 }
1662
1663
1664 if (insert_success == false) // not inserted into connmap
1665 {
1666 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot insert AssocData for socket " << conn_socket
1667 << ", " << addr.get_ip_str() << ", port #"
1668 << addr.get_port() << " into connection map, aborting connection...");
1669
1670 // abort connection, delete its AssocData
1671 close (conn_socket);
1672 if (peer_assoc)
1673 {
1674 delete peer_assoc;
1675 peer_assoc= 0;
1676 }
1677 return;
1678
1679 } //end __else(connmap.insert());__
1680
1681 // create a new thread for each new connection
1682 create_new_receiver_thread(peer_assoc);
1683 } // end __else (connsocket)__
1684
1685 // get new thread state
1686 currstate= get_state();
1687
1688 } // end while(!terminate)
1689 return;
1690} // end listen_for_connections()
1691
1692
1693TPoverTCP::~TPoverTCP()
1694{
1695 init= false;
1696
1697 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Destructor called");
1698
1699 QueueManager::instance()->unregister_queue(tpparam.source);
1700}
1701
1702/** TPoverTCP Thread main loop.
1703 * This loop checks for internal messages of either
1704 * a send() call to start a new receiver thread, or,
1705 * of a receiver_thread() that signals its own termination
1706 * for proper cleanup of control structures.
1707 * It also handles the following internal TPoverTCPMsg types:
1708 * - TPoverTCPMsg::stop - a particular receiver thread is terminated
1709 * - TPoverTCPMsg::start - a particular receiver thread is started
1710 * @param nr number of current thread instance
1711 */
1712void
1713TPoverTCP::main_loop(uint32 nr)
1714{
1715
1716 // get internal queue for messages from receiver_thread
1717 FastQueue* fq = get_fqueue();
1718 if (!fq)
1719 {
1720 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot find message queue");
1721 return;
1722 } // end if not fq
1723 // register queue for receiving internal messages from other modules
1724 QueueManager::instance()->register_queue(fq,tpparam.source);
1725
1726 // start master listener thread
1727 pthread_t master_listener_thread_ID;
1728 int pthread_status= pthread_create(&master_listener_thread_ID,
1729 NULL, // NULL: default attributes: thread is joinable and has a
1730 // default, non-realtime scheduling policy
1731 master_listener_thread_starter,
1732 this);
1733 if (pthread_status)
1734 {
1735 Log(ERROR_LOG,LOG_CRIT, tpparam.name,
1736 "New master listener thread could not be created: " << strerror(pthread_status));
1737 }
1738 else
1739 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Master listener thread started");
1740
1741
1742 // define max latency for thread reaction on termination/stop signal
1743 timespec wait_interval= { 0, 250000000L }; // 250ms
1744 message* internal_thread_msg = NULL;
1745 state_t currstate= get_state();
1746
1747 // check whether this thread is signaled for termination
1748 while( currstate!=STATE_ABORT && currstate!=STATE_STOP )
1749 {
1750 // poll internal message queue (blocking)
1751 if ( (internal_thread_msg= fq->dequeue_timedwait(wait_interval)) != 0 )
1752 {
1753 TPoverTCPMsg* internalmsg= dynamic_cast<TPoverTCPMsg*>(internal_thread_msg);
1754 if (internalmsg)
1755 {
1756 if (internalmsg->get_msgtype() == TPoverTCPMsg::stop)
1757 {
1758 // a receiver thread terminated and signaled for cleanup by master thread
1759 AssocData* assocd= const_cast<AssocData*>(internalmsg->get_peer_assoc());
1760 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Got cleanup request for thread <" << assocd->thread_ID <<'>');
1761 lock();
1762 cleanup_receiver_thread( assocd );
1763 unlock();
1764 }
1765 else
1766 if (internalmsg->get_msgtype() == TPoverTCPMsg::start)
1767 {
1768 // start a new receiver thread
1769 create_new_receiver_thread( const_cast<AssocData*>(internalmsg->get_peer_assoc()) );
1770 }
1771 else
1772 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "unexpected internal message:" << internalmsg->get_msgtype());
1773
1774 delete internalmsg;
1775 }
1776 else
1777 {
1778 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Dynamic_cast failed - received unexpected and unknown internal message source "
1779 << internal_thread_msg->get_source());
1780 }
1781 } // endif
1782
1783 // get thread state
1784 currstate= get_state();
1785 } // end while
1786
1787 if (currstate==STATE_STOP)
1788 {
1789 // start abort actions
1790 Log(INFO_LOG,LOG_NORMAL, tpparam.name, "Asked to abort, stopping all receiver threads");
1791 } // end if stopped
1792
1793 // do not accept any more messages
1794 fq->shutdown();
1795 // terminate all receiver and sender threads that are still active
1796 terminate_all_threads();
1797}
1798
1799} // end namespace protlib
1800///@}
Note: See TracBrowser for help on using the repository browser.