source: source/ariba/communication/modules/transport/protlib/tp_over_tcp.cpp@ 5529

Last change on this file since 5529 was 5529, checked in by huebsch, 15 years ago
File size: 53.4 KB
Line 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file tp_over_tcp.cpp
3/// TCP-based transport module (includes framing support)
4/// ----------------------------------------------------------
5/// $Id: tp_over_tcp.cpp 2872 2008-02-18 10:58:03Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/src/tp_over_tcp.cpp $
7// ===========================================================
8//
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29
30extern "C"
31{
32 //#define _SOCKADDR_LEN /* use BSD 4.4 sockets */
33#include <unistd.h> /* gethostname */
34#include <sys/types.h> /* network socket interface */
35#include <netinet/in.h> /* network socket interface */
36#include <netinet/tcp.h> /* for TCP Socket Option */
37#include <sys/socket.h>
38#include <arpa/inet.h> /* inet_addr */
39
40#include <fcntl.h>
41#include <sys/poll.h>
42}
43
44#include <iostream>
45#include <errno.h>
46#include <string>
47#include <sstream>
48
49#include "tp_over_tcp.h"
50#include "threadsafe_db.h"
51#include "cleanuphandler.h"
52#include "setuid.h"
53#include "queuemanager.h"
54#include "logfile.h"
55
56#include <set>
57
// Result codes used by TPoverTCP::tcpsend() to record whether sending a
// message to the socket succeeded or failed.
58#define TCP_SUCCESS 0
59#define TCP_SEND_FAILURE 1
60
// NOTE(review): not referenced in the part of the file visible here;
// presumably the backlog argument for listen() on the master socket - confirm.
61const unsigned int max_listen_queue_size= 10;
62
63namespace protlib {
64
65using namespace log;
66
67/** @defgroup tptcp TP over TCP
68 * @ingroup network
69 * @{
70 */
71
// Namespace-scope buffer for the textual form of an IPv6 address
// (INET6_ADDRSTRLEN characters plus one extra byte), suitable as output
// buffer for inet_ntop().
// NOTE(review): this is a single shared buffer without any synchronization
// visible here; concurrent use from several threads would race - confirm usage.
72char in6_addrstr[INET6_ADDRSTRLEN+1];
73
74
75/******* class TPoverTCP *******/
76
77
78/** get_connection_to() checks for already existing connections.
79 * If a connection exists, it returns "AssocData"
80 * and saves it in "connmap" for further use
81 * If no connection exists, a new connection to "addr"
82 * is created.
83 */
84AssocData*
85TPoverTCP::get_connection_to(const appladdress& addr)
86{
87 // get timeout
88 struct timespec ts;
89 get_time_of_day(ts);
90 ts.tv_nsec+= tpparam.sleep_time * 1000000L;
91 if (ts.tv_nsec>=1000000000L)
92 {
93 ts.tv_sec += ts.tv_nsec / 1000000000L;
94 ts.tv_nsec= ts.tv_nsec % 1000000000L;
95 }
96
97 // create a new AssocData pointer, initialize it to NULL
98 AssocData* assoc= NULL;
99 int new_socket;
100 // loop until timeout is exceeded: TODO: check applicability of loop
101 do
102 {
103 // check for existing connections to addr
104 // start critical section
105 lock(); // install_cleanup_thread_lock(TPoverTCP, this);
106 assoc= connmap.lookup(addr);
107 // end critical section
108 unlock(); // uninstall_cleanup(1);
109 if (assoc)
110 {
111 // If not shut down then use it, else abort, wait and check again.
112 if (!assoc->shutdown)
113 {
114 return assoc;
115 }
116 else
117 {
118 // TODO: connection is already in shutdown mode. What now?
119 ERRCLog(tpparam.name,"socket exists, but is already in mode shutdown");
120
121 return 0;
122 }
123 } //end __if (assoc)__
124 else
125 {
126 Log(DEBUG_LOG,LOG_UNIMP,tpparam.name,"No existing connection to "
127 << addr.get_ip_str() << " port #" << addr.get_port() << " found, creating a new one.");
128 }
129
130 // no connection found, create a new one
131 new_socket = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
132 if (new_socket == -1)
133 {
134 ERRCLog(tpparam.name,"Couldn't create a new socket: " << strerror(errno));
135
136 return 0;
137 }
138
139 // Disable Nagle Algorithm, set (TCP_NODELAY)
140 int nodelayflag= 1;
141 int status= setsockopt(new_socket,
142 IPPROTO_TCP,
143 TCP_NODELAY,
144 &nodelayflag,
145 sizeof(nodelayflag));
146 if (status)
147 {
148 ERRLog(tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
149 }
150
151 // Reuse ports, even if they are busy
152 int socketreuseflag= 1;
153 status= setsockopt(new_socket,
154 SOL_SOCKET,
155 SO_REUSEADDR,
156 (const char *) &socketreuseflag,
157 sizeof(socketreuseflag));
158 if (status)
159 {
160 ERRLog(tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
161 }
162
163 struct sockaddr_in6 dest_address;
164 dest_address.sin6_flowinfo= 0;
165 dest_address.sin6_scope_id= 0;
166 addr.get_sockaddr(dest_address);
167
168 // connect the socket to the destination address
169 int connect_status = connect(new_socket,
170 reinterpret_cast<const struct sockaddr*>(&dest_address),
171 sizeof(dest_address));
172
173 // connects to the listening_port of the peer's masterthread
174 if (connect_status != 0)
175 {
176 ERRLog(tpparam.name,"Connect to " << addr.get_ip_str() << " port #" << addr.get_port()
177 << " failed: [" << color[red] << strerror(errno) << color[off] << "]");
178
179 return 0; // error: couldn't connect
180 }
181
182
183 struct sockaddr_in6 own_address;
184 socklen_t own_address_len= sizeof(own_address);
185 getsockname(new_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
186
187
188
189 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,">>--Connect-->> to " << addr.get_ip_str() << " port #" << addr.get_port()
190 << " from " << inet_ntop(AF_INET6,&own_address.sin6_addr,in6_addrstr,INET6_ADDRSTRLEN)
191 << " port #" << ntohs(own_address.sin6_port));
192
193
194
195
196 // create new AssocData record (will copy addr)
197 assoc = new(nothrow) AssocData(new_socket, addr, appladdress(own_address,IPPROTO_TCP));
198
199 // if assoc could be successfully created, insert it into ConnectionMap
200 if (assoc)
201 {
202 bool insert_success= false;
203 // start critical section
204 lock(); // install_cleanup_thread_lock(TPoverTCP, this);
205 // insert AssocData into connection map
206 insert_success= connmap.insert(assoc);
207 // end critical section
208 unlock(); // uninstall_cleanup(1);
209
210 if (insert_success == true)
211 {
212#ifdef _DEBUG
213 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Connected to " << addr.get_ip_str() << ", port #" << addr.get_port()
214 << " via socket " << new_socket);
215
216
217#endif
218
219 // notify master thread to start a receiver thread (i.e. send selfmessage)
220 TPoverTCPMsg* newmsg= new(nothrow)TPoverTCPMsg(assoc, tpparam.source, TPoverTCPMsg::start);
221 if (newmsg)
222 {
223 newmsg->send_to(tpparam.source);
224 return assoc;
225 }
226 else
227 ERRCLog(tpparam.name,"get_connection_to: could not get memory for internal msg");
228 }
229 else
230 {
231 // delete data and abort
232 ERRCLog(tpparam.name, "Cannot insert AssocData for socket " << new_socket << ", "<< addr.get_ip_str()
233 <<", port #" << addr.get_port() << " into connection map, aborting connection");
234
235 // abort connection, delete its AssocData
236 close (new_socket);
237 if (assoc)
238 {
239 delete assoc;
240 assoc= 0;
241 }
242 return assoc;
243 } // end else connmap.insert
244
245 } // end "if (assoc)"
246 }
247 while (wait_cond(ts)!=ETIMEDOUT);
248
249 return assoc;
250} //end get_connection_to
251
252
253/** terminates a signaling association/connection
254 * - if no connection exists, generate a warning
255 * - otherwise: generate internal msg to related receiver thread
256 */
257void
258TPoverTCP::terminate(const address& in_addr)
259{
260#ifndef _NO_LOGGING
261 const char *const thisproc="terminate() - ";
262#endif
263
264 appladdress* addr = NULL;
265 addr = dynamic_cast<appladdress*>(in_addr.copy());
266
267 if (!addr) return;
268
269 // Create a new AssocData-pointer
270 AssocData* assoc = NULL;
271
272 // check for existing connections to addr
273
274 // start critical section:
275 // please note if receiver_thread terminated already, the assoc data is not
276 // available anymore therefore we need a lock around cleanup_receiver_thread()
277
278 lock(); // install_cleanup_thread_lock(TPoverTCP, this);
279 assoc= connmap.lookup(*addr);
280 if (assoc)
281 {
282 EVLog(tpparam.name,thisproc<<"got request to shutdown connection for peer " << addr);
283 // If not shut down then use it, else abort, wait and check again.
284 if (!assoc->shutdown)
285 {
286 if (assoc->socketfd)
287 {
288 // disallow sending
289 if (shutdown(assoc->socketfd,SHUT_WR))
290 {
291 ERRLog(tpparam.name,thisproc<<"shutdown (write) on socket for peer " << addr << " returned error:" << strerror(errno));
292 }
293 else
294 {
295 EVLog(tpparam.name,thisproc<<"initiated closing of connection for peer " << addr << ". Shutdown (write) on socket "<< assoc->socketfd );
296 }
297 }
298 assoc->shutdown= true;
299 }
300 else
301 EVLog(tpparam.name,thisproc<<"connection for peer " << addr << " is already in mode shutdown");
302
303 }
304 else
305 WLog(tpparam.name,thisproc<<"could not find a connection for peer " << *addr);
306
307 stop_receiver_thread(assoc);
308
309 // end critical section
310 unlock(); // uninstall_cleanup(1);
311
312 if (addr) delete addr;
313}
314
315
316/** generates and internal TPoverTCP message to send a NetMsg to the network
317 * - it is necessary to let a thread do this, because the caller
318 * may get blocked if the connect() or send() call hangs for a while
319 * - the sending thread will call TPoverTCP::tcpsend()
320 * - if no connection exists, creates a new one (unless use_existing_connection is true)
321 * @note the netmsg is deleted by the send() method when it is not used anymore
322 */
323void
324TPoverTCP::send(NetMsg* netmsg, const address& in_addr, bool use_existing_connection)
325{
326 if (netmsg == NULL) {
327 ERRCLog(tpparam.name,"send() - called without valid NetMsg buffer (NULL)");
328 return;
329 }
330
331 appladdress* addr = NULL;
332 addr= dynamic_cast<appladdress*>(in_addr.copy());
333
334 if (!addr)
335 {
336 ERRCLog(tpparam.name,"send() - given destination address is not of expected type (appladdress), has type " << (int) in_addr.get_type());
337 return;
338 }
339
340 // lock due to sendermap access
341 lock();
342
343 // check for existing sender thread
344 sender_thread_queuemap_t::const_iterator it= senderthread_queuemap.find(*addr);
345
346 FastQueue* destqueue= 0;
347
348 if (it == senderthread_queuemap.end())
349 { // no sender thread found so far
350
351 // if we already have an existing connection it is save to create a sender thread
352 // since get_connection_to() will not be invoked, so an existing connection will
353 // be used
354 const AssocData* assoc = connmap.lookup(*addr);
355
356 //DLog(tpparam.name,"send() - use_existing_connection:" << (use_existing_connection ? "true" : "false") << " assoc:" << assoc);
357
358 if (use_existing_connection==false || (assoc && assoc->shutdown==false && assoc->socketfd>0))
359 { // it is allowed to create a new connection for this thread
360 // create a new queue for sender thread
361 FastQueue* sender_thread_queue= new FastQueue;
362 create_new_sender_thread(sender_thread_queue);
363 // remember queue for later use
364
365 //pair<sender_thread_queuemap_t::iterator, bool> tmpinsiterator=
366 senderthread_queuemap.insert( pair<appladdress,FastQueue*> (*addr,sender_thread_queue) );
367
368 destqueue= sender_thread_queue;
369 }
370 }
371 else
372 { // we have a sender thread, use stored queue from map
373 destqueue= it->second;
374 }
375
376 unlock();
377
378 // send a send_data message to it (if we have a destination queue)
379 if (destqueue)
380 {
381 // both parameters will be freed after message was sent!
382
383 TPoverTCPMsg* internalmsg= new TPoverTCPMsg(netmsg,new appladdress(*addr));
384 if (internalmsg)
385 {
386 // send the internal message to the sender thread queue
387 internalmsg->send(tpparam.source,destqueue);
388 }
389 }
390 else
391 {
392 if (!use_existing_connection)
393 WLog(tpparam.name,"send() - found entry for address, but no active sender thread available for peer addr:" << *addr << " - dropping data");
394 else
395 {
396 DLog(tpparam.name,"no active sender thread found for peer " << *addr << " - but policy forbids to set up a new connection, will drop data");
397
398 }
399 // cannot send data, so we must drop it
400 delete netmsg;
401 }
402
403 if (addr) delete addr;
404}
405
406/** sends a NetMsg to the network.
407 *
408 * @param netmsg message to send
409 * @param addr transport endpoint address
410 *
411 * @note if no connection exists, creates a new one
412 * @note both parameters are deleted after the message was sent
413 */
414void
415TPoverTCP::tcpsend(NetMsg* netmsg, appladdress* addr)
416{
417#ifndef _NO_LOGGING
418 const char *const thisproc="sender - ";
419#endif
420
421 // set result initially to success, set it to failure
422 // in places where these failures occur
423 int result = TCP_SUCCESS;
424 int saved_errno= 0;
425 int ret= 0;
426
427 // Create a new AssocData-pointer
428 AssocData* assoc = NULL;
429
430 // tp.cpp checks for initialisation of tp and correctness of
431 // msgsize, protocol and ip,
432 // throws an error if something is not right
433 if (addr) {
434 addr->convert_to_ipv6();
435 check_send_args(*netmsg,*addr);
436 }
437 else
438 {
439 ERRCLog(tpparam.name, thisproc << "address pointer is NULL");
440 result= TCP_SEND_FAILURE;
441
442 delete netmsg;
443 delete addr;
444
445 throw TPErrorInternal();
446 }
447
448
449 // check for existing connections,
450 // if a connection exists, return its AssocData
451 // and save it in assoc for further use
452 // if no connection exists, create a new one (in get_connection_to()).
453 // Connection is inserted into connection map that must be done
454 // with exclusive access
455 assoc= get_connection_to(*addr);
456
457 if (assoc==NULL || assoc->socketfd<=0)
458 {
459 ERRCLog(tpparam.name, color[red] << thisproc << "no valid assoc/socket data - dropping packet");
460
461 delete netmsg;
462 delete addr;
463 return;
464 }
465
466 if (assoc->shutdown)
467 {
468 Log(WARNING_LOG, LOG_ALERT, tpparam.name, thisproc << "should send message although connection already half closed");
469 delete netmsg;
470 delete addr;
471
472 throw TPErrorSendFailed();
473 }
474
475 uint32 msgsize= netmsg->get_size();
476#ifdef DEBUG_HARD
477 cerr << thisproc << "message size=" << netmsg->get_size() << endl;
478#endif
479
480 const uint32 retry_send_max = 3;
481 uint32 retry_count = 0;
482 // send all the data contained in netmsg to the socket
483 // which belongs to the address "addr"
484 for (uint32 bytes_sent= 0;
485 bytes_sent < msgsize;
486 bytes_sent+= ret)
487 {
488
489#ifdef _DEBUG_HARD
490 for (uint32 i=0;i<msgsize;i++)
491 {
492 cout << "send_buf: " << i << " : ";
493 if ( isalnum(*(netmsg->get_buffer()+i)) )
494 cout << "'" << *(netmsg->get_buffer()+i) << "' (0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec << ")" ;
495 else
496 cout << "0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec;
497 cout << endl;
498 }
499
500 cout << endl;
501 cout << "bytes_sent: " << bytes_sent << endl;
502 cout << "Message size: " << msgsize << endl;
503 cout << "Send-Socket: " << assoc->socketfd << endl;
504 cout << "pointer-Offset. " << netmsg->get_pos() << endl;
505 cout << "vor send " << endl;
506#endif
507
508 retry_count= 0;
509 do
510 {
511 // socket send
512 ret= ::send(assoc->socketfd,
513 netmsg->get_buffer() + bytes_sent,
514 msgsize - bytes_sent,
515 MSG_NOSIGNAL);
516
517 // send_buf + bytes_sent
518
519 // Deal with temporary internal errors like EAGAIN, resource temporary unavailable etc.
520 // retry sending
521 if (ret < 0)
522 { // internal error during send occured
523 saved_errno= errno;
524 switch(saved_errno)
525 {
526 case EAGAIN: // same as EWOULDBLOCK
527 case EINTR: // man page says: A signal occurred before any data was transmitted.
528 case ENOBUFS: // The output queue for a network interface was full.
529 retry_count++;
530 ERRLog(tpparam.name,"Temporary failure while calling send(): " << strerror(saved_errno) << ", errno: " << saved_errno
531 << " - retry sending, retry #" << retry_count);
532 // pace down a little bit, sleep for a while
533 sleep(1);
534 break;
535
536 // everything else should not lead to repetition
537 default:
538 retry_count= retry_send_max;
539 break;
540 } // end switch
541 } // endif
542 else // leave while
543 break;
544 }
545 while(retry_count < retry_send_max);
546
547 if (ret < 0)
548 { // unrecoverable error occured
549
550 result= TCP_SEND_FAILURE;
551 break;
552 } // end if (ret < 0)
553 else
554 {
555 if (debug_pdu)
556 {
557 ostringstream hexdump;
558 netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_sent);
559 DLog(tpparam.name,"PDU debugging enabled - Sent:" << hexdump.str());
560 }
561 }
562
563 // continue with send next data block in next for() iteration
564 } // end for
565
566 // *** note: netmsg is deleted here ***
567 delete netmsg;
568
569 // Throwing an exception within a critical section does not
570 // unlock the mutex.
571
572 if (result != TCP_SUCCESS)
573 {
574 ERRLog(tpparam.name, thisproc << "TCP error, returns " << ret << ", error : " << strerror(errno));
575 delete addr;
576
577 throw TPErrorSendFailed(saved_errno);
578
579 }
580 else
581 EVLog(tpparam.name, thisproc << ">>----Sent---->> message (" << msgsize << " bytes) using socket " << assoc->socketfd << " to " << *addr);
582
583 if (!assoc) {
584 // no connection
585
586 ERRLog(tpparam.name, thisproc << "cannot get connection to " << addr->get_ip_str()
587 << ", port #" << addr->get_port());
588
589 delete addr;
590
591 throw TPErrorUnreachable(); // should be no assoc found
592 } // end "if (!assoc)"
593
594 // *** delete address ***
595 delete addr;
596}
597
598/* this thread waits for an internal message that either:
599 * - requests transmission of a packet
600 * - requests to stop this thread
601 * @param argp points to the thread queue for internal messages
602 */
603void
604TPoverTCP::sender_thread(void *argp)
605{
606#ifndef _NO_LOGGING
607 const char *const methodname="senderthread - ";
608#endif
609
610 message* internal_thread_msg = NULL;
611
612 EVLog(tpparam.name, methodname << "starting as thread <" << pthread_self() << ">");
613
614 FastQueue* fq= reinterpret_cast<FastQueue*>(argp);
615 if (!fq)
616 {
617 ERRLog(tpparam.name, methodname << "thread <" << pthread_self() << "> no valid pointer to msg queue. Stop.");
618 return;
619 }
620
621 bool terminate= false;
622 TPoverTCPMsg* internalmsg= 0;
623 while (terminate==false && (internal_thread_msg= fq->dequeue()) != 0 )
624 {
625 internalmsg= dynamic_cast<TPoverTCPMsg*>(internal_thread_msg);
626
627 if (internalmsg == 0)
628 {
629 ERRLog(tpparam.name, methodname << "received not an TPoverTCPMsg but a" << internal_thread_msg->get_type_name());
630 }
631 else
632 if (internalmsg->get_msgtype() == TPoverTCPMsg::send_data)
633 {
634 // create a connection if none exists and send the netmsg
635 if (internalmsg->get_netmsg() && internalmsg->get_appladdr())
636 {
637 try
638 {
639 tcpsend(internalmsg->get_netmsg(),internalmsg->get_appladdr());
640 } // end try
641 catch(TPErrorSendFailed& err)
642 {
643 ERRLog(tpparam.name, methodname << "TCP send call failed - " << err.what()
644 << " cause: (" << err.get_reason() << ") " << strerror(err.get_reason()) );
645 } // end catch
646 catch(TPError& err)
647 {
648 ERRLog(tpparam.name, methodname << "TCP send call failed - reason: " << err.what());
649 } // end catch
650 catch(...)
651 {
652 ERRLog(tpparam.name, methodname << "TCP send call failed - unknown exception");
653 }
654 }
655 else
656 {
657 ERRLog(tpparam.name, methodname << "problem with passed arguments references, they point to 0");
658 }
659 }
660 else
661 if (internalmsg->get_msgtype() == TPoverTCPMsg::stop)
662 {
663 terminate= true;
664 }
665 } // end while
666
667 EVLog(tpparam.name, methodname << "<" << pthread_self() << "> terminated connection.");
668}
669
670
671/** receiver thread listens at a TCP socket for incoming PDUs
672 * and passes complete PDUs to the coordinator. Incomplete
673 * PDUs due to aborted connections or buffer overflows are discarded.
674 * @param argp - assoc data and flags sig_terminate and terminated
675 *
676 * @note this is a static member function, so you cannot use class variables
677 */
678void
679TPoverTCP::receiver_thread(void *argp)
680{
681#ifndef _NO_LOGGING
682 const char *const methodname="receiver - ";
683#endif
684
685 receiver_thread_arg_t *receiver_thread_argp= static_cast<receiver_thread_arg_t *>(argp);
686 const appladdress* peer_addr = NULL;
687 const appladdress* own_addr = NULL;
688 uint32 bytes_received = 0;
689 TPMsg* tpmsg= NULL;
690
691 // argument parsing - start
692 if (receiver_thread_argp == 0)
693 {
694 ERRCLog(tpparam.name, methodname << "No arguments given at start of receiver thread <" << pthread_self() << ">, exiting.");
695
696 return;
697 }
698 else
699 {
700 // change status to running, i.e., not terminated
701 receiver_thread_argp->terminated= false;
702
703#ifdef _DEBUG
704 DLog(tpparam.name, methodname << "New receiver thread <" << pthread_self() << "> started. ");
705#endif
706 }
707
708 int conn_socket= 0;
709 if (receiver_thread_argp->peer_assoc)
710 {
711 // get socket descriptor from arg
712 conn_socket = receiver_thread_argp->peer_assoc->socketfd;
713 // get pointer to peer address of socket (source/sender address of peer) from arg
714 peer_addr= &receiver_thread_argp->peer_assoc->peer;
715 own_addr= &receiver_thread_argp->peer_assoc->ownaddr;
716 }
717 else
718 {
719 ERRCLog(tpparam.name, methodname << "No peer assoc available - pointer is NULL");
720
721 return;
722 }
723
724 if (peer_addr == 0)
725 {
726 ERRCLog(tpparam.name, methodname << "No peer address available for socket " << conn_socket << ", exiting.");
727
728 return;
729 }
730 // argument parsing - end
731#ifdef _DEBUG
732 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname <<
733 "Preparing to wait for data at socket "
734 << conn_socket << " from " << receiver_thread_argp->peer_assoc->peer);
735#endif
736
737 int ret= 0;
738 uint32 msgcontentlength= 0;
739 bool msgcontentlength_known= false;
740 bool pdu_complete= false; // when to terminate inner loop
741
742 /* maybe use this to create a new pdu,
743 /// constructor
744 contextlistpdu(type_t t, subtype_t st, uint32 fc, uint32 numobj = 0);
745 */
746
747 // activate O_NON_BLOCK for recv on socket conn_socket
748 // CAVEAT: this also implies non-blocking send()!
749 //fcntl(conn_socket,F_SETFL, O_NONBLOCK);
750
751 // set options for polling
752 const unsigned int number_poll_sockets= 1;
753 struct pollfd poll_fd;
754 // have to set structure before poll call
755 poll_fd.fd = conn_socket;
756 poll_fd.events = POLLIN | POLLPRI;
757 poll_fd.revents = 0;
758
759 int poll_status;
760 bool recv_error= false;
761
762 NetMsg* netmsg= 0;
763 NetMsg* remainbuf= 0;
764 size_t buffer_bytes_left= 0;
765 size_t trailingbytes= 0;
766 bool skiprecv= false;
767 // loop until we receive a terminate signal (read-only var for this thread)
768 // or get an error from socket read
769 while( receiver_thread_argp->sig_terminate == false )
770 {
771 // Read next PDU from socket or process trailing bytes in remainbuf
772 ret= 0;
773 msgcontentlength= 0;
774 msgcontentlength_known= false;
775 pdu_complete= false;
776 netmsg= 0;
777
778 // there are trailing bytes left from the last receive call
779 if (remainbuf != 0)
780 {
781 netmsg= remainbuf;
782 remainbuf= 0;
783 buffer_bytes_left= netmsg->get_size()-trailingbytes;
784 bytes_received= trailingbytes;
785 trailingbytes= 0;
786 skiprecv= true;
787 }
788 else // no trailing bytes, create a new buffer
789 if ( (netmsg= new NetMsg(NetMsg::max_size)) != 0 )
790 {
791 buffer_bytes_left= netmsg->get_size();
792 bytes_received= 0;
793 skiprecv= false;
794 }
795 else
796 { // buffer allocation failed
797 bytes_received= 0;
798 buffer_bytes_left= 0;
799 recv_error= true;
800 }
801
802 // loops until PDU is complete
803 // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
804 while (!pdu_complete &&
805 !recv_error &&
806 !receiver_thread_argp->sig_terminate)
807 {
808 if (!skiprecv)
809 {
810 // read from TCP socket or return after sleep_time
811 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
812
813 if (receiver_thread_argp->sig_terminate)
814 {
815 Log(EVENT_LOG,LOG_UNIMP,tpparam.name,methodname << "Thread <" << pthread_self() << "> found terminate signal after poll");
816 // disallow sending
817 AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
818 if (myassoc->shutdown == false)
819 {
820 myassoc->shutdown= true;
821 if (shutdown(myassoc->socketfd,SHUT_WR))
822 {
823 if ( errno != ENOTCONN )
824 Log(ERROR_LOG,LOG_UNIMP,tpparam.name,methodname <<"shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
825 }
826 }
827 // try to do a last read from the TCP socket or return after sleep_time
828 if (poll_status == 0)
829 {
830 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
831 }
832 }
833
834 if (poll_fd.revents & POLLERR) // Error condition
835 {
836 if (errno == 0 || errno == EINTR)
837 {
838 EVLog(tpparam.name, methodname << "poll(): " << strerror(errno));
839 }
840 else
841 {
842 ERRCLog(tpparam.name, methodname << "Poll indicates error: " << strerror(errno));
843 recv_error= true;
844 }
845 }
846
847 if (poll_fd.revents & POLLHUP) // Hung up
848 {
849 Log(EVENT_LOG,LOG_CRIT, tpparam.name, methodname << "Poll hung up");
850 recv_error= true;
851 }
852
853 if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
854 {
855 EVLog(tpparam.name, methodname << "Poll Invalid request: fd not open");
856 recv_error= true;
857 }
858
859 // check status (return value) of poll call
860 switch (poll_status)
861 {
862 case -1:
863 if (errno == 0 || errno == EINTR)
864 {
865 EVLog(tpparam.name, methodname << "Poll status: " << strerror(errno));
866 }
867 else
868 {
869 ERRCLog(tpparam.name, methodname << "Poll status indicates error: " << strerror(errno) << "- aborting");
870 recv_error= true;
871 }
872
873 continue; // next while iteration
874 break;
875
876 case 0:
877#ifdef DEBUG_HARD
878 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll timed out after " << tpparam.sleep_time << " ms.");
879#endif
880 continue; // next while iteration
881 break;
882
883 default:
884#ifdef DEBUG_HARD
885 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
886#endif
887 break;
888 } // end switch
889
890
891 /// receive data from socket buffer (recv will not block)
892 ret = recv(conn_socket,
893 netmsg->get_buffer() + bytes_received,
894 buffer_bytes_left,
895 MSG_DONTWAIT);
896
897 if ( ret < 0 )
898 {
899 if (errno!=EAGAIN && errno!=EWOULDBLOCK)
900 {
901 ERRCLog(tpparam.name, methodname << "Receive at socket " << conn_socket << " failed, error: " << strerror(errno));
902 recv_error= true;
903 continue;
904 }
905 else
906 { // errno==EAGAIN || errno==EWOULDBLOCK
907 // just nothing to read from socket, continue w/ next poll
908 continue;
909 }
910 }
911 else
912 {
913 if (ret == 0)
914 {
915 // this means that EOF is reached,
916 // other side has closed connection
917 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Other side (" << *peer_addr << ") closed connection for socket " << conn_socket);
918 // disallow sending
919 AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
920 if (myassoc->shutdown == false)
921 {
922 myassoc->shutdown= true;
923 if (shutdown(myassoc->socketfd,SHUT_WR))
924 {
925 if ( errno != ENOTCONN )
926 Log(ERROR_LOG,LOG_UNIMP,tpparam.name, methodname << "shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
927 }
928 }
929 // not a real error, but we must quit the receive loop
930 recv_error= true;
931 }
932 else
933 {
934
935 Log(EVENT_LOG,LOG_UNIMP, tpparam.name, methodname << "<<--Received--<< packet (" << ret << " bytes) at socket " << conn_socket << " from " << *peer_addr);
936 // track number of received bytes
937 bytes_received+= ret;
938 buffer_bytes_left-= ret;
939 }
940 }
941 } // end if do not skip recv() statement
942
943 if (buffer_bytes_left < 0) ///< buffer space exhausted now
944 {
945 recv_error= true;
946 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "during receive buffer space exhausted");
947 }
948
949 if (!msgcontentlength_known) ///< common header not parsed
950 {
951 // enough bytes read to parse common header?
952 if (bytes_received >= common_header_length)
953 {
954 // get message content length in number of bytes
955 if (getmsglength(*netmsg, msgcontentlength))
956 msgcontentlength_known= true;
957 else
958 {
959 ERRCLog(tpparam.name, methodname << "Not a valid protocol header - discarding received packet. received size " << msgcontentlength);
960
961 ostringstream hexdumpstr;
962 netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
963 DLog(tpparam.name,"dumping received bytes:" << hexdumpstr.str());
964
965 // reset all counters
966 msgcontentlength= 0;
967 msgcontentlength_known= false;
968 bytes_received= 0;
969 pdu_complete= false;
970 continue;
971 }
972 }
973 } // endif common header not parsed
974
975 // check whether we have read the whole Protocol PDU
976 DLog(tpparam.name, "bytes_received-common_header_length=" << bytes_received-common_header_length << " msgcontentlength: " << msgcontentlength);
977 if (msgcontentlength_known)
978 {
979 if (bytes_received-common_header_length >= msgcontentlength )
980 {
981 pdu_complete= true;
982 // truncate buffer exactly at common_header_length+msgcontentlength==PDU size, trailing stuff is handled separately
983 netmsg->truncate(common_header_length+msgcontentlength);
984
985 // trailing bytes are copied into new buffer
986 if (bytes_received-common_header_length > msgcontentlength)
987 {
988 WLog(tpparam.name,"trailing bytes - received more bytes ("<<bytes_received<<") than expected for PDU (" << common_header_length+msgcontentlength << ")");
989 remainbuf= new NetMsg(NetMsg::max_size);
990 trailingbytes= (bytes_received-common_header_length) - msgcontentlength;
991 bytes_received= common_header_length+msgcontentlength;
992 memcpy(remainbuf->get_buffer(),netmsg->get_buffer()+common_header_length+msgcontentlength, trailingbytes);
993 }
994 }
995 else
996 { // not enough bytes in the buffer must continue to read from network
997 skiprecv= false;
998 }
999 } // endif msgcontentlength_known
1000 } // end while (!pdu_complete && !recv_error && !signalled for termination)
1001 // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
1002
1003 // if other side closed the connection, we should still be able to deliver the remaining data
1004 if (ret == 0)
1005 {
1006 recv_error= false;
1007 }
1008
1009 // deliver only complete PDUs to signaling module
1010 if (!recv_error && pdu_complete)
1011 {
1012 // create TPMsg and send it to the signaling thread
1013 tpmsg = new(nothrow) TPMsg(netmsg, peer_addr->copy(), own_addr->copy());
1014 if (tpmsg)
1015 {
1016 DLog(tpparam.name, methodname << "receipt of PDU now complete, sending msg#" << tpmsg->get_id()
1017 << " to module " << message::get_qaddr_name(tpparam.dest));
1018 }
1019
1020 debug_pdu=false;
1021
1022 if (debug_pdu)
1023 {
1024 ostringstream hexdump;
1025 netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_received);
1026 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"PDU debugging enabled - Received:" << hexdump.str());
1027 }
1028
1029 // send the message if it was successfully created
1030 // bool message::send_to(qaddr_t dest, bool exp = false);
1031 if (!tpmsg
1032 || (!tpmsg->get_peeraddress())
1033 || (!tpmsg->send(message::qaddr_tp_over_tcp, tpparam.dest)))
1034 {
1035 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Cannot allocate/send TPMsg");
1036 if (tpmsg) delete tpmsg;
1037 } // end if tpmsg not allocated or not addr or not sent
1038
1039
1040 } // end if !recv_error
1041 else
1042 { // error during receive or PDU incomplete
1043 if (bytes_received>0)
1044 {
1045 Log(WARNING_LOG,LOG_NORMAL, tpparam.name, methodname << "Attention! " << (recv_error? "Receive error, " : "") << (pdu_complete ? "PDU complete" : "PDU incomplete") << "received bytes: " << bytes_received);
1046 }
1047
1048 if (!pdu_complete && bytes_received>0 && bytes_received<common_header_length)
1049 {
1050 ostringstream hexdumpstr;
1051 netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
1052 Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"Message too short to be a valid protocol header - dumping received bytes:" << hexdumpstr.str());
1053 }
1054 // leave the outer loop
1055 /**********************/
1056 break;
1057 /**********************/
1058 } // end else
1059
1060 } // end while (thread not signalled for termination)
1061
1062 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self()
1063 << "> shutting down and closing socket " << receiver_thread_argp->peer_assoc->peer);
1064
1065 // shutdown socket
1066 if (shutdown(conn_socket, SHUT_RD))
1067 {
1068 if ( errno != ENOTCONN )
1069 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> shutdown (read) on socket failed, reason: " << strerror(errno));
1070 }
1071
1072 // close socket
1073 close(conn_socket);
1074
1075 receiver_thread_argp->terminated= true;
1076
1077 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> terminated");
1078
1079#ifdef _DEBUG
1080 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Signaling main loop for cleanup");
1081#endif
1082 // notify master thread for invocation of cleanup procedure
1083 TPoverTCPMsg* newmsg= new(nothrow)TPoverTCPMsg(receiver_thread_argp->peer_assoc);
1084 // send message to main loop thread
1085 newmsg->send_to(tpparam.source);
1086
1087}
1088
1089
1090/** this signals a terminate to a thread and wait for the thread to stop
1091 * @note it is not safe to access any thread related data after this method returned,
1092 * because the receiver thread will initiate a cleanup_receiver_thread() method
1093 * which may erase all relevant thread data.
1094 */
1095void
1096TPoverTCP::stop_receiver_thread(AssocData* peer_assoc)
1097{
1098 // All operations on recv_thread_argmap and connmap require an already acquired lock
1099 // after this procedure peer_assoc may be invalid because it was erased
1100
1101 // start critical section
1102
1103 if (peer_assoc == 0)
1104 return;
1105
1106 pthread_t thread_id= peer_assoc->thread_ID;
1107
1108 // try to clean up receiver_thread_arg
1109 recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1110 receiver_thread_arg_t* recv_thread_argp=
1111 (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1112 if (recv_thread_argp)
1113 {
1114 if (!recv_thread_argp->terminated)
1115 {
1116 // thread signaled termination, but is not?
1117 Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> signaled for termination");
1118
1119 // signal thread for its termination
1120 recv_thread_argp->sig_terminate= true;
1121 // wait for thread to join after termination
1122 pthread_join(thread_id, 0);
1123 // the dying thread will signal main loop to call this method, but this time we should enter the else branch
1124 return;
1125 }
1126 }
1127 else
1128 Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> not found");
1129
1130}
1131
1132
1133/** cleans up left over structures (assoc,receiver_thread_arg) from already terminated receiver_thread
1134 * usually called by the master_thread after the receiver_thread terminated
1135 * @note clean_up_receiver_thread() should be only called when an outer lock ensures that peer_assoc
1136 * is still valid
1137 */
1138void
1139TPoverTCP::cleanup_receiver_thread(AssocData* peer_assoc)
1140{
1141 // All operations on recv_thread_argmap and connmap require an already acquired lock
1142 // after this procedure peer_assoc may be invalid because it was erased
1143
1144 // start critical section
1145
1146 if (peer_assoc == 0)
1147 return;
1148
1149 pthread_t thread_id= peer_assoc->thread_ID;
1150
1151 // try to clean up receiver_thread_arg
1152 recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1153 receiver_thread_arg_t* recv_thread_argp=
1154 (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1155 if (recv_thread_argp)
1156 {
1157 if (!recv_thread_argp->terminated)
1158 {
1159 // thread signaled termination, but is not?
1160 Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> not terminated yet?!");
1161 return;
1162 }
1163 else
1164 { // if thread is already terminated
1165 Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> is terminated");
1166
1167 // delete it from receiver map
1168 recv_thread_argmap.erase(recv_thread_arg_iter);
1169
1170 // then delete receiver arg structure
1171 delete recv_thread_argp;
1172 }
1173 }
1174
1175 // delete entry from connection map
1176
1177 // cleanup sender thread
1178 // no need to lock explicitly, because caller of cleanup_receiver_thread() must already locked
1179 terminate_sender_thread(peer_assoc);
1180
1181 // delete the AssocData structure from the connection map
1182 // also frees allocated AssocData structure
1183 connmap.erase(peer_assoc);
1184
1185 // end critical section
1186
1187 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Cleanup receiver thread <" << thread_id << ">. Done.");
1188}
1189
1190
1191/* sends a stop message to the sender thread that belongs to the peer address given in assoc
1192 * @note terminate_receiver_thread() should be only called when an outer lock ensures that assoc
1193 * is still valid, a lock is also required, because senderthread_queuemap is changed
1194 */
1195void
1196TPoverTCP::terminate_sender_thread(const AssocData* assoc)
1197{
1198 if (assoc == 0)
1199 {
1200 Log(ERROR_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - assoc data == NULL");
1201 return;
1202 }
1203
1204 sender_thread_queuemap_t::iterator it= senderthread_queuemap.find(assoc->peer);
1205
1206 if (it != senderthread_queuemap.end())
1207 { // we have a sender thread: send a stop message to it
1208 FastQueue* destqueue= it->second;
1209 if (destqueue)
1210 {
1211 TPoverTCPMsg* internalmsg= new TPoverTCPMsg(assoc,tpparam.source,TPoverTCPMsg::stop);
1212 if (internalmsg)
1213 {
1214 // send the internal message to the sender thread queue
1215 internalmsg->send(tpparam.source,destqueue);
1216 }
1217 }
1218 else
1219 {
1220 Log(WARNING_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - found entry for address, but no sender thread. addr:" << assoc->peer);
1221 }
1222 // erase entry from map
1223 senderthread_queuemap.erase(it);
1224 }
1225}
1226
1227/* terminate all active threads
1228 * note: locking should not be necessary here because this message is called as last method from
1229 * main_loop()
1230 */
1231void
1232TPoverTCP::terminate_all_threads()
1233{
1234 AssocData* assoc= 0;
1235 receiver_thread_arg_t* terminate_argp;
1236
1237 for (recv_thread_argmap_t::iterator terminate_iterator= recv_thread_argmap.begin();
1238 terminate_iterator != recv_thread_argmap.end();
1239 terminate_iterator++)
1240 {
1241 if ( (terminate_argp= terminate_iterator->second) != 0)
1242 {
1243 // we need a non const pointer to erase it later on
1244 assoc= const_cast<AssocData*>(terminate_argp->peer_assoc);
1245 // check whether thread is still alive
1246 if (terminate_argp->terminated == false)
1247 {
1248 terminate_argp->sig_terminate= true;
1249 // then wait for its termination
1250 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,
1251 "Signaled receiver thread <" << terminate_iterator->first << "> for termination");
1252
1253 pthread_join(terminate_iterator->first, 0);
1254
1255 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Thread <" << terminate_iterator->first << "> is terminated");
1256 }
1257 else
1258 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,
1259 "Receiver thread <" << terminate_iterator->first << "> already terminated");
1260
1261 // cleanup all remaining argument structures of terminated threads
1262 delete terminate_argp;
1263
1264 // terminate any related sender thread that is still running
1265 terminate_sender_thread(assoc);
1266
1267 connmap.erase(assoc);
1268 // delete assoc is not necessary, because connmap.erase() will do the job
1269 }
1270 } // end for
1271}
1272
1273
1274/**
1275 * sender thread starter:
1276 * just a static starter method to allow starting the
1277 * actual sender_thread() method.
1278 *
1279 * @param argp - pointer to the current TPoverTCP object instance and receiver_thread_arg_t struct
1280 */
1281void*
1282TPoverTCP::sender_thread_starter(void *argp)
1283{
1284 sender_thread_start_arg_t *sargp= static_cast<sender_thread_start_arg_t *>(argp);
1285
1286 //cout << "invoked sender_thread_Starter" << endl;
1287
1288 // invoke sender thread method
1289 if (sargp != 0 && sargp->instance != 0)
1290 {
1291 // call receiver_thread member function on object instance
1292 sargp->instance->sender_thread(sargp->sender_thread_queue);
1293
1294 //cout << "Before deletion of sarg" << endl;
1295
1296 // no longer needed
1297 delete sargp;
1298 }
1299 else
1300 {
1301 Log(ERROR_LOG,LOG_CRIT,"sender_thread_starter","while starting sender_thread: 0 pointer to arg or object");
1302 }
1303 return 0;
1304}
1305
1306
1307
1308
1309/**
1310 * receiver thread starter:
1311 * just a static starter method to allow starting the
1312 * actual receiver_thread() method.
1313 *
1314 * @param argp - pointer to the current TPoverTCP object instance and receiver_thread_arg_t struct
1315 */
1316void*
1317TPoverTCP::receiver_thread_starter(void *argp)
1318{
1319 receiver_thread_start_arg_t *rargp= static_cast<receiver_thread_start_arg_t *>(argp);
1320 // invoke receiver thread method
1321 if (rargp != 0 && rargp->instance != 0)
1322 {
1323 // call receiver_thread member function on object instance
1324 rargp->instance->receiver_thread(rargp->rtargp);
1325
1326 // no longer needed
1327 delete rargp;
1328 }
1329 else
1330 {
1331 Log(ERROR_LOG,LOG_CRIT,"receiver_thread_starter","while starting receiver_thread: 0 pointer to arg or object");
1332 }
1333 return 0;
1334}
1335
1336
1337void
1338TPoverTCP::create_new_sender_thread(FastQueue* senderfqueue)
1339{
1340 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new sender thread...");
1341
1342 pthread_t senderthreadid;
1343 // create new thread; (arg == 0) is handled by thread, too
1344 int pthread_status= pthread_create(&senderthreadid,
1345 NULL, // NULL: default attributes: thread is joinable and has a
1346 // default, non-realtime scheduling policy
1347 TPoverTCP::sender_thread_starter,
1348 new sender_thread_start_arg_t(this,senderfqueue));
1349 if (pthread_status)
1350 {
1351 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " << strerror(pthread_status));
1352
1353 delete senderfqueue;
1354 }
1355}
1356
1357
1358void
1359TPoverTCP::create_new_receiver_thread(AssocData* peer_assoc)
1360{
1361 receiver_thread_arg_t* argp=
1362 new(nothrow) receiver_thread_arg(peer_assoc);
1363
1364 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new receiver thread...");
1365
1366 // create new thread; (arg == 0) is handled by thread, too
1367 int pthread_status= pthread_create(&peer_assoc->thread_ID,
1368 NULL, // NULL: default attributes: thread is joinable and has a
1369 // default, non-realtime scheduling policy
1370 receiver_thread_starter,
1371 new(nothrow) receiver_thread_start_arg_t(this,argp));
1372 if (pthread_status)
1373 {
1374 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " << strerror(pthread_status));
1375
1376 delete argp;
1377 }
1378 else
1379 {
1380 lock(); // install_cleanup_thread_lock(TPoverTCP, this);
1381
1382 // remember pointer to thread arg structure
1383 // thread arg structure should be destroyed after thread termination only
1384 pair<recv_thread_argmap_t::iterator, bool> tmpinsiterator=
1385 recv_thread_argmap.insert( pair<pthread_t,receiver_thread_arg_t*> (peer_assoc->thread_ID,argp) );
1386 if (tmpinsiterator.second == false)
1387 {
1388 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Thread argument could not be inserted into hashmap");
1389 }
1390 unlock(); // uninstall_cleanup(1);
1391 }
1392}
1393
1394
1395/**
1396 * master listener thread starter:
1397 * just a static starter method to allow starting the
1398 * actual master_listener_thread() method.
1399 *
1400 * @param argp - pointer to the current TPoverTCP object instance
1401 */
1402void*
1403TPoverTCP::master_listener_thread_starter(void *argp)
1404{
1405 // invoke listener thread method
1406 if (argp != 0)
1407 {
1408 (static_cast<TPoverTCP*>(argp))->master_listener_thread();
1409 }
1410 return 0;
1411}
1412
1413
1414
1415/**
1416 * master listener thread: waits for incoming connections at the well-known tcp port
1417 * when a connection request is received this thread spawns a receiver_thread for
1418 * receiving packets from the peer at the new socket.
1419 */
1420void
1421TPoverTCP::master_listener_thread()
1422{
1423 // create a new address-structure for the listening masterthread
1424 struct sockaddr_in6 own_address;
1425 own_address.sin6_family = AF_INET6;
1426 own_address.sin6_flowinfo= 0;
1427 own_address.sin6_port = htons(tpparam.port); // use port number in param structure
1428 // accept incoming connections on all interfaces
1429 own_address.sin6_addr = in6addr_any;
1430 own_address.sin6_scope_id= 0;
1431
1432 // create a listening socket
1433 int master_listener_socket= socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
1434 if (master_listener_socket == -1)
1435 {
1436 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Could not create a new socket, error: " << strerror(errno));
1437 return;
1438 }
1439
1440 // Disable Nagle Algorithm, set (TCP_NODELAY)
1441 int nodelayflag= 1;
1442 int status= setsockopt(master_listener_socket,
1443 IPPROTO_TCP,
1444 TCP_NODELAY,
1445 &nodelayflag,
1446 sizeof(nodelayflag));
1447 if (status)
1448 {
1449 Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
1450 }
1451
1452 // Reuse ports, even if they are busy
1453 int socketreuseflag= 1;
1454 status= setsockopt(master_listener_socket,
1455 SOL_SOCKET,
1456 SO_REUSEADDR,
1457 (const char *) &socketreuseflag,
1458 sizeof(socketreuseflag));
1459 if (status)
1460 {
1461 Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
1462 }
1463
1464
1465 // bind the newly created socket to a specific address
1466 int bind_status = bind(master_listener_socket,
1467 reinterpret_cast<struct sockaddr *>(&own_address),
1468 sizeof(own_address));
1469 if (bind_status)
1470 {
1471 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Binding to "
1472 << inet_ntop(AF_INET6, &own_address.sin6_addr, in6_addrstr, INET6_ADDRSTRLEN)
1473 << " port " << tpparam.port << " failed, error: " << strerror(errno));
1474 return;
1475 }
1476
1477 // listen at the socket,
1478 // queuesize for pending connections= max_listen_queue_size
1479 int listen_status = listen(master_listener_socket, max_listen_queue_size);
1480 if (listen_status)
1481 {
1482 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Listen at socket " << master_listener_socket
1483 << " failed, error: " << strerror(errno));
1484 return;
1485 }
1486 else
1487 {
1488 Log(INFO_LOG,LOG_NORMAL, tpparam.name, color[green] << "Listening at port #" << tpparam.port << color[off]);
1489 }
1490
1491 // activate O_NON_BLOCK for accept (accept does not block)
1492 fcntl(master_listener_socket,F_SETFL, O_NONBLOCK);
1493
1494 // create a pollfd struct for use in the mainloop
1495 struct pollfd poll_fd;
1496 poll_fd.fd = master_listener_socket;
1497 poll_fd.events = POLLIN | POLLPRI;
1498 poll_fd.revents = 0;
1499 /*
1500 #define POLLIN 0x001 // There is data to read.
1501 #define POLLPRI 0x002 // There is urgent data to read.
1502 #define POLLOUT 0x004 // Writing now will not block.
1503 */
1504
1505 bool terminate = false;
1506 // check for thread terminate condition using get_state()
1507 state_t currstate= get_state();
1508 int poll_status= 0;
1509 const unsigned int number_poll_sockets= 1;
1510 struct sockaddr_in6 peer_address;
1511 socklen_t peer_address_len;
1512 int conn_socket;
1513
1514 // check whether this thread is signaled for termination
1515 while(! (terminate= (currstate==STATE_ABORT || currstate==STATE_STOP) ) )
1516 {
1517 // wait on number_poll_sockets (main drm socket)
1518 // for the events specified above for sleep_time (in ms)
1519 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
1520 if (poll_fd.revents & POLLERR) // Error condition
1521 {
1522 if (errno != EINTR)
1523 {
1524 Log(ERROR_LOG,LOG_CRIT, tpparam.name,
1525 "Poll caused error " << strerror(errno) << " - indicated by revents");
1526 }
1527 else
1528 {
1529 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "poll(): " << strerror(errno));
1530 }
1531
1532 }
1533 if (poll_fd.revents & POLLHUP) // Hung up
1534 {
1535 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll hung up");
1536 return;
1537 }
1538 if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
1539 {
1540 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll Invalid request: fd not open");
1541 return;
1542 }
1543
1544 switch (poll_status)
1545 {
1546 case -1:
1547 if (errno != EINTR)
1548 {
1549 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll status indicates error: " << strerror(errno));
1550 }
1551 else
1552 {
1553 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Poll status: " << strerror(errno));
1554 }
1555
1556 break;
1557
1558 case 0:
1559#ifdef DEBUG_HARD
1560 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,
1561 "Listen Thread - Poll timed out after " << tpparam.sleep_time << " ms.");
1562#endif
1563 currstate= get_state();
1564 continue;
1565 break;
1566
1567 default:
1568#ifdef DEBUG_HARD
1569 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
1570#endif
1571 break;
1572 } // end switch
1573
1574 // after a successful accept call,
1575 // accept stores the address information of the connecting party
1576 // in peer_address and the size of its address in addrlen
1577 peer_address_len= sizeof(peer_address);
1578 conn_socket = accept (master_listener_socket,
1579 reinterpret_cast<struct sockaddr *>(&peer_address),
1580 &peer_address_len);
1581 if (conn_socket == -1)
1582 {
1583 if (errno != EWOULDBLOCK && errno != EAGAIN)
1584 {
1585 Log(ERROR_LOG,LOG_EMERG, tpparam.name, "Accept at socket " << master_listener_socket
1586 << " failed, error: " << strerror(errno));
1587 return;
1588 }
1589 }
1590 else
1591 {
1592 // create a new assocdata-object for the new thread
1593 AssocData* peer_assoc = NULL;
1594 appladdress addr(peer_address, IPPROTO_TCP);
1595
1596 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "<<--Received connect--<< request from " << addr.get_ip_str()
1597 << " port #" << addr.get_port());
1598
1599 struct sockaddr_in6 own_address;
1600 socklen_t own_address_len= sizeof(own_address);
1601 getsockname(conn_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
1602
1603 // AssocData will copy addr content into its own structure
1604 // allocated peer_assoc will be stored in connmap
1605 peer_assoc = new(nothrow) AssocData(conn_socket, addr, appladdress(own_address,IPPROTO_TCP));
1606
1607 bool insert_success= false;
1608 if (peer_assoc)
1609 {
1610 // start critical section
1611 lock(); // install_cleanup_thread_lock(TPoverTCP, this);
1612 insert_success= connmap.insert(peer_assoc);
1613 // end critical section
1614 unlock(); // uninstall_cleanup(1);
1615 }
1616
1617
1618 if (insert_success == false) // not inserted into connmap
1619 {
1620 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot insert AssocData for socket " << conn_socket
1621 << ", " << addr.get_ip_str() << ", port #"
1622 << addr.get_port() << " into connection map, aborting connection...");
1623
1624 // abort connection, delete its AssocData
1625 close (conn_socket);
1626 if (peer_assoc)
1627 {
1628 delete peer_assoc;
1629 peer_assoc= 0;
1630 }
1631 return;
1632
1633 } //end __else(connmap.insert());__
1634
1635 // create a new thread for each new connection
1636 create_new_receiver_thread(peer_assoc);
1637 } // end __else (connsocket)__
1638
1639 // get new thread state
1640 currstate= get_state();
1641
1642 } // end while(!terminate)
1643 return;
1644} // end listen_for_connections()
1645
1646
1647TPoverTCP::~TPoverTCP()
1648{
1649 init= false;
1650
1651 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Destructor called");
1652
1653 QueueManager::instance()->unregister_queue(tpparam.source);
1654}
1655
1656/** TPoverTCP Thread main loop.
1657 * This loop checks for internal messages of either
1658 * a send() call to start a new receiver thread, or,
1659 * of a receiver_thread() that signals its own termination
1660 * for proper cleanup of control structures.
1661 * It also handles the following internal TPoverTCPMsg types:
1662 * - TPoverTCPMsg::stop - a particular receiver thread is terminated
1663 * - TPoverTCPMsg::start - a particular receiver thread is started
1664 * @param nr number of current thread instance
1665 */
1666void
1667TPoverTCP::main_loop(uint32 nr)
1668{
1669
1670 // get internal queue for messages from receiver_thread
1671 FastQueue* fq = get_fqueue();
1672 if (!fq)
1673 {
1674 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot find message queue");
1675 return;
1676 } // end if not fq
1677 // register queue for receiving internal messages from other modules
1678 QueueManager::instance()->register_queue(fq,tpparam.source);
1679
1680 // start master listener thread
1681 pthread_t master_listener_thread_ID;
1682 int pthread_status= pthread_create(&master_listener_thread_ID,
1683 NULL, // NULL: default attributes: thread is joinable and has a
1684 // default, non-realtime scheduling policy
1685 master_listener_thread_starter,
1686 this);
1687 if (pthread_status)
1688 {
1689 Log(ERROR_LOG,LOG_CRIT, tpparam.name,
1690 "New master listener thread could not be created: " << strerror(pthread_status));
1691 }
1692 else
1693 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Master listener thread started");
1694
1695
1696 // define max latency for thread reaction on termination/stop signal
1697 timespec wait_interval= { 0, 250000000L }; // 250ms
1698 message* internal_thread_msg = NULL;
1699 state_t currstate= get_state();
1700
1701 // check whether this thread is signaled for termination
1702 while( currstate!=STATE_ABORT && currstate!=STATE_STOP )
1703 {
1704 // poll internal message queue (blocking)
1705 if ( (internal_thread_msg= fq->dequeue_timedwait(wait_interval)) != 0 )
1706 {
1707 TPoverTCPMsg* internalmsg= dynamic_cast<TPoverTCPMsg*>(internal_thread_msg);
1708 if (internalmsg)
1709 {
1710 if (internalmsg->get_msgtype() == TPoverTCPMsg::stop)
1711 {
1712 // a receiver thread terminated and signaled for cleanup by master thread
1713 AssocData* assocd= const_cast<AssocData*>(internalmsg->get_peer_assoc());
1714 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Got cleanup request for thread <" << assocd->thread_ID <<'>');
1715 lock();
1716 cleanup_receiver_thread( assocd );
1717 unlock();
1718 }
1719 else
1720 if (internalmsg->get_msgtype() == TPoverTCPMsg::start)
1721 {
1722 // start a new receiver thread
1723 create_new_receiver_thread( const_cast<AssocData*>(internalmsg->get_peer_assoc()) );
1724 }
1725 else
1726 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "unexpected internal message:" << internalmsg->get_msgtype());
1727
1728 delete internalmsg;
1729 }
1730 else
1731 {
1732 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Dynamic_cast failed - received unexpected and unknown internal message source "
1733 << internal_thread_msg->get_source());
1734 }
1735 } // endif
1736
1737 // get thread state
1738 currstate= get_state();
1739 } // end while
1740
1741 if (currstate==STATE_STOP)
1742 {
1743 // start abort actions
1744 Log(INFO_LOG,LOG_NORMAL, tpparam.name, "Asked to abort, stopping all receiver threads");
1745 } // end if stopped
1746
1747 // do not accept any more messages
1748 fq->shutdown();
1749 // terminate all receiver and sender threads that are still active
1750 terminate_all_threads();
1751}
1752
1753} // end namespace protlib
1754///@}
Note: See TracBrowser for help on using the repository browser.