source: source/ariba/utility/transport/tcpip/protlib/tp_over_tcp.cpp@ 7040

Last change on this file since 7040 was 7040, checked in by mies, 14 years ago

fixed v4/v6 bug

File size: 55.6 KB
Line 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file tp_over_tcp.cpp
3/// TCP-based transport module (includes framing support)
4/// ----------------------------------------------------------
5/// $Id: tp_over_tcp.cpp 2872 2008-02-18 10:58:03Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/src/tp_over_tcp.cpp $
7// ===========================================================
8//
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29
30extern "C"
31{
32 //#define _SOCKADDR_LEN /* use BSD 4.4 sockets */
33#include <unistd.h> /* gethostname */
34#include <sys/types.h> /* network socket interface */
35#include <netinet/in.h> /* network socket interface */
36#include <netinet/tcp.h> /* for TCP Socket Option */
37#include <sys/socket.h>
38#include <arpa/inet.h> /* inet_addr */
39
40#include <fcntl.h>
41#include <sys/poll.h>
42}
43
44#include <iostream>
45#include <errno.h>
46#include <string>
47#include <sstream>
48
49#include "tp_over_tcp.h"
50#include "threadsafe_db.h"
51#include "cleanuphandler.h"
52#include "setuid.h"
53#include "queuemanager.h"
54#include "logfile.h"
55
56#include <set>
57
// Internal result codes; tcpsend() uses them to record whether its send loop succeeded.
58#define TCP_SUCCESS 0
59#define TCP_SEND_FAILURE 1
60
// Not referenced in the visible part of this file; presumably the listen() backlog - TODO confirm.
61const unsigned int max_listen_queue_size= 10;
62
// 32-bit word written at bytes 8..11 of an IPv6 address by v4_to_v6() to mark it as IPv4-mapped.
// NOTE(review): read as a host-order 32-bit value this equals the marker bytes 00 00 ff ff only
// on big-endian hosts; on little-endian hosts its in-memory bytes are ff ff 00 00.
63#define IPV6_ADDR_INT32_SMP 0x0000ffff
64
65namespace protlib {
66
/** Builds an IPv4 socket address from the IPv4 part of an IPv6 socket address.
 *
 * The port is taken over unchanged and the IPv4 address is read from the last
 * four bytes (12..15) of the IPv6 address. The leading twelve bytes are not
 * inspected, i.e. this function does not verify that the input actually is an
 * IPv4-mapped address.
 *
 * @param sin  destination, completely overwritten
 * @param sin6 source IPv6 socket address (only read)
 */
void v6_to_v4(struct sockaddr_in *sin, struct sockaddr_in6 *sin6) {
  const unsigned char *const embedded_v4 = &sin6->sin6_addr.s6_addr[12];

  memset(sin, 0, sizeof(*sin));
  memcpy(&sin->sin_addr, embedded_v4, sizeof(struct in_addr));
  sin->sin_port = sin6->sin6_port;
  sin->sin_family = AF_INET;
}
73
/** Converts an IPv4 socket address into an IPv6 socket address in IPv4-mapped
 *  format (::ffff:a.b.c.d).
 *
 * The address bytes are written individually, so the result has the same
 * layout (ten zero bytes, 0xff 0xff, then the four IPv4 address bytes) on
 * little-endian and big-endian hosts, and no 32-bit stores are made through a
 * pointer into the byte array.
 *
 * @param sin  source IPv4 socket address (only read); port and address are
 *             expected in network byte order, as usual for sockaddr_in
 * @param sin6 destination, completely overwritten (flow info and scope id
 *             are left at 0)
 */
void v4_to_v6(struct sockaddr_in *sin, struct sockaddr_in6 *sin6) {
  memset(sin6, 0, sizeof(*sin6));
  sin6->sin6_family = AF_INET6;
  sin6->sin6_port = sin->sin_port;
  // bytes 0..9 stay zero, bytes 10..11 carry the IPv4-mapped marker
  sin6->sin6_addr.s6_addr[10] = 0xff;
  sin6->sin6_addr.s6_addr[11] = 0xff;
  // bytes 12..15 hold the IPv4 address, already in network byte order
  memcpy(&sin6->sin6_addr.s6_addr[12], &sin->sin_addr, sizeof(struct in_addr));
}
84
85using namespace log;
86
87/** @defgroup tptcp TP over TCP
88 * @ingroup network
89 * @{
90 */
91
// Scratch buffer for the textual form of an IPv6 address (inet_ntop() output) in log statements.
// NOTE(review): file-global and not synchronized - concurrent users would overwrite each other's text.
92char in6_addrstr[INET6_ADDRSTRLEN+1];
93
94
95/******* class TPoverTCP *******/
96
97
98/** get_connection_to() checks for already existing connections.
99 * If a connection exists, it returns "AssocData"
100 * and saves it in "connmap" for further use
101 * If no connection exists, a new connection to "addr"
102 * is created.
103 */
104AssocData*
105TPoverTCP::get_connection_to(const appladdress& addr)
106{
107 // get timeout
108 struct timespec ts;
109 get_time_of_day(ts);
110 ts.tv_nsec+= tpparam.sleep_time * 1000000L;
111 if (ts.tv_nsec>=1000000000L)
112 {
113 ts.tv_sec += ts.tv_nsec / 1000000000L;
114 ts.tv_nsec= ts.tv_nsec % 1000000000L;
115 }
116
117 // create a new AssocData pointer, initialize it to NULL
118 AssocData* assoc= NULL;
119 int new_socket;
120 // loop until timeout is exceeded: TODO: check applicability of loop
121 do
122 {
123 // check for existing connections to addr
124 // start critical section
125 lock(); // install_cleanup_thread_lock(TPoverTCP, this);
126 assoc= connmap.lookup(addr);
127 // end critical section
128 unlock(); // uninstall_cleanup(1);
129 if (assoc)
130 {
131 // If not shut down then use it, else abort, wait and check again.
132 if (!assoc->shutdown)
133 {
134 return assoc;
135 }
136 else
137 {
138 // TODO: connection is already in shutdown mode. What now?
139 ERRCLog(tpparam.name,"socket exists, but is already in mode shutdown");
140
141 return 0;
142 }
143 } //end __if (assoc)__
144 else
145 {
146 Log(DEBUG_LOG,LOG_UNIMP,tpparam.name,"No existing connection to "
147 << addr.get_ip_str() << " port #" << addr.get_port() << " found, creating a new one.");
148 }
149
150 // no connection found, create a new one
151 new_socket = socket( v4_mode ? AF_INET : AF_INET6, SOCK_STREAM, IPPROTO_TCP);
152 if (new_socket == -1)
153 {
154 ERRCLog(tpparam.name,"Couldn't create a new socket: " << strerror(errno));
155
156 return 0;
157 }
158
159 // Disable Nagle Algorithm, set (TCP_NODELAY)
160 int nodelayflag= 1;
161 int status= setsockopt(new_socket,
162 IPPROTO_TCP,
163 TCP_NODELAY,
164 &nodelayflag,
165 sizeof(nodelayflag));
166 if (status)
167 {
168 ERRLog(tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
169 }
170
171 // Reuse ports, even if they are busy
172 int socketreuseflag= 1;
173 status= setsockopt(new_socket,
174 SOL_SOCKET,
175 SO_REUSEADDR,
176 (const char *) &socketreuseflag,
177 sizeof(socketreuseflag));
178 if (status)
179 {
180 ERRLog(tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
181 }
182
183 struct sockaddr_in6 dest_address;
184 dest_address.sin6_flowinfo= 0;
185 dest_address.sin6_scope_id= 0;
186 addr.get_sockaddr(dest_address);
187
188 // connect the socket to the destination address
189 int connect_status = 0;
190 if (v4_mode) {
191 struct sockaddr_in dest_address_v4;
192 v6_to_v4( &dest_address_v4, &dest_address );
193 connect_status = connect(new_socket,
194 reinterpret_cast<const struct sockaddr*>(&dest_address_v4),
195 sizeof(dest_address));
196 } else {
197 connect_status = connect(new_socket,
198 reinterpret_cast<const struct sockaddr*>(&dest_address),
199 sizeof(dest_address));
200 }
201
202 // connects to the listening_port of the peer's masterthread
203 if (connect_status != 0)
204 {
205 ERRLog(tpparam.name,"Connect to " << addr.get_ip_str() << " port #" << addr.get_port()
206 << " failed: [" << color[red] << strerror(errno) << color[off] << "]");
207
208 return 0; // error: couldn't connect
209 }
210
211
212 struct sockaddr_in6 own_address;
213 if (v4_mode) {
214 struct sockaddr_in own_address_v4;
215 socklen_t own_address_len_v4 = sizeof(own_address_v4);
216 getsockname(new_socket, reinterpret_cast<struct sockaddr*>(&own_address_v4), &own_address_len_v4);
217 v4_to_v6(&own_address_v4, &own_address);
218 } else {
219 socklen_t own_address_len= sizeof(own_address);
220 getsockname(new_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
221 }
222
223 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,">>--Connect-->> to " << addr.get_ip_str() << " port #" << addr.get_port()
224 << " from " << inet_ntop(AF_INET6,&own_address.sin6_addr,in6_addrstr,INET6_ADDRSTRLEN)
225 << " port #" << ntohs(own_address.sin6_port));
226
227
228 // create new AssocData record (will copy addr)
229 assoc = new(nothrow) AssocData(new_socket, addr, appladdress(own_address,IPPROTO_TCP));
230
231 // if assoc could be successfully created, insert it into ConnectionMap
232 if (assoc)
233 {
234 bool insert_success= false;
235 // start critical section
236 lock(); // install_cleanup_thread_lock(TPoverTCP, this);
237 // insert AssocData into connection map
238 insert_success= connmap.insert(assoc);
239 // end critical section
240 unlock(); // uninstall_cleanup(1);
241
242 if (insert_success == true)
243 {
244#ifdef _DEBUG
245 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Connected to " << addr.get_ip_str() << ", port #" << addr.get_port()
246 << " via socket " << new_socket);
247
248
249#endif
250
251 // notify master thread to start a receiver thread (i.e. send selfmessage)
252 TPoverTCPMsg* newmsg= new(nothrow)TPoverTCPMsg(assoc, tpparam.source, TPoverTCPMsg::start);
253 if (newmsg)
254 {
255 newmsg->send_to(tpparam.source);
256 return assoc;
257 }
258 else
259 ERRCLog(tpparam.name,"get_connection_to: could not get memory for internal msg");
260 }
261 else
262 {
263 // delete data and abort
264 ERRCLog(tpparam.name, "Cannot insert AssocData for socket " << new_socket << ", "<< addr.get_ip_str()
265 <<", port #" << addr.get_port() << " into connection map, aborting connection");
266
267 // abort connection, delete its AssocData
268 close (new_socket);
269 if (assoc)
270 {
271 delete assoc;
272 assoc= 0;
273 }
274 return assoc;
275 } // end else connmap.insert
276
277 } // end "if (assoc)"
278 }
279 while (wait_cond(ts)!=ETIMEDOUT);
280
281 return assoc;
282} //end get_connection_to
283
284
285/** terminates a signaling association/connection
286 * - if no connection exists, generate a warning
287 * - otherwise: generate internal msg to related receiver thread
288 */
289void
290TPoverTCP::terminate(const address& in_addr)
291{
292#ifndef _NO_LOGGING
293 const char *const thisproc="terminate() - ";
294#endif
295
296 appladdress* addr = NULL;
297 addr = dynamic_cast<appladdress*>(in_addr.copy());
298
299 if (!addr) return;
300
301 // Create a new AssocData-pointer
302 AssocData* assoc = NULL;
303
304 // check for existing connections to addr
305
306 // start critical section:
307 // please note if receiver_thread terminated already, the assoc data is not
308 // available anymore therefore we need a lock around cleanup_receiver_thread()
309
310 lock(); // install_cleanup_thread_lock(TPoverTCP, this);
311 assoc= connmap.lookup(*addr);
312 if (assoc)
313 {
314 EVLog(tpparam.name,thisproc<<"got request to shutdown connection for peer " << addr);
315 // If not shut down then use it, else abort, wait and check again.
316 if (!assoc->shutdown)
317 {
318 if (assoc->socketfd)
319 {
320 // disallow sending
321 if (shutdown(assoc->socketfd,SHUT_WR))
322 {
323 ERRLog(tpparam.name,thisproc<<"shutdown (write) on socket for peer " << addr << " returned error:" << strerror(errno));
324 }
325 else
326 {
327 EVLog(tpparam.name,thisproc<<"initiated closing of connection for peer " << addr << ". Shutdown (write) on socket "<< assoc->socketfd );
328 }
329 }
330 assoc->shutdown= true;
331 }
332 else
333 EVLog(tpparam.name,thisproc<<"connection for peer " << addr << " is already in mode shutdown");
334
335 }
336 else
337 WLog(tpparam.name,thisproc<<"could not find a connection for peer " << *addr);
338
339 stop_receiver_thread(assoc);
340
341 // end critical section
342 unlock(); // uninstall_cleanup(1);
343
344 if (addr) delete addr;
345}
346
347
348/** generates and internal TPoverTCP message to send a NetMsg to the network
349 * - it is necessary to let a thread do this, because the caller
350 * may get blocked if the connect() or send() call hangs for a while
351 * - the sending thread will call TPoverTCP::tcpsend()
352 * - if no connection exists, creates a new one (unless use_existing_connection is true)
353 * @note the netmsg is deleted by the send() method when it is not used anymore
354 */
355void
356TPoverTCP::send(NetMsg* netmsg, const address& in_addr, bool use_existing_connection)
357{
358 if (netmsg == NULL) {
359 ERRCLog(tpparam.name,"send() - called without valid NetMsg buffer (NULL)");
360 return;
361 }
362
363 appladdress* addr = NULL;
364 addr= dynamic_cast<appladdress*>(in_addr.copy());
365
366 if (!addr)
367 {
368 ERRCLog(tpparam.name,"send() - given destination address is not of expected type (appladdress), has type " << (int) in_addr.get_type());
369 return;
370 }
371
372 // lock due to sendermap access
373 lock();
374
375 // check for existing sender thread
376 sender_thread_queuemap_t::const_iterator it= senderthread_queuemap.find(*addr);
377
378 FastQueue* destqueue= 0;
379
380 if (it == senderthread_queuemap.end())
381 { // no sender thread found so far
382
383 // if we already have an existing connection it is save to create a sender thread
384 // since get_connection_to() will not be invoked, so an existing connection will
385 // be used
386 const AssocData* assoc = connmap.lookup(*addr);
387
388 //DLog(tpparam.name,"send() - use_existing_connection:" << (use_existing_connection ? "true" : "false") << " assoc:" << assoc);
389
390 if (use_existing_connection==false || (assoc && assoc->shutdown==false && assoc->socketfd>0))
391 { // it is allowed to create a new connection for this thread
392 // create a new queue for sender thread
393 FastQueue* sender_thread_queue= new FastQueue;
394 create_new_sender_thread(sender_thread_queue);
395 // remember queue for later use
396
397 //pair<sender_thread_queuemap_t::iterator, bool> tmpinsiterator=
398 senderthread_queuemap.insert( pair<appladdress,FastQueue*> (*addr,sender_thread_queue) );
399
400 destqueue= sender_thread_queue;
401 }
402 }
403 else
404 { // we have a sender thread, use stored queue from map
405 destqueue= it->second;
406 }
407
408 unlock();
409
410 // send a send_data message to it (if we have a destination queue)
411 if (destqueue)
412 {
413 // both parameters will be freed after message was sent!
414
415 TPoverTCPMsg* internalmsg= new TPoverTCPMsg(netmsg,new appladdress(*addr));
416 if (internalmsg)
417 {
418 // send the internal message to the sender thread queue
419 internalmsg->send(tpparam.source,destqueue);
420 }
421 }
422 else
423 {
424 if (!use_existing_connection)
425 WLog(tpparam.name,"send() - found entry for address, but no active sender thread available for peer addr:" << *addr << " - dropping data");
426 else
427 {
428 DLog(tpparam.name,"no active sender thread found for peer " << *addr << " - but policy forbids to set up a new connection, will drop data");
429
430 }
431 // cannot send data, so we must drop it
432 delete netmsg;
433 }
434
435 if (addr) delete addr;
436}
437
438/** sends a NetMsg to the network.
439 *
440 * @param netmsg message to send
441 * @param addr transport endpoint address
442 *
443 * @note if no connection exists, creates a new one
444 * @note both parameters are deleted after the message was sent
445 */
446void
447TPoverTCP::tcpsend(NetMsg* netmsg, appladdress* addr)
448{
449#ifndef _NO_LOGGING
450 const char *const thisproc="sender - ";
451#endif
452
453 // set result initially to success, set it to failure
454 // in places where these failures occur
455 int result = TCP_SUCCESS;
456 int saved_errno= 0;
457 int ret= 0;
458
459 // Create a new AssocData-pointer
460 AssocData* assoc = NULL;
461
462 // tp.cpp checks for initialisation of tp and correctness of
463 // msgsize, protocol and ip,
464 // throws an error if something is not right
465 if (addr) {
466 addr->convert_to_ipv6();
467 check_send_args(*netmsg,*addr);
468 }
469 else
470 {
471 ERRCLog(tpparam.name, thisproc << "address pointer is NULL");
472 result= TCP_SEND_FAILURE;
473
474 delete netmsg;
475 delete addr;
476
477 throw TPErrorInternal();
478 }
479
480
481 // check for existing connections,
482 // if a connection exists, return its AssocData
483 // and save it in assoc for further use
484 // if no connection exists, create a new one (in get_connection_to()).
485 // Connection is inserted into connection map that must be done
486 // with exclusive access
487 assoc= get_connection_to(*addr);
488
489 if (assoc==NULL || assoc->socketfd<=0)
490 {
491 ERRCLog(tpparam.name, color[red] << thisproc << "no valid assoc/socket data - dropping packet");
492
493 delete netmsg;
494 delete addr;
495 return;
496 }
497
498 if (assoc->shutdown)
499 {
500 Log(WARNING_LOG, LOG_ALERT, tpparam.name, thisproc << "should send message although connection already half closed");
501 delete netmsg;
502 delete addr;
503
504 throw TPErrorSendFailed();
505 }
506
507 uint32 msgsize= netmsg->get_size();
508#ifdef DEBUG_HARD
509 cerr << thisproc << "message size=" << netmsg->get_size() << endl;
510#endif
511
512 const uint32 retry_send_max = 3;
513 uint32 retry_count = 0;
514 // send all the data contained in netmsg to the socket
515 // which belongs to the address "addr"
516 for (uint32 bytes_sent= 0;
517 bytes_sent < msgsize;
518 bytes_sent+= ret)
519 {
520
521#ifdef _DEBUG_HARD
522 for (uint32 i=0;i<msgsize;i++)
523 {
524 cout << "send_buf: " << i << " : ";
525 if ( isalnum(*(netmsg->get_buffer()+i)) )
526 cout << "'" << *(netmsg->get_buffer()+i) << "' (0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec << ")" ;
527 else
528 cout << "0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec;
529 cout << endl;
530 }
531
532 cout << endl;
533 cout << "bytes_sent: " << bytes_sent << endl;
534 cout << "Message size: " << msgsize << endl;
535 cout << "Send-Socket: " << assoc->socketfd << endl;
536 cout << "pointer-Offset. " << netmsg->get_pos() << endl;
537 cout << "vor send " << endl;
538#endif
539
540 retry_count= 0;
541 do
542 {
543 // socket send
544 ret= ::send(assoc->socketfd,
545 netmsg->get_buffer() + bytes_sent,
546 msgsize - bytes_sent,
547 MSG_NOSIGNAL);
548
549 // send_buf + bytes_sent
550
551 // Deal with temporary internal errors like EAGAIN, resource temporary unavailable etc.
552 // retry sending
553 if (ret < 0)
554 { // internal error during send occured
555 saved_errno= errno;
556 switch(saved_errno)
557 {
558 case EAGAIN: // same as EWOULDBLOCK
559 case EINTR: // man page says: A signal occurred before any data was transmitted.
560 case ENOBUFS: // The output queue for a network interface was full.
561 retry_count++;
562 ERRLog(tpparam.name,"Temporary failure while calling send(): " << strerror(saved_errno) << ", errno: " << saved_errno
563 << " - retry sending, retry #" << retry_count);
564 // pace down a little bit, sleep for a while
565 sleep(1);
566 break;
567
568 // everything else should not lead to repetition
569 default:
570 retry_count= retry_send_max;
571 break;
572 } // end switch
573 } // endif
574 else // leave while
575 break;
576 }
577 while(retry_count < retry_send_max);
578
579 if (ret < 0)
580 { // unrecoverable error occured
581
582 result= TCP_SEND_FAILURE;
583 break;
584 } // end if (ret < 0)
585 else
586 {
587 if (debug_pdu)
588 {
589 ostringstream hexdump;
590 netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_sent);
591 DLog(tpparam.name,"PDU debugging enabled - Sent:" << hexdump.str());
592 }
593 }
594
595 // continue with send next data block in next for() iteration
596 } // end for
597
598 // *** note: netmsg is deleted here ***
599 delete netmsg;
600
601 // Throwing an exception within a critical section does not
602 // unlock the mutex.
603
604 if (result != TCP_SUCCESS)
605 {
606 ERRLog(tpparam.name, thisproc << "TCP error, returns " << ret << ", error : " << strerror(errno));
607 delete addr;
608
609 throw TPErrorSendFailed(saved_errno);
610
611 }
612 else
613 EVLog(tpparam.name, thisproc << ">>----Sent---->> message (" << msgsize << " bytes) using socket " << assoc->socketfd << " to " << *addr);
614
615 if (!assoc) {
616 // no connection
617
618 ERRLog(tpparam.name, thisproc << "cannot get connection to " << addr->get_ip_str()
619 << ", port #" << addr->get_port());
620
621 delete addr;
622
623 throw TPErrorUnreachable(); // should be no assoc found
624 } // end "if (!assoc)"
625
626 // *** delete address ***
627 delete addr;
628}
629
630/* this thread waits for an internal message that either:
631 * - requests transmission of a packet
632 * - requests to stop this thread
633 * @param argp points to the thread queue for internal messages
634 */
635void
636TPoverTCP::sender_thread(void *argp)
637{
638#ifndef _NO_LOGGING
639 const char *const methodname="senderthread - ";
640#endif
641
642 message* internal_thread_msg = NULL;
643
644 EVLog(tpparam.name, methodname << "starting as thread <" << pthread_self() << ">");
645
646 FastQueue* fq= reinterpret_cast<FastQueue*>(argp);
647 if (!fq)
648 {
649 ERRLog(tpparam.name, methodname << "thread <" << pthread_self() << "> no valid pointer to msg queue. Stop.");
650 return;
651 }
652
653 bool terminate= false;
654 TPoverTCPMsg* internalmsg= 0;
655 while (terminate==false && (internal_thread_msg= fq->dequeue()) != 0 )
656 {
657 internalmsg= dynamic_cast<TPoverTCPMsg*>(internal_thread_msg);
658
659 if (internalmsg == 0)
660 {
661 ERRLog(tpparam.name, methodname << "received not an TPoverTCPMsg but a" << internal_thread_msg->get_type_name());
662 }
663 else
664 if (internalmsg->get_msgtype() == TPoverTCPMsg::send_data)
665 {
666 // create a connection if none exists and send the netmsg
667 if (internalmsg->get_netmsg() && internalmsg->get_appladdr())
668 {
669 try
670 {
671 tcpsend(internalmsg->get_netmsg(),internalmsg->get_appladdr());
672 } // end try
673 catch(TPErrorSendFailed& err)
674 {
675 ERRLog(tpparam.name, methodname << "TCP send call failed - " << err.what()
676 << " cause: (" << err.get_reason() << ") " << strerror(err.get_reason()) );
677 } // end catch
678 catch(TPError& err)
679 {
680 ERRLog(tpparam.name, methodname << "TCP send call failed - reason: " << err.what());
681 } // end catch
682 catch(...)
683 {
684 ERRLog(tpparam.name, methodname << "TCP send call failed - unknown exception");
685 }
686 }
687 else
688 {
689 ERRLog(tpparam.name, methodname << "problem with passed arguments references, they point to 0");
690 }
691 }
692 else
693 if (internalmsg->get_msgtype() == TPoverTCPMsg::stop)
694 {
695 terminate= true;
696 }
697 } // end while
698
699 EVLog(tpparam.name, methodname << "<" << pthread_self() << "> terminated connection.");
700}
701
702
703/** receiver thread listens at a TCP socket for incoming PDUs
704 * and passes complete PDUs to the coordinator. Incomplete
705 * PDUs due to aborted connections or buffer overflows are discarded.
706 * @param argp - assoc data and flags sig_terminate and terminated
707 *
708 * @note this is a static member function, so you cannot use class variables
709 */
710void
711TPoverTCP::receiver_thread(void *argp)
712{
713#ifndef _NO_LOGGING
714 const char *const methodname="receiver - ";
715#endif
716
717 receiver_thread_arg_t *receiver_thread_argp= static_cast<receiver_thread_arg_t *>(argp);
718 const appladdress* peer_addr = NULL;
719 const appladdress* own_addr = NULL;
720 uint32 bytes_received = 0;
721 TPMsg* tpmsg= NULL;
722
723 // argument parsing - start
724 if (receiver_thread_argp == 0)
725 {
726 ERRCLog(tpparam.name, methodname << "No arguments given at start of receiver thread <" << pthread_self() << ">, exiting.");
727
728 return;
729 }
730 else
731 {
732 // change status to running, i.e., not terminated
733 receiver_thread_argp->terminated= false;
734
735#ifdef _DEBUG
736 DLog(tpparam.name, methodname << "New receiver thread <" << pthread_self() << "> started. ");
737#endif
738 }
739
740 int conn_socket= 0;
741 if (receiver_thread_argp->peer_assoc)
742 {
743 // get socket descriptor from arg
744 conn_socket = receiver_thread_argp->peer_assoc->socketfd;
745 // get pointer to peer address of socket (source/sender address of peer) from arg
746 peer_addr= &receiver_thread_argp->peer_assoc->peer;
747 own_addr= &receiver_thread_argp->peer_assoc->ownaddr;
748 }
749 else
750 {
751 ERRCLog(tpparam.name, methodname << "No peer assoc available - pointer is NULL");
752
753 return;
754 }
755
756 if (peer_addr == 0)
757 {
758 ERRCLog(tpparam.name, methodname << "No peer address available for socket " << conn_socket << ", exiting.");
759
760 return;
761 }
762 // argument parsing - end
763#ifdef _DEBUG
764 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname <<
765 "Preparing to wait for data at socket "
766 << conn_socket << " from " << receiver_thread_argp->peer_assoc->peer);
767#endif
768
769 int ret= 0;
770 uint32 msgcontentlength= 0;
771 bool msgcontentlength_known= false;
772 bool pdu_complete= false; // when to terminate inner loop
773
774 /* maybe use this to create a new pdu,
775 /// constructor
776 contextlistpdu(type_t t, subtype_t st, uint32 fc, uint32 numobj = 0);
777 */
778
779 // activate O_NON_BLOCK for recv on socket conn_socket
780 // CAVEAT: this also implies non-blocking send()!
781 //fcntl(conn_socket,F_SETFL, O_NONBLOCK);
782
783 // set options for polling
784 const unsigned int number_poll_sockets= 1;
785 struct pollfd poll_fd;
786 // have to set structure before poll call
787 poll_fd.fd = conn_socket;
788 poll_fd.events = POLLIN | POLLPRI;
789 poll_fd.revents = 0;
790
791 int poll_status;
792 bool recv_error= false;
793
794 NetMsg* netmsg= 0;
795 NetMsg* remainbuf= 0;
796 size_t buffer_bytes_left= 0;
797 size_t trailingbytes= 0;
798 bool skiprecv= false;
799 // loop until we receive a terminate signal (read-only var for this thread)
800 // or get an error from socket read
801 while( receiver_thread_argp->sig_terminate == false )
802 {
803 // Read next PDU from socket or process trailing bytes in remainbuf
804 ret= 0;
805 msgcontentlength= 0;
806 msgcontentlength_known= false;
807 pdu_complete= false;
808 netmsg= 0;
809
810 // there are trailing bytes left from the last receive call
811 if (remainbuf != 0)
812 {
813 netmsg= remainbuf;
814 remainbuf= 0;
815 buffer_bytes_left= netmsg->get_size()-trailingbytes;
816 bytes_received= trailingbytes;
817 trailingbytes= 0;
818 skiprecv= true;
819 }
820 else // no trailing bytes, create a new buffer
821 if ( (netmsg= new NetMsg(NetMsg::max_size)) != 0 )
822 {
823 buffer_bytes_left= netmsg->get_size();
824 bytes_received= 0;
825 skiprecv= false;
826 }
827 else
828 { // buffer allocation failed
829 bytes_received= 0;
830 buffer_bytes_left= 0;
831 recv_error= true;
832 }
833
834 // loops until PDU is complete
835 // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
836 while (!pdu_complete &&
837 !recv_error &&
838 !receiver_thread_argp->sig_terminate)
839 {
840 if (!skiprecv)
841 {
842 // read from TCP socket or return after sleep_time
843 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
844
845 if (receiver_thread_argp->sig_terminate)
846 {
847 Log(EVENT_LOG,LOG_UNIMP,tpparam.name,methodname << "Thread <" << pthread_self() << "> found terminate signal after poll");
848 // disallow sending
849 AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
850 if (myassoc->shutdown == false)
851 {
852 myassoc->shutdown= true;
853 if (shutdown(myassoc->socketfd,SHUT_WR))
854 {
855 if ( errno != ENOTCONN )
856 Log(ERROR_LOG,LOG_UNIMP,tpparam.name,methodname <<"shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
857 }
858 }
859 // try to read do a last read from the TCP socket or return after sleep_time
860 if (poll_status == 0)
861 {
862 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
863 }
864 }
865
866 if (poll_fd.revents & POLLERR) // Error condition
867 {
868 if (errno == 0 || errno == EINTR)
869 {
870 EVLog(tpparam.name, methodname << "poll(): " << strerror(errno));
871 }
872 else
873 {
874 ERRCLog(tpparam.name, methodname << "Poll indicates error: " << strerror(errno));
875 recv_error= true;
876 }
877 }
878
879 if (poll_fd.revents & POLLHUP) // Hung up
880 {
881 Log(EVENT_LOG,LOG_CRIT, tpparam.name, methodname << "Poll hung up");
882 recv_error= true;
883 }
884
885 if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
886 {
887 EVLog(tpparam.name, methodname << "Poll Invalid request: fd not open");
888 recv_error= true;
889 }
890
891 // check status (return value) of poll call
892 switch (poll_status)
893 {
894 case -1:
895 if (errno == 0 || errno == EINTR)
896 {
897 EVLog(tpparam.name, methodname << "Poll status: " << strerror(errno));
898 }
899 else
900 {
901 ERRCLog(tpparam.name, methodname << "Poll status indicates error: " << strerror(errno) << "- aborting");
902 recv_error= true;
903 }
904
905 continue; // next while iteration
906 break;
907
908 case 0:
909#ifdef DEBUG_HARD
910 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll timed out after " << tpparam.sleep_time << " ms.");
911#endif
912 continue; // next while iteration
913 break;
914
915 default:
916#ifdef DEBUG_HARD
917 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
918#endif
919 break;
920 } // end switch
921
922
923 /// receive data from socket buffer (recv will not block)
924 ret = recv(conn_socket,
925 netmsg->get_buffer() + bytes_received,
926 buffer_bytes_left,
927 MSG_DONTWAIT);
928
929 if ( ret < 0 )
930 {
931 if (errno!=EAGAIN && errno!=EWOULDBLOCK)
932 {
933 ERRCLog(tpparam.name, methodname << "Receive at socket " << conn_socket << " failed, error: " << strerror(errno));
934 recv_error= true;
935 continue;
936 }
937 else
938 { // errno==EAGAIN || errno==EWOULDBLOCK
939 // just nothing to read from socket, continue w/ next poll
940 continue;
941 }
942 }
943 else
944 {
945 if (ret == 0)
946 {
947 // this means that EOF is reached,
948 // other side has closed connection
949 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Other side (" << *peer_addr << ") closed connection for socket " << conn_socket);
950 // disallow sending
951 AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
952 if (myassoc->shutdown == false)
953 {
954 myassoc->shutdown= true;
955 if (shutdown(myassoc->socketfd,SHUT_WR))
956 {
957 if ( errno != ENOTCONN )
958 Log(ERROR_LOG,LOG_UNIMP,tpparam.name, methodname << "shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
959 }
960 }
961 // not a real error, but we must quit the receive loop
962 recv_error= true;
963 }
964 else
965 {
966
967 Log(EVENT_LOG,LOG_UNIMP, tpparam.name, methodname << "<<--Received--<< packet (" << ret << " bytes) at socket " << conn_socket << " from " << *peer_addr);
968 // track number of received bytes
969 bytes_received+= ret;
970 buffer_bytes_left-= ret;
971 }
972 }
973 } // end if do not skip recv() statement
974
975 if (buffer_bytes_left < 0) ///< buffer space exhausted now
976 {
977 recv_error= true;
978 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "during receive buffer space exhausted");
979 }
980
981 if (!msgcontentlength_known) ///< common header not parsed
982 {
983 // enough bytes read to parse common header?
984 if (bytes_received >= common_header_length)
985 {
986 // get message content length in number of bytes
987 if (getmsglength(*netmsg, msgcontentlength))
988 msgcontentlength_known= true;
989 else
990 {
991 ERRCLog(tpparam.name, methodname << "Not a valid protocol header - discarding received packet. received size " << msgcontentlength);
992
993 ostringstream hexdumpstr;
994 netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
995 DLog(tpparam.name,"dumping received bytes:" << hexdumpstr.str());
996
997 // reset all counters
998 msgcontentlength= 0;
999 msgcontentlength_known= false;
1000 bytes_received= 0;
1001 pdu_complete= false;
1002 continue;
1003 }
1004 }
1005 } // endif common header not parsed
1006
1007 // check whether we have read the whole Protocol PDU
1008 DLog(tpparam.name, "bytes_received-common_header_length=" << bytes_received-common_header_length << " msgcontentlength: " << msgcontentlength);
1009 if (msgcontentlength_known)
1010 {
1011 if (bytes_received-common_header_length >= msgcontentlength )
1012 {
1013 pdu_complete= true;
1014 // truncate buffer exactly at common_header_length+msgcontentlength==PDU size, trailing stuff is handled separately
1015 netmsg->truncate(common_header_length+msgcontentlength);
1016
1017 // trailing bytes are copied into new buffer
1018 if (bytes_received-common_header_length > msgcontentlength)
1019 {
1020 WLog(tpparam.name,"trailing bytes - received more bytes ("<<bytes_received<<") than expected for PDU (" << common_header_length+msgcontentlength << ")");
1021 remainbuf= new NetMsg(NetMsg::max_size);
1022 trailingbytes= (bytes_received-common_header_length) - msgcontentlength;
1023 bytes_received= common_header_length+msgcontentlength;
1024 memcpy(remainbuf->get_buffer(),netmsg->get_buffer()+common_header_length+msgcontentlength, trailingbytes);
1025 }
1026 }
1027 else
1028 { // not enough bytes in the buffer must continue to read from network
1029 skiprecv= false;
1030 }
1031 } // endif msgcontentlength_known
1032 } // end while (!pdu_complete && !recv_error && !signalled for termination)
1033 // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
1034
1035 // if other side closed the connection, we should still be able to deliver the remaining data
1036 if (ret == 0)
1037 {
1038 recv_error= false;
1039 }
1040
1041 // deliver only complete PDUs to signaling module
1042 if (!recv_error && pdu_complete)
1043 {
1044 // create TPMsg and send it to the signaling thread
1045 tpmsg = new(nothrow) TPMsg(netmsg, peer_addr->copy(), own_addr->copy());
1046 if (tpmsg)
1047 {
1048 DLog(tpparam.name, methodname << "receipt of PDU now complete, sending msg#" << tpmsg->get_id()
1049 << " to module " << message::get_qaddr_name(tpparam.dest));
1050 }
1051
1052 debug_pdu=false;
1053
1054 if (debug_pdu)
1055 {
1056 ostringstream hexdump;
1057 netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_received);
1058 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"PDU debugging enabled - Received:" << hexdump.str());
1059 }
1060
1061 // send the message if it was successfully created
1062 // bool message::send_to(qaddr_t dest, bool exp = false);
1063 if (!tpmsg
1064 || (!tpmsg->get_peeraddress())
1065 || (!tpmsg->send(message::qaddr_tp_over_tcp, tpparam.dest)))
1066 {
1067 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Cannot allocate/send TPMsg");
1068 if (tpmsg) delete tpmsg;
1069 } // end if tpmsg not allocated or not addr or not sent
1070
1071
1072 } // end if !recv_error
1073 else
1074 { // error during receive or PDU incomplete
1075 if (bytes_received>0)
1076 {
1077 Log(WARNING_LOG,LOG_NORMAL, tpparam.name, methodname << "Attention! " << (recv_error? "Receive error, " : "") << (pdu_complete ? "PDU complete" : "PDU incomplete") << "received bytes: " << bytes_received);
1078 }
1079
1080 if (!pdu_complete && bytes_received>0 && bytes_received<common_header_length)
1081 {
1082 ostringstream hexdumpstr;
1083 netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
1084 Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"Message too short to be a valid protocol header - dumping received bytes:" << hexdumpstr.str());
1085 }
1086 // leave the outer loop
1087 /**********************/
1088 break;
1089 /**********************/
1090 } // end else
1091
1092 } // end while (thread not signalled for termination)
1093
1094 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self()
1095 << "> shutting down and closing socket " << receiver_thread_argp->peer_assoc->peer);
1096
1097 // shutdown socket
1098 if (shutdown(conn_socket, SHUT_RD))
1099 {
1100 if ( errno != ENOTCONN )
1101 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> shutdown (read) on socket failed, reason: " << strerror(errno));
1102 }
1103
1104 // close socket
1105 close(conn_socket);
1106
1107 receiver_thread_argp->terminated= true;
1108
1109 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> terminated");
1110
1111#ifdef _DEBUG
1112 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Signaling main loop for cleanup");
1113#endif
1114 // notify master thread for invocation of cleanup procedure
1115 TPoverTCPMsg* newmsg= new(nothrow)TPoverTCPMsg(receiver_thread_argp->peer_assoc);
1116 // send message to main loop thread
1117 newmsg->send_to(tpparam.source);
1118
1119}
1120
1121
1122/** this signals a terminate to a thread and wait for the thread to stop
1123 * @note it is not safe to access any thread related data after this method returned,
1124 * because the receiver thread will initiate a cleanup_receiver_thread() method
1125 * which may erase all relevant thread data.
1126 */
1127void
1128TPoverTCP::stop_receiver_thread(AssocData* peer_assoc)
1129{
1130 // All operations on recv_thread_argmap and connmap require an already acquired lock
1131 // after this procedure peer_assoc may be invalid because it was erased
1132
1133 // start critical section
1134
1135 if (peer_assoc == 0)
1136 return;
1137
1138 pthread_t thread_id= peer_assoc->thread_ID;
1139
1140 // try to clean up receiver_thread_arg
1141 recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1142 receiver_thread_arg_t* recv_thread_argp=
1143 (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1144 if (recv_thread_argp)
1145 {
1146 if (!recv_thread_argp->terminated)
1147 {
1148 // thread signaled termination, but is not?
1149 Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> signaled for termination");
1150
1151 // signal thread for its termination
1152 recv_thread_argp->sig_terminate= true;
1153 // wait for thread to join after termination
1154 pthread_join(thread_id, 0);
1155 // the dying thread will signal main loop to call this method, but this time we should enter the else branch
1156 return;
1157 }
1158 }
1159 else
1160 Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> not found");
1161
1162}
1163
1164
1165/** cleans up left over structures (assoc,receiver_thread_arg) from already terminated receiver_thread
1166 * usually called by the master_thread after the receiver_thread terminated
1167 * @note clean_up_receiver_thread() should be only called when an outer lock ensures that peer_assoc
1168 * is still valid
1169 */
1170void
1171TPoverTCP::cleanup_receiver_thread(AssocData* peer_assoc)
1172{
1173 // All operations on recv_thread_argmap and connmap require an already acquired lock
1174 // after this procedure peer_assoc may be invalid because it was erased
1175
1176 // start critical section
1177
1178 if (peer_assoc == 0)
1179 return;
1180
1181 pthread_t thread_id= peer_assoc->thread_ID;
1182
1183 // try to clean up receiver_thread_arg
1184 recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1185 receiver_thread_arg_t* recv_thread_argp=
1186 (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1187 if (recv_thread_argp)
1188 {
1189 if (!recv_thread_argp->terminated)
1190 {
1191 // thread signaled termination, but is not?
1192 Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> not terminated yet?!");
1193 return;
1194 }
1195 else
1196 { // if thread is already terminated
1197 Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> is terminated");
1198
1199 // delete it from receiver map
1200 recv_thread_argmap.erase(recv_thread_arg_iter);
1201
1202 // then delete receiver arg structure
1203 delete recv_thread_argp;
1204 }
1205 }
1206
1207 // delete entry from connection map
1208
1209 // cleanup sender thread
1210 // no need to lock explicitly, because caller of cleanup_receiver_thread() must already locked
1211 terminate_sender_thread(peer_assoc);
1212
1213 // delete the AssocData structure from the connection map
1214 // also frees allocated AssocData structure
1215 connmap.erase(peer_assoc);
1216
1217 // end critical section
1218
1219 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Cleanup receiver thread <" << thread_id << ">. Done.");
1220}
1221
1222
1223/* sends a stop message to the sender thread that belongs to the peer address given in assoc
1224 * @note terminate_receiver_thread() should be only called when an outer lock ensures that assoc
1225 * is still valid, a lock is also required, because senderthread_queuemap is changed
1226 */
1227void
1228TPoverTCP::terminate_sender_thread(const AssocData* assoc)
1229{
1230 if (assoc == 0)
1231 {
1232 Log(ERROR_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - assoc data == NULL");
1233 return;
1234 }
1235
1236 sender_thread_queuemap_t::iterator it= senderthread_queuemap.find(assoc->peer);
1237
1238 if (it != senderthread_queuemap.end())
1239 { // we have a sender thread: send a stop message to it
1240 FastQueue* destqueue= it->second;
1241 if (destqueue)
1242 {
1243 TPoverTCPMsg* internalmsg= new TPoverTCPMsg(assoc,tpparam.source,TPoverTCPMsg::stop);
1244 if (internalmsg)
1245 {
1246 // send the internal message to the sender thread queue
1247 internalmsg->send(tpparam.source,destqueue);
1248 }
1249 }
1250 else
1251 {
1252 Log(WARNING_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - found entry for address, but no sender thread. addr:" << assoc->peer);
1253 }
1254 // erase entry from map
1255 senderthread_queuemap.erase(it);
1256 }
1257}
1258
1259/* terminate all active threads
1260 * note: locking should not be necessary here because this message is called as last method from
1261 * main_loop()
1262 */
1263void
1264TPoverTCP::terminate_all_threads()
1265{
1266 AssocData* assoc= 0;
1267 receiver_thread_arg_t* terminate_argp;
1268
1269 for (recv_thread_argmap_t::iterator terminate_iterator= recv_thread_argmap.begin();
1270 terminate_iterator != recv_thread_argmap.end();
1271 terminate_iterator++)
1272 {
1273 if ( (terminate_argp= terminate_iterator->second) != 0)
1274 {
1275 // we need a non const pointer to erase it later on
1276 assoc= const_cast<AssocData*>(terminate_argp->peer_assoc);
1277 // check whether thread is still alive
1278 if (terminate_argp->terminated == false)
1279 {
1280 terminate_argp->sig_terminate= true;
1281 // then wait for its termination
1282 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,
1283 "Signaled receiver thread <" << terminate_iterator->first << "> for termination");
1284
1285 pthread_join(terminate_iterator->first, 0);
1286
1287 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Thread <" << terminate_iterator->first << "> is terminated");
1288 }
1289 else
1290 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,
1291 "Receiver thread <" << terminate_iterator->first << "> already terminated");
1292
1293 // cleanup all remaining argument structures of terminated threads
1294 delete terminate_argp;
1295
1296 // terminate any related sender thread that is still running
1297 terminate_sender_thread(assoc);
1298
1299 connmap.erase(assoc);
1300 // delete assoc is not necessary, because connmap.erase() will do the job
1301 }
1302 } // end for
1303}
1304
1305
1306/**
1307 * sender thread starter:
1308 * just a static starter method to allow starting the
1309 * actual sender_thread() method.
1310 *
1311 * @param argp - pointer to the current TPoverTCP object instance and receiver_thread_arg_t struct
1312 */
1313void*
1314TPoverTCP::sender_thread_starter(void *argp)
1315{
1316 sender_thread_start_arg_t *sargp= static_cast<sender_thread_start_arg_t *>(argp);
1317
1318 //cout << "invoked sender_thread_Starter" << endl;
1319
1320 // invoke sender thread method
1321 if (sargp != 0 && sargp->instance != 0)
1322 {
1323 // call receiver_thread member function on object instance
1324 sargp->instance->sender_thread(sargp->sender_thread_queue);
1325
1326 //cout << "Before deletion of sarg" << endl;
1327
1328 // no longer needed
1329 delete sargp;
1330 }
1331 else
1332 {
1333 Log(ERROR_LOG,LOG_CRIT,"sender_thread_starter","while starting sender_thread: 0 pointer to arg or object");
1334 }
1335 return 0;
1336}
1337
1338
1339
1340
1341/**
1342 * receiver thread starter:
1343 * just a static starter method to allow starting the
1344 * actual receiver_thread() method.
1345 *
1346 * @param argp - pointer to the current TPoverTCP object instance and receiver_thread_arg_t struct
1347 */
1348void*
1349TPoverTCP::receiver_thread_starter(void *argp)
1350{
1351 receiver_thread_start_arg_t *rargp= static_cast<receiver_thread_start_arg_t *>(argp);
1352 // invoke receiver thread method
1353 if (rargp != 0 && rargp->instance != 0)
1354 {
1355 // call receiver_thread member function on object instance
1356 rargp->instance->receiver_thread(rargp->rtargp);
1357
1358 // no longer needed
1359 delete rargp;
1360 }
1361 else
1362 {
1363 Log(ERROR_LOG,LOG_CRIT,"receiver_thread_starter","while starting receiver_thread: 0 pointer to arg or object");
1364 }
1365 return 0;
1366}
1367
1368
1369void
1370TPoverTCP::create_new_sender_thread(FastQueue* senderfqueue)
1371{
1372 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new sender thread...");
1373
1374 pthread_t senderthreadid;
1375 // create new thread; (arg == 0) is handled by thread, too
1376 int pthread_status= pthread_create(&senderthreadid,
1377 NULL, // NULL: default attributes: thread is joinable and has a
1378 // default, non-realtime scheduling policy
1379 TPoverTCP::sender_thread_starter,
1380 new sender_thread_start_arg_t(this,senderfqueue));
1381 if (pthread_status)
1382 {
1383 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " << strerror(pthread_status));
1384
1385 delete senderfqueue;
1386 }
1387}
1388
1389
1390void
1391TPoverTCP::create_new_receiver_thread(AssocData* peer_assoc)
1392{
1393 receiver_thread_arg_t* argp=
1394 new(nothrow) receiver_thread_arg(peer_assoc);
1395
1396 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new receiver thread...");
1397
1398 // create new thread; (arg == 0) is handled by thread, too
1399 int pthread_status= pthread_create(&peer_assoc->thread_ID,
1400 NULL, // NULL: default attributes: thread is joinable and has a
1401 // default, non-realtime scheduling policy
1402 receiver_thread_starter,
1403 new(nothrow) receiver_thread_start_arg_t(this,argp));
1404 if (pthread_status)
1405 {
1406 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " << strerror(pthread_status));
1407
1408 delete argp;
1409 }
1410 else
1411 {
1412 lock(); // install_cleanup_thread_lock(TPoverTCP, this);
1413
1414 // remember pointer to thread arg structure
1415 // thread arg structure should be destroyed after thread termination only
1416 pair<recv_thread_argmap_t::iterator, bool> tmpinsiterator=
1417 recv_thread_argmap.insert( pair<pthread_t,receiver_thread_arg_t*> (peer_assoc->thread_ID,argp) );
1418 if (tmpinsiterator.second == false)
1419 {
1420 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Thread argument could not be inserted into hashmap");
1421 }
1422 unlock(); // uninstall_cleanup(1);
1423 }
1424}
1425
1426
1427/**
1428 * master listener thread starter:
1429 * just a static starter method to allow starting the
1430 * actual master_listener_thread() method.
1431 *
1432 * @param argp - pointer to the current TPoverTCP object instance
1433 */
1434void*
1435TPoverTCP::master_listener_thread_starter(void *argp)
1436{
1437 // invoke listener thread method
1438 if (argp != 0)
1439 {
1440 (static_cast<TPoverTCP*>(argp))->master_listener_thread();
1441 }
1442 return 0;
1443}
1444
1445
1446
1447/**
1448 * master listener thread: waits for incoming connections at the well-known tcp port
1449 * when a connection request is received this thread spawns a receiver_thread for
1450 * receiving packets from the peer at the new socket.
1451 */
1452void
1453TPoverTCP::master_listener_thread()
1454{
1455 // create a new address-structure for the listening masterthread
1456 struct sockaddr_in6 own_address_v6;
1457 own_address_v6.sin6_family = AF_INET6;
1458 own_address_v6.sin6_flowinfo= 0;
1459 own_address_v6.sin6_port = htons(tpparam.port); // use port number in param structure
1460 // accept incoming connections on all interfaces
1461 own_address_v6.sin6_addr = in6addr_any;
1462 own_address_v6.sin6_scope_id= 0;
1463
1464 // create a new address-structure for the listening masterthread
1465 struct sockaddr_in own_address_v4;
1466 own_address_v4.sin_family = AF_INET;
1467 own_address_v4.sin_port = htons(tpparam.port); // use port number in param structure
1468 // accept incoming connections on all interfaces
1469 own_address_v4.sin_addr.s_addr = INADDR_ANY;
1470
1471 // create a listening socket
1472 int master_listener_socket= socket( AF_INET6, SOCK_STREAM, IPPROTO_TCP);
1473 if (master_listener_socket!=-1) v4_mode = false;
1474 if (master_listener_socket == -1) {
1475 master_listener_socket= socket( AF_INET, SOCK_STREAM, IPPROTO_TCP);
1476 if (master_listener_socket!=-1) v4_mode = true;
1477 }
1478 if (master_listener_socket == -1)
1479 {
1480 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Could not create a new socket, error: " << strerror(errno));
1481 return;
1482 }
1483
1484 // Disable Nagle Algorithm, set (TCP_NODELAY)
1485 int nodelayflag= 1;
1486 int status= setsockopt(master_listener_socket,
1487 IPPROTO_TCP,
1488 TCP_NODELAY,
1489 &nodelayflag,
1490 sizeof(nodelayflag));
1491 if (status)
1492 {
1493 Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
1494 }
1495
1496 // Reuse ports, even if they are busy
1497 int socketreuseflag= 1;
1498 status= setsockopt(master_listener_socket,
1499 SOL_SOCKET,
1500 SO_REUSEADDR,
1501 (const char *) &socketreuseflag,
1502 sizeof(socketreuseflag));
1503 if (status)
1504 {
1505 Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
1506 }
1507
1508
1509 // bind the newly created socket to a specific address
1510 int bind_status = bind(master_listener_socket, v4_mode ?
1511 reinterpret_cast<struct sockaddr *>(&own_address_v4) :
1512 reinterpret_cast<struct sockaddr *>(&own_address_v6),
1513 v4_mode ? sizeof(own_address_v4) : sizeof(own_address_v6));
1514 if (bind_status)
1515 {
1516 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Binding to "
1517 << (v4_mode ? inet_ntop(AF_INET, &own_address_v4.sin_addr, in_addrstr, INET_ADDRSTRLEN) :
1518 inet_ntop(AF_INET6, &own_address_v6.sin6_addr, in6_addrstr, INET6_ADDRSTRLEN))
1519 << " port " << tpparam.port << " failed, error: " << strerror(errno));
1520 return;
1521 }
1522
1523 // listen at the socket,
1524 // queuesize for pending connections= max_listen_queue_size
1525 int listen_status = listen(master_listener_socket, max_listen_queue_size);
1526 if (listen_status)
1527 {
1528 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Listen at socket " << master_listener_socket
1529 << " failed, error: " << strerror(errno));
1530 return;
1531 }
1532 else
1533 {
1534 Log(INFO_LOG,LOG_NORMAL, tpparam.name, color[green] << "Listening at port #" << tpparam.port << color[off]);
1535 }
1536
1537 // activate O_NON_BLOCK for accept (accept does not block)
1538 fcntl(master_listener_socket,F_SETFL, O_NONBLOCK);
1539
1540 // create a pollfd struct for use in the mainloop
1541 struct pollfd poll_fd;
1542 poll_fd.fd = master_listener_socket;
1543 poll_fd.events = POLLIN | POLLPRI;
1544 poll_fd.revents = 0;
1545 /*
1546 #define POLLIN 0x001 // There is data to read.
1547 #define POLLPRI 0x002 // There is urgent data to read.
1548 #define POLLOUT 0x004 // Writing now will not block.
1549 */
1550
1551 bool terminate = false;
1552 // check for thread terminate condition using get_state()
1553 state_t currstate= get_state();
1554 int poll_status= 0;
1555 const unsigned int number_poll_sockets= 1;
1556 struct sockaddr_in6 peer_address;
1557 socklen_t peer_address_len;
1558 int conn_socket;
1559
1560 // check whether this thread is signaled for termination
1561 while(! (terminate= (currstate==STATE_ABORT || currstate==STATE_STOP) ) )
1562 {
1563 // wait on number_poll_sockets (main drm socket)
1564 // for the events specified above for sleep_time (in ms)
1565 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
1566 if (poll_fd.revents & POLLERR) // Error condition
1567 {
1568 if (errno != EINTR)
1569 {
1570 Log(ERROR_LOG,LOG_CRIT, tpparam.name,
1571 "Poll caused error " << strerror(errno) << " - indicated by revents");
1572 }
1573 else
1574 {
1575 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "poll(): " << strerror(errno));
1576 }
1577
1578 }
1579 if (poll_fd.revents & POLLHUP) // Hung up
1580 {
1581 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll hung up");
1582 return;
1583 }
1584 if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
1585 {
1586 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll Invalid request: fd not open");
1587 return;
1588 }
1589
1590 switch (poll_status)
1591 {
1592 case -1:
1593 if (errno != EINTR)
1594 {
1595 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll status indicates error: " << strerror(errno));
1596 }
1597 else
1598 {
1599 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Poll status: " << strerror(errno));
1600 }
1601
1602 break;
1603
1604 case 0:
1605#ifdef DEBUG_HARD
1606 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,
1607 "Listen Thread - Poll timed out after " << tpparam.sleep_time << " ms.");
1608#endif
1609 currstate= get_state();
1610 continue;
1611 break;
1612
1613 default:
1614#ifdef DEBUG_HARD
1615 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
1616#endif
1617 break;
1618 } // end switch
1619
1620 // after a successful accept call,
1621 // accept stores the address information of the connecting party
1622 // in peer_address and the size of its address in addrlen
1623 peer_address_len= sizeof(peer_address);
1624 conn_socket = accept (master_listener_socket,
1625 reinterpret_cast<struct sockaddr *>(&peer_address),
1626 &peer_address_len);
1627 if (conn_socket == -1)
1628 {
1629 if (errno != EWOULDBLOCK && errno != EAGAIN)
1630 {
1631 Log(ERROR_LOG,LOG_EMERG, tpparam.name, "Accept at socket " << master_listener_socket
1632 << " failed, error: " << strerror(errno));
1633 return;
1634 }
1635 }
1636 else
1637 {
1638 // create a new assocdata-object for the new thread
1639 AssocData* peer_assoc = NULL;
1640 appladdress addr(peer_address, IPPROTO_TCP);
1641
1642 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "<<--Received connect--<< request from " << addr.get_ip_str()
1643 << " port #" << addr.get_port());
1644
1645 struct sockaddr_in6 own_address;
1646 if (v4_mode) {
1647 struct sockaddr_in own_address_v4;
1648 socklen_t own_address_len_v4 = sizeof(own_address_v4);
1649 getsockname(conn_socket, reinterpret_cast<struct sockaddr*>(&own_address_v4), &own_address_len_v4);
1650 v4_to_v6(&own_address_v4, &own_address);
1651 } else {
1652 socklen_t own_address_len= sizeof(own_address);
1653 getsockname(conn_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
1654 }
1655
1656 // AssocData will copy addr content into its own structure
1657 // allocated peer_assoc will be stored in connmap
1658 peer_assoc = new(nothrow) AssocData(conn_socket, addr, appladdress(own_address,IPPROTO_TCP));
1659
1660 bool insert_success= false;
1661 if (peer_assoc)
1662 {
1663 // start critical section
1664 lock(); // install_cleanup_thread_lock(TPoverTCP, this);
1665 insert_success= connmap.insert(peer_assoc);
1666 // end critical section
1667 unlock(); // uninstall_cleanup(1);
1668 }
1669
1670
1671 if (insert_success == false) // not inserted into connmap
1672 {
1673 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot insert AssocData for socket " << conn_socket
1674 << ", " << addr.get_ip_str() << ", port #"
1675 << addr.get_port() << " into connection map, aborting connection...");
1676
1677 // abort connection, delete its AssocData
1678 close (conn_socket);
1679 if (peer_assoc)
1680 {
1681 delete peer_assoc;
1682 peer_assoc= 0;
1683 }
1684 return;
1685
1686 } //end __else(connmap.insert());__
1687
1688 // create a new thread for each new connection
1689 create_new_receiver_thread(peer_assoc);
1690 } // end __else (connsocket)__
1691
1692 // get new thread state
1693 currstate= get_state();
1694
1695 } // end while(!terminate)
1696 return;
1697} // end listen_for_connections()
1698
1699
1700TPoverTCP::~TPoverTCP()
1701{
1702 init= false;
1703
1704 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Destructor called");
1705
1706 QueueManager::instance()->unregister_queue(tpparam.source);
1707}
1708
1709/** TPoverTCP Thread main loop.
1710 * This loop checks for internal messages of either
1711 * a send() call to start a new receiver thread, or,
1712 * of a receiver_thread() that signals its own termination
1713 * for proper cleanup of control structures.
1714 * It also handles the following internal TPoverTCPMsg types:
1715 * - TPoverTCPMsg::stop - a particular receiver thread is terminated
1716 * - TPoverTCPMsg::start - a particular receiver thread is started
1717 * @param nr number of current thread instance
1718 */
1719void
1720TPoverTCP::main_loop(uint32 nr)
1721{
1722
1723 // get internal queue for messages from receiver_thread
1724 FastQueue* fq = get_fqueue();
1725 if (!fq)
1726 {
1727 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot find message queue");
1728 return;
1729 } // end if not fq
1730 // register queue for receiving internal messages from other modules
1731 QueueManager::instance()->register_queue(fq,tpparam.source);
1732
1733 // start master listener thread
1734 pthread_t master_listener_thread_ID;
1735 int pthread_status= pthread_create(&master_listener_thread_ID,
1736 NULL, // NULL: default attributes: thread is joinable and has a
1737 // default, non-realtime scheduling policy
1738 master_listener_thread_starter,
1739 this);
1740 if (pthread_status)
1741 {
1742 Log(ERROR_LOG,LOG_CRIT, tpparam.name,
1743 "New master listener thread could not be created: " << strerror(pthread_status));
1744 }
1745 else
1746 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Master listener thread started");
1747
1748
1749 // define max latency for thread reaction on termination/stop signal
1750 timespec wait_interval= { 0, 250000000L }; // 250ms
1751 message* internal_thread_msg = NULL;
1752 state_t currstate= get_state();
1753
1754 // check whether this thread is signaled for termination
1755 while( currstate!=STATE_ABORT && currstate!=STATE_STOP )
1756 {
1757 // poll internal message queue (blocking)
1758 if ( (internal_thread_msg= fq->dequeue_timedwait(wait_interval)) != 0 )
1759 {
1760 TPoverTCPMsg* internalmsg= dynamic_cast<TPoverTCPMsg*>(internal_thread_msg);
1761 if (internalmsg)
1762 {
1763 if (internalmsg->get_msgtype() == TPoverTCPMsg::stop)
1764 {
1765 // a receiver thread terminated and signaled for cleanup by master thread
1766 AssocData* assocd= const_cast<AssocData*>(internalmsg->get_peer_assoc());
1767 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Got cleanup request for thread <" << assocd->thread_ID <<'>');
1768 lock();
1769 cleanup_receiver_thread( assocd );
1770 unlock();
1771 }
1772 else
1773 if (internalmsg->get_msgtype() == TPoverTCPMsg::start)
1774 {
1775 // start a new receiver thread
1776 create_new_receiver_thread( const_cast<AssocData*>(internalmsg->get_peer_assoc()) );
1777 }
1778 else
1779 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "unexpected internal message:" << internalmsg->get_msgtype());
1780
1781 delete internalmsg;
1782 }
1783 else
1784 {
1785 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Dynamic_cast failed - received unexpected and unknown internal message source "
1786 << internal_thread_msg->get_source());
1787 }
1788 } // endif
1789
1790 // get thread state
1791 currstate= get_state();
1792 } // end while
1793
1794 if (currstate==STATE_STOP)
1795 {
1796 // start abort actions
1797 Log(INFO_LOG,LOG_NORMAL, tpparam.name, "Asked to abort, stopping all receiver threads");
1798 } // end if stopped
1799
1800 // do not accept any more messages
1801 fq->shutdown();
1802 // terminate all receiver and sender threads that are still active
1803 terminate_all_threads();
1804}
1805
1806} // end namespace protlib
1807///@}
Note: See TracBrowser for help on using the repository browser.