source: source/ariba/communication/modules/transport/protlib/tp_over_tcp.cpp@ 5638

Last change on this file since 5638 was 5638, checked in by Christoph Mayer, 15 years ago

adress detection aufgeräumt, network info für bleutooth, data stream (hopeful crash fix), logging auf maemo nur warn, ...

File size: 53.4 KB
Line 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file tp_over_tcp.cpp
3/// TCP-based transport module (includes framing support)
4/// ----------------------------------------------------------
5/// $Id: tp_over_tcp.cpp 2872 2008-02-18 10:58:03Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/src/tp_over_tcp.cpp $
7// ===========================================================
8//
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29
30extern "C"
31{
32 //#define _SOCKADDR_LEN /* use BSD 4.4 sockets */
33#include <unistd.h> /* gethostname */
34#include <sys/types.h> /* network socket interface */
35#include <netinet/in.h> /* network socket interface */
36#include <netinet/tcp.h> /* for TCP Socket Option */
37#include <sys/socket.h>
38#include <arpa/inet.h> /* inet_addr */
39
40#include <fcntl.h>
41#include <sys/poll.h>
42}
43
44#include <iostream>
45#include <errno.h>
46#include <string>
47#include <sstream>
48
49#include "tp_over_tcp.h"
50#include "threadsafe_db.h"
51#include "cleanuphandler.h"
52#include "setuid.h"
53#include "queuemanager.h"
54#include "logfile.h"
55
56#include <set>
57
58#define TCP_SUCCESS 0
59#define TCP_SEND_FAILURE 1
60
61const unsigned int max_listen_queue_size= 10;
62
63namespace protlib {
64
65using namespace log;
66
67/** @defgroup tptcp TP over TCP
68 * @ingroup network
69 * @{
70 */
71
72char in6_addrstr[INET6_ADDRSTRLEN+1];
73
74
75/******* class TPoverTCP *******/
76
77
78/** get_connection_to() checks for already existing connections.
79 * If a connection exists, it returns "AssocData"
80 * and saves it in "connmap" for further use
81 * If no connection exists, a new connection to "addr"
82 * is created.
83 */
84AssocData*
85TPoverTCP::get_connection_to(const appladdress& addr)
86{
87 // get timeout
88 struct timespec ts;
89 get_time_of_day(ts);
90 ts.tv_nsec+= tpparam.sleep_time * 1000000L;
91 if (ts.tv_nsec>=1000000000L)
92 {
93 ts.tv_sec += ts.tv_nsec / 1000000000L;
94 ts.tv_nsec= ts.tv_nsec % 1000000000L;
95 }
96
97 // create a new AssocData pointer, initialize it to NULL
98 AssocData* assoc= NULL;
99 int new_socket;
100 // loop until timeout is exceeded: TODO: check applicability of loop
101 do
102 {
103 // check for existing connections to addr
104 // start critical section
105 lock(); // install_cleanup_thread_lock(TPoverTCP, this);
106 assoc= connmap.lookup(addr);
107 // end critical section
108 unlock(); // uninstall_cleanup(1);
109 if (assoc)
110 {
111 // If not shut down then use it, else abort, wait and check again.
112 if (!assoc->shutdown)
113 {
114 return assoc;
115 }
116 else
117 {
118 // TODO: connection is already in shutdown mode. What now?
119 ERRCLog(tpparam.name,"socket exists, but is already in mode shutdown");
120
121 return 0;
122 }
123 } //end __if (assoc)__
124 else
125 {
126 Log(DEBUG_LOG,LOG_UNIMP,tpparam.name,"No existing connection to "
127 << addr.get_ip_str() << " port #" << addr.get_port() << " found, creating a new one.");
128 }
129
130 // no connection found, create a new one
131 new_socket = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
132 if (new_socket == -1)
133 {
134 ERRCLog(tpparam.name,"Couldn't create a new socket: " << strerror(errno));
135
136 return 0;
137 }
138
139 // Disable Nagle Algorithm, set (TCP_NODELAY)
140 int nodelayflag= 1;
141 int status= setsockopt(new_socket,
142 IPPROTO_TCP,
143 TCP_NODELAY,
144 &nodelayflag,
145 sizeof(nodelayflag));
146 if (status)
147 {
148 ERRLog(tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
149 }
150
151 // Reuse ports, even if they are busy
152 int socketreuseflag= 1;
153 status= setsockopt(new_socket,
154 SOL_SOCKET,
155 SO_REUSEADDR,
156 (const char *) &socketreuseflag,
157 sizeof(socketreuseflag));
158 if (status)
159 {
160 ERRLog(tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
161 }
162
163 struct sockaddr_in6 dest_address;
164 dest_address.sin6_flowinfo= 0;
165 dest_address.sin6_scope_id= 0;
166 addr.get_sockaddr(dest_address);
167
168 // connect the socket to the destination address
169 int connect_status = connect(new_socket,
170 reinterpret_cast<const struct sockaddr*>(&dest_address),
171 sizeof(dest_address));
172
173 // connects to the listening_port of the peer's masterthread
174 if (connect_status != 0)
175 {
176 ERRLog(tpparam.name,"Connect to " << addr.get_ip_str() << " port #" << addr.get_port()
177 << " failed: [" << color[red] << strerror(errno) << color[off] << "]");
178
179 return 0; // error: couldn't connect
180 }
181
182
183 struct sockaddr_in6 own_address;
184 socklen_t own_address_len= sizeof(own_address);
185 getsockname(new_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
186
187
188
189 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,">>--Connect-->> to " << addr.get_ip_str() << " port #" << addr.get_port()
190 << " from " << inet_ntop(AF_INET6,&own_address.sin6_addr,in6_addrstr,INET6_ADDRSTRLEN)
191 << " port #" << ntohs(own_address.sin6_port));
192
193
194
195
196 // create new AssocData record (will copy addr)
197 assoc = new(nothrow) AssocData(new_socket, addr, appladdress(own_address,IPPROTO_TCP));
198
199 // if assoc could be successfully created, insert it into ConnectionMap
200 if (assoc)
201 {
202 bool insert_success= false;
203 // start critical section
204 lock(); // install_cleanup_thread_lock(TPoverTCP, this);
205 // insert AssocData into connection map
206 insert_success= connmap.insert(assoc);
207 // end critical section
208 unlock(); // uninstall_cleanup(1);
209
210 if (insert_success == true)
211 {
212#ifdef _DEBUG
213 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Connected to " << addr.get_ip_str() << ", port #" << addr.get_port()
214 << " via socket " << new_socket);
215
216
217#endif
218
219 // notify master thread to start a receiver thread (i.e. send selfmessage)
220 TPoverTCPMsg* newmsg= new(nothrow)TPoverTCPMsg(assoc, tpparam.source, TPoverTCPMsg::start);
221 if (newmsg)
222 {
223 newmsg->send_to(tpparam.source);
224 return assoc;
225 }
226 else
227 ERRCLog(tpparam.name,"get_connection_to: could not get memory for internal msg");
228 }
229 else
230 {
231 // delete data and abort
232 ERRCLog(tpparam.name, "Cannot insert AssocData for socket " << new_socket << ", "<< addr.get_ip_str()
233 <<", port #" << addr.get_port() << " into connection map, aborting connection");
234
235 // abort connection, delete its AssocData
236 close (new_socket);
237 if (assoc)
238 {
239 delete assoc;
240 assoc= 0;
241 }
242 return assoc;
243 } // end else connmap.insert
244
245 } // end "if (assoc)"
246 }
247 while (wait_cond(ts)!=ETIMEDOUT);
248
249 return assoc;
250} //end get_connection_to
251
252
253/** terminates a signaling association/connection
254 * - if no connection exists, generate a warning
255 * - otherwise: generate internal msg to related receiver thread
256 */
257void
258TPoverTCP::terminate(const address& in_addr)
259{
260#ifndef _NO_LOGGING
261 const char *const thisproc="terminate() - ";
262#endif
263
264 appladdress* addr = NULL;
265 addr = dynamic_cast<appladdress*>(in_addr.copy());
266
267 if (!addr) return;
268
269 // Create a new AssocData-pointer
270 AssocData* assoc = NULL;
271
272 // check for existing connections to addr
273
274 // start critical section:
275 // please note if receiver_thread terminated already, the assoc data is not
276 // available anymore therefore we need a lock around cleanup_receiver_thread()
277
278 lock(); // install_cleanup_thread_lock(TPoverTCP, this);
279 assoc= connmap.lookup(*addr);
280 if (assoc)
281 {
282 EVLog(tpparam.name,thisproc<<"got request to shutdown connection for peer " << addr);
283 // If not shut down then use it, else abort, wait and check again.
284 if (!assoc->shutdown)
285 {
286 if (assoc->socketfd)
287 {
288 // disallow sending
289 if (shutdown(assoc->socketfd,SHUT_WR))
290 {
291 ERRLog(tpparam.name,thisproc<<"shutdown (write) on socket for peer " << addr << " returned error:" << strerror(errno));
292 }
293 else
294 {
295 EVLog(tpparam.name,thisproc<<"initiated closing of connection for peer " << addr << ". Shutdown (write) on socket "<< assoc->socketfd );
296 }
297 }
298 assoc->shutdown= true;
299 }
300 else
301 EVLog(tpparam.name,thisproc<<"connection for peer " << addr << " is already in mode shutdown");
302
303 }
304 else
305 WLog(tpparam.name,thisproc<<"could not find a connection for peer " << *addr);
306
307 stop_receiver_thread(assoc);
308
309 // end critical section
310 unlock(); // uninstall_cleanup(1);
311
312 if (addr) delete addr;
313}
314
315
316/** generates and internal TPoverTCP message to send a NetMsg to the network
317 * - it is necessary to let a thread do this, because the caller
318 * may get blocked if the connect() or send() call hangs for a while
319 * - the sending thread will call TPoverTCP::tcpsend()
320 * - if no connection exists, creates a new one (unless use_existing_connection is true)
321 * @note the netmsg is deleted by the send() method when it is not used anymore
322 */
323void
324TPoverTCP::send(NetMsg* netmsg, const address& in_addr, bool use_existing_connection)
325{
326 if (netmsg == NULL) {
327 ERRCLog(tpparam.name,"send() - called without valid NetMsg buffer (NULL)");
328 return;
329 }
330
331 appladdress* addr = NULL;
332 addr= dynamic_cast<appladdress*>(in_addr.copy());
333
334 if (!addr)
335 {
336 ERRCLog(tpparam.name,"send() - given destination address is not of expected type (appladdress), has type " << (int) in_addr.get_type());
337 return;
338 }
339
340 // lock due to sendermap access
341 lock();
342
343 // check for existing sender thread
344 sender_thread_queuemap_t::const_iterator it= senderthread_queuemap.find(*addr);
345
346 FastQueue* destqueue= 0;
347
348 if (it == senderthread_queuemap.end())
349 { // no sender thread found so far
350
351 // if we already have an existing connection it is save to create a sender thread
352 // since get_connection_to() will not be invoked, so an existing connection will
353 // be used
354 const AssocData* assoc = connmap.lookup(*addr);
355
356 //DLog(tpparam.name,"send() - use_existing_connection:" << (use_existing_connection ? "true" : "false") << " assoc:" << assoc);
357
358 if (use_existing_connection==false || (assoc && assoc->shutdown==false && assoc->socketfd>0))
359 { // it is allowed to create a new connection for this thread
360 // create a new queue for sender thread
361 FastQueue* sender_thread_queue= new FastQueue;
362 create_new_sender_thread(sender_thread_queue);
363 // remember queue for later use
364
365 //pair<sender_thread_queuemap_t::iterator, bool> tmpinsiterator=
366 senderthread_queuemap.insert( pair<appladdress,FastQueue*> (*addr,sender_thread_queue) );
367
368 destqueue= sender_thread_queue;
369 }
370 }
371 else
372 { // we have a sender thread, use stored queue from map
373 destqueue= it->second;
374 }
375
376 unlock();
377
378 // send a send_data message to it (if we have a destination queue)
379 if (destqueue)
380 {
381 // both parameters will be freed after message was sent!
382
383 TPoverTCPMsg* internalmsg= new TPoverTCPMsg(netmsg,new appladdress(*addr));
384 if (internalmsg)
385 {
386 // send the internal message to the sender thread queue
387 internalmsg->send(tpparam.source,destqueue);
388 }
389 }
390 else
391 {
392 if (!use_existing_connection)
393 WLog(tpparam.name,"send() - found entry for address, but no active sender thread available for peer addr:" << *addr << " - dropping data");
394 else
395 {
396 DLog(tpparam.name,"no active sender thread found for peer " << *addr << " - but policy forbids to set up a new connection, will drop data");
397
398 }
399 // cannot send data, so we must drop it
400 delete netmsg;
401 }
402
403 if (addr) delete addr;
404}
405
406/** sends a NetMsg to the network.
407 *
408 * @param netmsg message to send
409 * @param addr transport endpoint address
410 *
411 * @note if no connection exists, creates a new one
412 * @note both parameters are deleted after the message was sent
413 */
414void
415TPoverTCP::tcpsend(NetMsg* netmsg, appladdress* addr)
416{
417#ifndef _NO_LOGGING
418 const char *const thisproc="sender - ";
419#endif
420
421 // set result initially to success, set it to failure
422 // in places where these failures occur
423 int result = TCP_SUCCESS;
424 int saved_errno= 0;
425 int ret= 0;
426
427 // Create a new AssocData-pointer
428 AssocData* assoc = NULL;
429
430 // tp.cpp checks for initialisation of tp and correctness of
431 // msgsize, protocol and ip,
432 // throws an error if something is not right
433 if (addr) {
434 addr->convert_to_ipv6();
435 check_send_args(*netmsg,*addr);
436 }
437 else
438 {
439 ERRCLog(tpparam.name, thisproc << "address pointer is NULL");
440 result= TCP_SEND_FAILURE;
441
442 delete netmsg;
443 delete addr;
444
445 throw TPErrorInternal();
446 }
447
448
449 // check for existing connections,
450 // if a connection exists, return its AssocData
451 // and save it in assoc for further use
452 // if no connection exists, create a new one (in get_connection_to()).
453 // Connection is inserted into connection map that must be done
454 // with exclusive access
455 assoc= get_connection_to(*addr);
456
457 if (assoc==NULL || assoc->socketfd<=0)
458 {
459 ERRCLog(tpparam.name, color[red] << thisproc << "no valid assoc/socket data - dropping packet");
460
461 delete netmsg;
462 delete addr;
463 return;
464 }
465
466 if (assoc->shutdown)
467 {
468 Log(WARNING_LOG, LOG_ALERT, tpparam.name, thisproc << "should send message although connection already half closed");
469 delete netmsg;
470 delete addr;
471
472 throw TPErrorSendFailed();
473 }
474
475 uint32 msgsize= netmsg->get_size();
476#ifdef DEBUG_HARD
477 cerr << thisproc << "message size=" << netmsg->get_size() << endl;
478#endif
479
480 const uint32 retry_send_max = 3;
481 uint32 retry_count = 0;
482 // send all the data contained in netmsg to the socket
483 // which belongs to the address "addr"
484 for (uint32 bytes_sent= 0;
485 bytes_sent < msgsize;
486 bytes_sent+= ret)
487 {
488
489#ifdef _DEBUG_HARD
490 for (uint32 i=0;i<msgsize;i++)
491 {
492 cout << "send_buf: " << i << " : ";
493 if ( isalnum(*(netmsg->get_buffer()+i)) )
494 cout << "'" << *(netmsg->get_buffer()+i) << "' (0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec << ")" ;
495 else
496 cout << "0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec;
497 cout << endl;
498 }
499
500 cout << endl;
501 cout << "bytes_sent: " << bytes_sent << endl;
502 cout << "Message size: " << msgsize << endl;
503 cout << "Send-Socket: " << assoc->socketfd << endl;
504 cout << "pointer-Offset. " << netmsg->get_pos() << endl;
505 cout << "vor send " << endl;
506#endif
507
508 retry_count= 0;
509 do
510 {
511 // socket send
512 ret= ::send(assoc->socketfd,
513 netmsg->get_buffer() + bytes_sent,
514 msgsize - bytes_sent,
515 MSG_NOSIGNAL);
516
517 // send_buf + bytes_sent
518
519 // Deal with temporary internal errors like EAGAIN, resource temporary unavailable etc.
520 // retry sending
521 if (ret < 0)
522 { // internal error during send occured
523 saved_errno= errno;
524 switch(saved_errno)
525 {
526 case EAGAIN: // same as EWOULDBLOCK
527 case EINTR: // man page says: A signal occurred before any data was transmitted.
528 case ENOBUFS: // The output queue for a network interface was full.
529 retry_count++;
530 ERRLog(tpparam.name,"Temporary failure while calling send(): " << strerror(saved_errno) << ", errno: " << saved_errno
531 << " - retry sending, retry #" << retry_count);
532 // pace down a little bit, sleep for a while
533 sleep(1);
534 break;
535
536 // everything else should not lead to repetition
537 default:
538 retry_count= retry_send_max;
539 break;
540 } // end switch
541 } // endif
542 else // leave while
543 break;
544 }
545 while(retry_count < retry_send_max);
546
547 if (ret < 0)
548 { // unrecoverable error occured
549
550 result= TCP_SEND_FAILURE;
551 break;
552 } // end if (ret < 0)
553 else
554 {
555 if (debug_pdu)
556 {
557 ostringstream hexdump;
558 netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_sent);
559 DLog(tpparam.name,"PDU debugging enabled - Sent:" << hexdump.str());
560 }
561 }
562
563 // continue with send next data block in next for() iteration
564 } // end for
565
566 // *** note: netmsg is deleted here ***
567 delete netmsg;
568
569 // Throwing an exception within a critical section does not
570 // unlock the mutex.
571
572 if (result != TCP_SUCCESS)
573 {
574 ERRLog(tpparam.name, thisproc << "TCP error, returns " << ret << ", error : " << strerror(errno));
575 delete addr;
576
577 throw TPErrorSendFailed(saved_errno);
578
579 }
580 else
581 EVLog(tpparam.name, thisproc << ">>----Sent---->> message (" << msgsize << " bytes) using socket " << assoc->socketfd << " to " << *addr);
582
583 if (!assoc) {
584 // no connection
585
586 ERRLog(tpparam.name, thisproc << "cannot get connection to " << addr->get_ip_str()
587 << ", port #" << addr->get_port());
588
589 delete addr;
590
591 throw TPErrorUnreachable(); // should be no assoc found
592 } // end "if (!assoc)"
593
594 // *** delete address ***
595 delete addr;
596}
597
598/* this thread waits for an internal message that either:
599 * - requests transmission of a packet
600 * - requests to stop this thread
601 * @param argp points to the thread queue for internal messages
602 */
603void
604TPoverTCP::sender_thread(void *argp)
605{
606#ifndef _NO_LOGGING
607 const char *const methodname="senderthread - ";
608#endif
609
610 message* internal_thread_msg = NULL;
611
612 EVLog(tpparam.name, methodname << "starting as thread <" << pthread_self() << ">");
613
614 FastQueue* fq= reinterpret_cast<FastQueue*>(argp);
615 if (!fq)
616 {
617 ERRLog(tpparam.name, methodname << "thread <" << pthread_self() << "> no valid pointer to msg queue. Stop.");
618 return;
619 }
620
621 bool terminate= false;
622 TPoverTCPMsg* internalmsg= 0;
623 while (terminate==false && (internal_thread_msg= fq->dequeue()) != 0 )
624 {
625 internalmsg= dynamic_cast<TPoverTCPMsg*>(internal_thread_msg);
626
627 if (internalmsg == 0)
628 {
629 ERRLog(tpparam.name, methodname << "received not an TPoverTCPMsg but a" << internal_thread_msg->get_type_name());
630 }
631 else
632 if (internalmsg->get_msgtype() == TPoverTCPMsg::send_data)
633 {
634 // create a connection if none exists and send the netmsg
635 if (internalmsg->get_netmsg() && internalmsg->get_appladdr())
636 {
637 try
638 {
639 tcpsend(internalmsg->get_netmsg(),internalmsg->get_appladdr());
640 } // end try
641 catch(TPErrorSendFailed& err)
642 {
643 ERRLog(tpparam.name, methodname << "TCP send call failed - " << err.what()
644 << " cause: (" << err.get_reason() << ") " << strerror(err.get_reason()) );
645 } // end catch
646 catch(TPError& err)
647 {
648 ERRLog(tpparam.name, methodname << "TCP send call failed - reason: " << err.what());
649 } // end catch
650 catch(...)
651 {
652 ERRLog(tpparam.name, methodname << "TCP send call failed - unknown exception");
653 }
654 }
655 else
656 {
657 ERRLog(tpparam.name, methodname << "problem with passed arguments references, they point to 0");
658 }
659 }
660 else
661 if (internalmsg->get_msgtype() == TPoverTCPMsg::stop)
662 {
663 terminate= true;
664 }
665 } // end while
666
667 EVLog(tpparam.name, methodname << "<" << pthread_self() << "> terminated connection.");
668}
669
670
671/** receiver thread listens at a TCP socket for incoming PDUs
672 * and passes complete PDUs to the coordinator. Incomplete
673 * PDUs due to aborted connections or buffer overflows are discarded.
674 * @param argp - assoc data and flags sig_terminate and terminated
675 *
676 * @note this is a static member function, so you cannot use class variables
677 */
678void
679TPoverTCP::receiver_thread(void *argp)
680{
681#ifndef _NO_LOGGING
682 const char *const methodname="receiver - ";
683#endif
684
685 receiver_thread_arg_t *receiver_thread_argp= static_cast<receiver_thread_arg_t *>(argp);
686 const appladdress* peer_addr = NULL;
687 const appladdress* own_addr = NULL;
688 uint32 bytes_received = 0;
689 TPMsg* tpmsg= NULL;
690
691 // argument parsing - start
692 if (receiver_thread_argp == 0)
693 {
694 ERRCLog(tpparam.name, methodname << "No arguments given at start of receiver thread <" << pthread_self() << ">, exiting.");
695
696 return;
697 }
698 else
699 {
700 // change status to running, i.e., not terminated
701 receiver_thread_argp->terminated= false;
702
703#ifdef _DEBUG
704 DLog(tpparam.name, methodname << "New receiver thread <" << pthread_self() << "> started. ");
705#endif
706 }
707
708 int conn_socket= 0;
709 if (receiver_thread_argp->peer_assoc)
710 {
711 // get socket descriptor from arg
712 conn_socket = receiver_thread_argp->peer_assoc->socketfd;
713 // get pointer to peer address of socket (source/sender address of peer) from arg
714 peer_addr= &receiver_thread_argp->peer_assoc->peer;
715 own_addr= &receiver_thread_argp->peer_assoc->ownaddr;
716 }
717 else
718 {
719 ERRCLog(tpparam.name, methodname << "No peer assoc available - pointer is NULL");
720
721 return;
722 }
723
724 if (peer_addr == 0)
725 {
726 ERRCLog(tpparam.name, methodname << "No peer address available for socket " << conn_socket << ", exiting.");
727
728 return;
729 }
730 // argument parsing - end
731#ifdef _DEBUG
732 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname <<
733 "Preparing to wait for data at socket "
734 << conn_socket << " from " << receiver_thread_argp->peer_assoc->peer);
735#endif
736
737 int ret= 0;
738 uint32 msgcontentlength= 0;
739 bool msgcontentlength_known= false;
740 bool pdu_complete= false; // when to terminate inner loop
741
742 /* maybe use this to create a new pdu,
743 /// constructor
744 contextlistpdu(type_t t, subtype_t st, uint32 fc, uint32 numobj = 0);
745 */
746
747 // activate O_NON_BLOCK for recv on socket conn_socket
748 // CAVEAT: this also implies non-blocking send()!
749 //fcntl(conn_socket,F_SETFL, O_NONBLOCK);
750
751 // set options for polling
752 const unsigned int number_poll_sockets= 1;
753 struct pollfd poll_fd;
754 // have to set structure before poll call
755 poll_fd.fd = conn_socket;
756 poll_fd.events = POLLIN | POLLPRI;
757 poll_fd.revents = 0;
758
759 int poll_status;
760 bool recv_error= false;
761
762 NetMsg* netmsg= 0;
763 NetMsg* remainbuf= 0;
764 size_t buffer_bytes_left= 0;
765 size_t trailingbytes= 0;
766 bool skiprecv= false;
767 // loop until we receive a terminate signal (read-only var for this thread)
768 // or get an error from socket read
769 while( receiver_thread_argp->sig_terminate == false )
770 {
771 // Read next PDU from socket or process trailing bytes in remainbuf
772 ret= 0;
773 msgcontentlength= 0;
774 msgcontentlength_known= false;
775 pdu_complete= false;
776 netmsg= 0;
777
778 // there are trailing bytes left from the last receive call
779 if (remainbuf != 0)
780 {
781 netmsg= remainbuf;
782 remainbuf= 0;
783 buffer_bytes_left= netmsg->get_size()-trailingbytes;
784 bytes_received= trailingbytes;
785 trailingbytes= 0;
786 skiprecv= true;
787 }
788 else // no trailing bytes, create a new buffer
789 if ( (netmsg= new NetMsg(NetMsg::max_size)) != 0 )
790 {
791 buffer_bytes_left= netmsg->get_size();
792 bytes_received= 0;
793 skiprecv= false;
794 }
795 else
796 { // buffer allocation failed
797 bytes_received= 0;
798 buffer_bytes_left= 0;
799 recv_error= true;
800 }
801
802 // loops until PDU is complete
803 // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
804 while (!pdu_complete &&
805 !recv_error &&
806 !receiver_thread_argp->sig_terminate)
807 {
808 if (!skiprecv)
809 {
810 // read from TCP socket or return after sleep_time
811 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
812
813 if (receiver_thread_argp->sig_terminate)
814 {
815 Log(EVENT_LOG,LOG_UNIMP,tpparam.name,methodname << "Thread <" << pthread_self() << "> found terminate signal after poll");
816 // disallow sending
817 AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
818 if (myassoc->shutdown == false)
819 {
820 myassoc->shutdown= true;
821 if (shutdown(myassoc->socketfd,SHUT_WR))
822 {
823 if ( errno != ENOTCONN )
824 Log(ERROR_LOG,LOG_UNIMP,tpparam.name,methodname <<"shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
825 }
826 }
827 // try to read do a last read from the TCP socket or return after sleep_time
828 if (poll_status == 0)
829 {
830 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
831 }
832 }
833
834 if (poll_fd.revents & POLLERR) // Error condition
835 {
836 if (errno == 0 || errno == EINTR)
837 {
838 EVLog(tpparam.name, methodname << "poll(): " << strerror(errno));
839 }
840 else
841 {
842 ERRCLog(tpparam.name, methodname << "Poll indicates error: " << strerror(errno));
843 recv_error= true;
844 }
845 }
846
847 if (poll_fd.revents & POLLHUP) // Hung up
848 {
849 Log(EVENT_LOG,LOG_CRIT, tpparam.name, methodname << "Poll hung up");
850 recv_error= true;
851 }
852
853 if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
854 {
855 EVLog(tpparam.name, methodname << "Poll Invalid request: fd not open");
856 recv_error= true;
857 }
858
859 // check status (return value) of poll call
860 switch (poll_status)
861 {
862 case -1:
863 if (errno == 0 || errno == EINTR)
864 {
865 EVLog(tpparam.name, methodname << "Poll status: " << strerror(errno));
866 }
867 else
868 {
869 ERRCLog(tpparam.name, methodname << "Poll status indicates error: " << strerror(errno) << "- aborting");
870 recv_error= true;
871 }
872
873 continue; // next while iteration
874 break;
875
876 case 0:
877#ifdef DEBUG_HARD
878 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll timed out after " << tpparam.sleep_time << " ms.");
879#endif
880 continue; // next while iteration
881 break;
882
883 default:
884#ifdef DEBUG_HARD
885 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
886#endif
887 break;
888 } // end switch
889
890
891 /// receive data from socket buffer (recv will not block)
892 ret = recv(conn_socket,
893 netmsg->get_buffer() + bytes_received,
894 buffer_bytes_left,
895 MSG_DONTWAIT);
896
897 if ( ret < 0 )
898 {
899 if (errno!=EAGAIN && errno!=EWOULDBLOCK)
900 {
901 ERRCLog(tpparam.name, methodname << "Receive at socket " << conn_socket << " failed, error: " << strerror(errno));
902 recv_error= true;
903 continue;
904 }
905 else
906 { // errno==EAGAIN || errno==EWOULDBLOCK
907 // just nothing to read from socket, continue w/ next poll
908 continue;
909 }
910 }
911 else
912 {
913 if (ret == 0)
914 {
915 // this means that EOF is reached,
916 // other side has closed connection
917 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Other side (" << *peer_addr << ") closed connection for socket " << conn_socket);
918 // disallow sending
919 AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
920 if (myassoc->shutdown == false)
921 {
922 myassoc->shutdown= true;
923 if (shutdown(myassoc->socketfd,SHUT_WR))
924 {
925 if ( errno != ENOTCONN )
926 Log(ERROR_LOG,LOG_UNIMP,tpparam.name, methodname << "shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
927 }
928 }
929 // not a real error, but we must quit the receive loop
930 recv_error= true;
931 }
932 else
933 {
934
935 Log(EVENT_LOG,LOG_UNIMP, tpparam.name, methodname << "<<--Received--<< packet (" << ret << " bytes) at socket " << conn_socket << " from " << *peer_addr);
936 // track number of received bytes
937 bytes_received+= ret;
938 buffer_bytes_left-= ret;
939 }
940 }
941 } // end if do not skip recv() statement
942
943 if (buffer_bytes_left < 0) ///< buffer space exhausted now
944 {
945 recv_error= true;
946 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "during receive buffer space exhausted");
947 }
948
949 if (!msgcontentlength_known) ///< common header not parsed
950 {
951 // enough bytes read to parse common header?
952 if (bytes_received >= common_header_length)
953 {
954 // get message content length in number of bytes
955 if (getmsglength(*netmsg, msgcontentlength))
956 msgcontentlength_known= true;
957 else
958 {
959 ERRCLog(tpparam.name, methodname << "Not a valid protocol header - discarding received packet. received size " << msgcontentlength);
960
961 ostringstream hexdumpstr;
962 netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
963 DLog(tpparam.name,"dumping received bytes:" << hexdumpstr.str());
964
965 // reset all counters
966 msgcontentlength= 0;
967 msgcontentlength_known= false;
968 bytes_received= 0;
969 pdu_complete= false;
970 continue;
971 }
972 }
973 } // endif common header not parsed
974
975 // check whether we have read the whole Protocol PDU
976 DLog(tpparam.name, "bytes_received-common_header_length=" << bytes_received-common_header_length << " msgcontentlength: " << msgcontentlength);
977 if (msgcontentlength_known)
978 {
979 if (bytes_received-common_header_length >= msgcontentlength )
980 {
981 pdu_complete= true;
982 // truncate buffer exactly at common_header_length+msgcontentlength==PDU size, trailing stuff is handled separately
983 netmsg->truncate(common_header_length+msgcontentlength);
984
985 // trailing bytes are copied into new buffer
986 if (bytes_received-common_header_length > msgcontentlength)
987 {
988 WLog(tpparam.name,"trailing bytes - received more bytes ("<<bytes_received<<") than expected for PDU (" << common_header_length+msgcontentlength << ")");
989 remainbuf= new NetMsg(NetMsg::max_size);
990 trailingbytes= (bytes_received-common_header_length) - msgcontentlength;
991 bytes_received= common_header_length+msgcontentlength;
992 memcpy(remainbuf->get_buffer(),netmsg->get_buffer()+common_header_length+msgcontentlength, trailingbytes);
993 }
994 }
995 else
996 { // not enough bytes in the buffer must continue to read from network
997 skiprecv= false;
998 }
999 } // endif msgcontentlength_known
1000 } // end while (!pdu_complete && !recv_error && !signalled for termination)
1001 // >>>>>>>>>>>>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>>>
1002
1003 // if other side closed the connection, we should still be able to deliver the remaining data
1004 if (ret == 0)
1005 {
1006 recv_error= false;
1007 }
1008
1009 // deliver only complete PDUs to signaling module
1010 if (!recv_error && pdu_complete)
1011 {
1012 // create TPMsg and send it to the signaling thread
1013 tpmsg = new(nothrow) TPMsg(netmsg, peer_addr->copy(), own_addr->copy());
1014 if (tpmsg)
1015 {
1016 DLog(tpparam.name, methodname << "receipt of PDU now complete, sending msg#" << tpmsg->get_id()
1017 << " to module " << message::get_qaddr_name(tpparam.dest));
1018 }
1019
1020 debug_pdu=false;
1021
1022 if (debug_pdu)
1023 {
1024 ostringstream hexdump;
1025 netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_received);
1026 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"PDU debugging enabled - Received:" << hexdump.str());
1027 }
1028
1029 // send the message if it was successfully created
1030 // bool message::send_to(qaddr_t dest, bool exp = false);
1031 if (!tpmsg
1032 || (!tpmsg->get_peeraddress())
1033 || (!tpmsg->send(message::qaddr_tp_over_tcp, tpparam.dest)))
1034 {
1035 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Cannot allocate/send TPMsg");
1036 if (tpmsg) delete tpmsg;
1037 } // end if tpmsg not allocated or not addr or not sent
1038
1039
1040 } // end if !recv_error
1041 else
1042 { // error during receive or PDU incomplete
1043 if (bytes_received>0)
1044 {
1045 Log(WARNING_LOG,LOG_NORMAL, tpparam.name, methodname << "Attention! " << (recv_error? "Receive error, " : "") << (pdu_complete ? "PDU complete" : "PDU incomplete") << "received bytes: " << bytes_received);
1046 }
1047
1048 if (!pdu_complete && bytes_received>0 && bytes_received<common_header_length)
1049 {
1050 ostringstream hexdumpstr;
1051 netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
1052 Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"Message too short to be a valid protocol header - dumping received bytes:" << hexdumpstr.str());
1053 }
1054 // leave the outer loop
1055 /**********************/
1056 break;
1057 /**********************/
1058 } // end else
1059
1060 } // end while (thread not signalled for termination)
1061
1062 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self()
1063 << "> shutting down and closing socket " << receiver_thread_argp->peer_assoc->peer);
1064
1065 // shutdown socket
1066 if (shutdown(conn_socket, SHUT_RD))
1067 {
1068 if ( errno != ENOTCONN )
1069 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> shutdown (read) on socket failed, reason: " << strerror(errno));
1070 }
1071
1072 // close socket
1073 close(conn_socket);
1074
1075 receiver_thread_argp->terminated= true;
1076
1077 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> terminated");
1078
1079#ifdef _DEBUG
1080 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Signaling main loop for cleanup");
1081#endif
1082 // notify master thread for invocation of cleanup procedure
1083 TPoverTCPMsg* newmsg= new(nothrow)TPoverTCPMsg(receiver_thread_argp->peer_assoc);
1084 // send message to main loop thread
1085 newmsg->send_to(tpparam.source);
1086
1087}
1088
1089
1090/** this signals a terminate to a thread and wait for the thread to stop
1091 * @note it is not safe to access any thread related data after this method returned,
1092 * because the receiver thread will initiate a cleanup_receiver_thread() method
1093 * which may erase all relevant thread data.
1094 */
1095void
1096TPoverTCP::stop_receiver_thread(AssocData* peer_assoc)
1097{
1098 // All operations on recv_thread_argmap and connmap require an already acquired lock
1099 // after this procedure peer_assoc may be invalid because it was erased
1100
1101 // start critical section
1102
1103 if (peer_assoc == 0)
1104 return;
1105
1106 pthread_t thread_id= peer_assoc->thread_ID;
1107
1108 // try to clean up receiver_thread_arg
1109 recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1110 receiver_thread_arg_t* recv_thread_argp=
1111 (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1112 if (recv_thread_argp)
1113 {
1114 if (!recv_thread_argp->terminated)
1115 {
1116 // thread signaled termination, but is not?
1117 Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> signaled for termination");
1118
1119 // signal thread for its termination
1120 recv_thread_argp->sig_terminate= true;
1121 // wait for thread to join after termination
1122 pthread_join(thread_id, 0);
1123 // the dying thread will signal main loop to call this method, but this time we should enter the else branch
1124 return;
1125 }
1126 }
1127 else
1128 Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> not found");
1129
1130}
1131
1132
1133/** cleans up left over structures (assoc,receiver_thread_arg) from already terminated receiver_thread
1134 * usually called by the master_thread after the receiver_thread terminated
1135 * @note clean_up_receiver_thread() should be only called when an outer lock ensures that peer_assoc
1136 * is still valid
1137 */
1138void
1139TPoverTCP::cleanup_receiver_thread(AssocData* peer_assoc)
1140{
1141 // All operations on recv_thread_argmap and connmap require an already acquired lock
1142 // after this procedure peer_assoc may be invalid because it was erased
1143
1144 // start critical section
1145
1146 if (peer_assoc == 0)
1147 return;
1148
1149 pthread_t thread_id= peer_assoc->thread_ID;
1150
1151 // try to clean up receiver_thread_arg
1152 recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
1153 receiver_thread_arg_t* recv_thread_argp=
1154 (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
1155 if (recv_thread_argp)
1156 {
1157 if (!recv_thread_argp->terminated)
1158 {
1159 // thread signaled termination, but is not?
1160 Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> not terminated yet?!");
1161 return;
1162 }
1163 else
1164 { // if thread is already terminated
1165 Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> is terminated");
1166
1167 // delete it from receiver map
1168 recv_thread_argmap.erase(recv_thread_arg_iter);
1169
1170 // then delete receiver arg structure
1171 delete recv_thread_argp;
1172 }
1173 }
1174
1175 // delete entry from connection map
1176
1177 // cleanup sender thread
1178 // no need to lock explicitly, because caller of cleanup_receiver_thread() must already locked
1179 terminate_sender_thread(peer_assoc);
1180
1181 // delete the AssocData structure from the connection map
1182 // also frees allocated AssocData structure
1183 connmap.erase(peer_assoc);
1184
1185 // end critical section
1186
1187 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Cleanup receiver thread <" << thread_id << ">. Done.");
1188}
1189
1190
1191/* sends a stop message to the sender thread that belongs to the peer address given in assoc
1192 * @note terminate_receiver_thread() should be only called when an outer lock ensures that assoc
1193 * is still valid, a lock is also required, because senderthread_queuemap is changed
1194 */
1195void
1196TPoverTCP::terminate_sender_thread(const AssocData* assoc)
1197{
1198 if (assoc == 0)
1199 {
1200 Log(ERROR_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - assoc data == NULL");
1201 return;
1202 }
1203
1204 sender_thread_queuemap_t::iterator it= senderthread_queuemap.find(assoc->peer);
1205
1206 if (it != senderthread_queuemap.end())
1207 { // we have a sender thread: send a stop message to it
1208 FastQueue* destqueue= it->second;
1209 if (destqueue)
1210 {
1211 TPoverTCPMsg* internalmsg= new TPoverTCPMsg(assoc,tpparam.source,TPoverTCPMsg::stop);
1212 if (internalmsg)
1213 {
1214 // send the internal message to the sender thread queue
1215 internalmsg->send(tpparam.source,destqueue);
1216 }
1217 }
1218 else
1219 {
1220 Log(WARNING_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - found entry for address, but no sender thread. addr:" << assoc->peer);
1221 }
1222 // erase entry from map
1223 senderthread_queuemap.erase(it);
1224 }
1225}
1226
1227/* terminate all active threads
1228 * note: locking should not be necessary here because this message is called as last method from
1229 * main_loop()
1230 */
1231void
1232TPoverTCP::terminate_all_threads()
1233{
1234 AssocData* assoc= 0;
1235 receiver_thread_arg_t* terminate_argp;
1236
1237 for (recv_thread_argmap_t::iterator terminate_iterator= recv_thread_argmap.begin();
1238 terminate_iterator != recv_thread_argmap.end();
1239 terminate_iterator++)
1240 {
1241 if ( (terminate_argp= terminate_iterator->second) != 0)
1242 {
1243 // we need a non const pointer to erase it later on
1244 assoc= const_cast<AssocData*>(terminate_argp->peer_assoc);
1245 // check whether thread is still alive
1246 if (terminate_argp->terminated == false)
1247 {
1248 terminate_argp->sig_terminate= true;
1249 // then wait for its termination
1250 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,
1251 "Signaled receiver thread <" << terminate_iterator->first << "> for termination");
1252
1253 pthread_join(terminate_iterator->first, 0);
1254
1255 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Thread <" << terminate_iterator->first << "> is terminated");
1256 }
1257 else
1258 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,
1259 "Receiver thread <" << terminate_iterator->first << "> already terminated");
1260
1261 // cleanup all remaining argument structures of terminated threads
1262 delete terminate_argp;
1263
1264 // terminate any related sender thread that is still running
1265 terminate_sender_thread(assoc);
1266
1267 connmap.erase(assoc);
1268 // delete assoc is not necessary, because connmap.erase() will do the job
1269 }
1270 } // end for
1271}
1272
1273
1274/**
1275 * sender thread starter:
1276 * just a static starter method to allow starting the
1277 * actual sender_thread() method.
1278 *
1279 * @param argp - pointer to the current TPoverTCP object instance and receiver_thread_arg_t struct
1280 */
1281void*
1282TPoverTCP::sender_thread_starter(void *argp)
1283{
1284 sender_thread_start_arg_t *sargp= static_cast<sender_thread_start_arg_t *>(argp);
1285
1286 //cout << "invoked sender_thread_Starter" << endl;
1287
1288 // invoke sender thread method
1289 if (sargp != 0 && sargp->instance != 0)
1290 {
1291 // call receiver_thread member function on object instance
1292 sargp->instance->sender_thread(sargp->sender_thread_queue);
1293
1294 //cout << "Before deletion of sarg" << endl;
1295
1296 // no longer needed
1297 delete sargp;
1298 }
1299 else
1300 {
1301 Log(ERROR_LOG,LOG_CRIT,"sender_thread_starter","while starting sender_thread: 0 pointer to arg or object");
1302 }
1303 return 0;
1304}
1305
1306
1307
1308
1309/**
1310 * receiver thread starter:
1311 * just a static starter method to allow starting the
1312 * actual receiver_thread() method.
1313 *
1314 * @param argp - pointer to the current TPoverTCP object instance and receiver_thread_arg_t struct
1315 */
1316void*
1317TPoverTCP::receiver_thread_starter(void *argp)
1318{
1319 receiver_thread_start_arg_t *rargp= static_cast<receiver_thread_start_arg_t *>(argp);
1320 // invoke receiver thread method
1321 if (rargp != 0 && rargp->instance != 0)
1322 {
1323 // call receiver_thread member function on object instance
1324 rargp->instance->receiver_thread(rargp->rtargp);
1325
1326 // no longer needed
1327 delete rargp;
1328 }
1329 else
1330 {
1331 Log(ERROR_LOG,LOG_CRIT,"receiver_thread_starter","while starting receiver_thread: 0 pointer to arg or object");
1332 }
1333 return 0;
1334}
1335
1336
1337void
1338TPoverTCP::create_new_sender_thread(FastQueue* senderfqueue)
1339{
1340 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new sender thread...");
1341
1342 pthread_t senderthreadid;
1343 // create new thread; (arg == 0) is handled by thread, too
1344 int pthread_status= pthread_create(&senderthreadid,
1345 NULL, // NULL: default attributes: thread is joinable and has a
1346 // default, non-realtime scheduling policy
1347 TPoverTCP::sender_thread_starter,
1348 new sender_thread_start_arg_t(this,senderfqueue));
1349 if (pthread_status)
1350 {
1351 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " << strerror(pthread_status));
1352
1353 delete senderfqueue;
1354 }
1355}
1356
1357
1358void
1359TPoverTCP::create_new_receiver_thread(AssocData* peer_assoc)
1360{
1361 receiver_thread_arg_t* argp=
1362 new(nothrow) receiver_thread_arg(peer_assoc);
1363
1364 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new receiver thread...");
1365
1366 // create new thread; (arg == 0) is handled by thread, too
1367 int pthread_status= pthread_create(&peer_assoc->thread_ID,
1368 NULL, // NULL: default attributes: thread is joinable and has a
1369 // default, non-realtime scheduling policy
1370 receiver_thread_starter,
1371 new(nothrow) receiver_thread_start_arg_t(this,argp));
1372 if (pthread_status)
1373 {
1374 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " << strerror(pthread_status));
1375
1376 delete argp;
1377 }
1378 else
1379 {
1380 lock(); // install_cleanup_thread_lock(TPoverTCP, this);
1381
1382 // remember pointer to thread arg structure
1383 // thread arg structure should be destroyed after thread termination only
1384 pair<recv_thread_argmap_t::iterator, bool> tmpinsiterator=
1385 recv_thread_argmap.insert( pair<pthread_t,receiver_thread_arg_t*> (peer_assoc->thread_ID,argp) );
1386 if (tmpinsiterator.second == false)
1387 {
1388 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Thread argument could not be inserted into hashmap");
1389 }
1390 unlock(); // uninstall_cleanup(1);
1391 }
1392}
1393
1394
1395/**
1396 * master listener thread starter:
1397 * just a static starter method to allow starting the
1398 * actual master_listener_thread() method.
1399 *
1400 * @param argp - pointer to the current TPoverTCP object instance
1401 */
1402void*
1403TPoverTCP::master_listener_thread_starter(void *argp)
1404{
1405 // invoke listener thread method
1406 if (argp != 0)
1407 {
1408 (static_cast<TPoverTCP*>(argp))->master_listener_thread();
1409 }
1410 return 0;
1411}
1412
1413
1414
1415/**
1416 * master listener thread: waits for incoming connections at the well-known tcp port
1417 * when a connection request is received this thread spawns a receiver_thread for
1418 * receiving packets from the peer at the new socket.
1419 */
1420void
1421TPoverTCP::master_listener_thread()
1422{
1423 // create a new address-structure for the listening masterthread
1424 struct sockaddr_in6 own_address;
1425 own_address.sin6_family = AF_INET6;
1426 own_address.sin6_flowinfo= 0;
1427 own_address.sin6_port = htons(tpparam.port); // use port number in param structure
1428 // accept incoming connections on all interfaces
1429 own_address.sin6_addr = in6addr_any;
1430 own_address.sin6_scope_id= 0;
1431
1432 // create a listening socket
1433 int master_listener_socket= socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
1434 if (master_listener_socket == -1)
1435 {
1436 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Could not create a new socket, error: " << strerror(errno));
1437 return;
1438 }
1439
1440 // Disable Nagle Algorithm, set (TCP_NODELAY)
1441 int nodelayflag= 1;
1442 int status= setsockopt(master_listener_socket,
1443 IPPROTO_TCP,
1444 TCP_NODELAY,
1445 &nodelayflag,
1446 sizeof(nodelayflag));
1447 if (status)
1448 {
1449 Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
1450 }
1451
1452 // Reuse ports, even if they are busy
1453 int socketreuseflag= 1;
1454 status= setsockopt(master_listener_socket,
1455 SOL_SOCKET,
1456 SO_REUSEADDR,
1457 (const char *) &socketreuseflag,
1458 sizeof(socketreuseflag));
1459 if (status)
1460 {
1461 Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
1462 }
1463
1464
1465 // bind the newly created socket to a specific address
1466 int bind_status = bind(master_listener_socket,
1467 reinterpret_cast<struct sockaddr *>(&own_address),
1468 sizeof(own_address));
1469 if (bind_status)
1470 {
1471 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Binding to "
1472 << inet_ntop(AF_INET6, &own_address.sin6_addr, in6_addrstr, INET6_ADDRSTRLEN)
1473 << " port " << tpparam.port << " failed, error: " << strerror(errno));
1474 return;
1475 }
1476
1477 // listen at the socket,
1478 // queuesize for pending connections= max_listen_queue_size
1479 int listen_status = listen(master_listener_socket, max_listen_queue_size);
1480 if (listen_status)
1481 {
1482 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Listen at socket " << master_listener_socket
1483 << " failed, error: " << strerror(errno));
1484 return;
1485 }
1486 else
1487 {
1488 Log(INFO_LOG,LOG_NORMAL, tpparam.name, color[green] << "Listening at port #" << tpparam.port << color[off]);
1489 }
1490
1491 // activate O_NON_BLOCK for accept (accept does not block)
1492 fcntl(master_listener_socket,F_SETFL, O_NONBLOCK);
1493
1494 // create a pollfd struct for use in the mainloop
1495 struct pollfd poll_fd;
1496 poll_fd.fd = master_listener_socket;
1497 poll_fd.events = POLLIN | POLLPRI;
1498 poll_fd.revents = 0;
1499 /*
1500 #define POLLIN 0x001 // There is data to read.
1501 #define POLLPRI 0x002 // There is urgent data to read.
1502 #define POLLOUT 0x004 // Writing now will not block.
1503 */
1504
1505 bool terminate = false;
1506 // check for thread terminate condition using get_state()
1507 state_t currstate= get_state();
1508 int poll_status= 0;
1509 const unsigned int number_poll_sockets= 1;
1510 struct sockaddr_in6 peer_address;
1511 socklen_t peer_address_len;
1512 int conn_socket;
1513
1514 // check whether this thread is signaled for termination
1515 while(! (terminate= (currstate==STATE_ABORT || currstate==STATE_STOP) ) )
1516 {
1517 // wait on number_poll_sockets (main drm socket)
1518 // for the events specified above for sleep_time (in ms)
1519 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
1520 if (poll_fd.revents & POLLERR) // Error condition
1521 {
1522 if (errno != EINTR)
1523 {
1524 Log(ERROR_LOG,LOG_CRIT, tpparam.name,
1525 "Poll caused error " << strerror(errno) << " - indicated by revents");
1526 }
1527 else
1528 {
1529 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "poll(): " << strerror(errno));
1530 }
1531
1532 }
1533 if (poll_fd.revents & POLLHUP) // Hung up
1534 {
1535 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll hung up");
1536 return;
1537 }
1538 if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
1539 {
1540 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll Invalid request: fd not open");
1541 return;
1542 }
1543
1544 switch (poll_status)
1545 {
1546 case -1:
1547 if (errno != EINTR)
1548 {
1549 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll status indicates error: " << strerror(errno));
1550 }
1551 else
1552 {
1553 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Poll status: " << strerror(errno));
1554 }
1555
1556 break;
1557
1558 case 0:
1559#ifdef DEBUG_HARD
1560 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,
1561 "Listen Thread - Poll timed out after " << tpparam.sleep_time << " ms.");
1562#endif
1563 currstate= get_state();
1564 continue;
1565 break;
1566
1567 default:
1568#ifdef DEBUG_HARD
1569 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
1570#endif
1571 break;
1572 } // end switch
1573
1574 // after a successful accept call,
1575 // accept stores the address information of the connecting party
1576 // in peer_address and the size of its address in addrlen
1577 peer_address_len= sizeof(peer_address);
1578 conn_socket = accept (master_listener_socket,
1579 reinterpret_cast<struct sockaddr *>(&peer_address),
1580 &peer_address_len);
1581 if (conn_socket == -1)
1582 {
1583 if (errno != EWOULDBLOCK && errno != EAGAIN)
1584 {
1585 Log(ERROR_LOG,LOG_EMERG, tpparam.name, "Accept at socket " << master_listener_socket
1586 << " failed, error: " << strerror(errno));
1587 return;
1588 }
1589 }
1590 else
1591 {
1592 // create a new assocdata-object for the new thread
1593 AssocData* peer_assoc = NULL;
1594 appladdress addr(peer_address, IPPROTO_TCP);
1595
1596 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "<<--Received connect--<< request from " << addr.get_ip_str()
1597 << " port #" << addr.get_port());
1598
1599 struct sockaddr_in6 own_address;
1600 socklen_t own_address_len= sizeof(own_address);
1601 getsockname(conn_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
1602
1603 // AssocData will copy addr content into its own structure
1604 // allocated peer_assoc will be stored in connmap
1605 peer_assoc = new(nothrow) AssocData(conn_socket, addr, appladdress(own_address,IPPROTO_TCP));
1606
1607 bool insert_success= false;
1608 if (peer_assoc)
1609 {
1610 // start critical section
1611 lock(); // install_cleanup_thread_lock(TPoverTCP, this);
1612 insert_success= connmap.insert(peer_assoc);
1613 // end critical section
1614 unlock(); // uninstall_cleanup(1);
1615 }
1616
1617
1618 if (insert_success == false) // not inserted into connmap
1619 {
1620 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot insert AssocData for socket " << conn_socket
1621 << ", " << addr.get_ip_str() << ", port #"
1622 << addr.get_port() << " into connection map, aborting connection...");
1623
1624 // abort connection, delete its AssocData
1625 close (conn_socket);
1626 if (peer_assoc)
1627 {
1628 delete peer_assoc;
1629 peer_assoc= 0;
1630 }
1631 return;
1632
1633 } //end __else(connmap.insert());__
1634
1635 // create a new thread for each new connection
1636 create_new_receiver_thread(peer_assoc);
1637 } // end __else (connsocket)__
1638
1639 // get new thread state
1640 currstate= get_state();
1641
1642 } // end while(!terminate)
1643 return;
1644} // end listen_for_connections()
1645
1646
1647TPoverTCP::~TPoverTCP()
1648{
1649 init= false;
1650
1651 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Destructor called");
1652
1653 QueueManager::instance()->unregister_queue(tpparam.source);
1654}
1655
1656/** TPoverTCP Thread main loop.
1657 * This loop checks for internal messages of either
1658 * a send() call to start a new receiver thread, or,
1659 * of a receiver_thread() that signals its own termination
1660 * for proper cleanup of control structures.
1661 * It also handles the following internal TPoverTCPMsg types:
1662 * - TPoverTCPMsg::stop - a particular receiver thread is terminated
1663 * - TPoverTCPMsg::start - a particular receiver thread is started
1664 * @param nr number of current thread instance
1665 */
1666void
1667TPoverTCP::main_loop(uint32 nr)
1668{
1669
1670 // get internal queue for messages from receiver_thread
1671 FastQueue* fq = get_fqueue();
1672 if (!fq)
1673 {
1674 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot find message queue");
1675 return;
1676 } // end if not fq
1677 // register queue for receiving internal messages from other modules
1678 QueueManager::instance()->register_queue(fq,tpparam.source);
1679
1680 // start master listener thread
1681 pthread_t master_listener_thread_ID;
1682 int pthread_status= pthread_create(&master_listener_thread_ID,
1683 NULL, // NULL: default attributes: thread is joinable and has a
1684 // default, non-realtime scheduling policy
1685 master_listener_thread_starter,
1686 this);
1687 if (pthread_status)
1688 {
1689 Log(ERROR_LOG,LOG_CRIT, tpparam.name,
1690 "New master listener thread could not be created: " << strerror(pthread_status));
1691 }
1692 else
1693 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Master listener thread started");
1694
1695
1696 // define max latency for thread reaction on termination/stop signal
1697 timespec wait_interval= { 0, 250000000L }; // 250ms
1698 message* internal_thread_msg = NULL;
1699 state_t currstate= get_state();
1700
1701 // check whether this thread is signaled for termination
1702 while( currstate!=STATE_ABORT && currstate!=STATE_STOP )
1703 {
1704 // poll internal message queue (blocking)
1705 if ( (internal_thread_msg= fq->dequeue_timedwait(wait_interval)) != 0 )
1706 {
1707 TPoverTCPMsg* internalmsg= dynamic_cast<TPoverTCPMsg*>(internal_thread_msg);
1708 if (internalmsg)
1709 {
1710 if (internalmsg->get_msgtype() == TPoverTCPMsg::stop)
1711 {
1712 // a receiver thread terminated and signaled for cleanup by master thread
1713 AssocData* assocd= const_cast<AssocData*>(internalmsg->get_peer_assoc());
1714 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Got cleanup request for thread <" << assocd->thread_ID <<'>');
1715 lock();
1716 cleanup_receiver_thread( assocd );
1717 unlock();
1718 }
1719 else
1720 if (internalmsg->get_msgtype() == TPoverTCPMsg::start)
1721 {
1722 // start a new receiver thread
1723 create_new_receiver_thread( const_cast<AssocData*>(internalmsg->get_peer_assoc()) );
1724 }
1725 else
1726 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "unexpected internal message:" << internalmsg->get_msgtype());
1727
1728 delete internalmsg;
1729 }
1730 else
1731 {
1732 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Dynamic_cast failed - received unexpected and unknown internal message source "
1733 << internal_thread_msg->get_source());
1734 }
1735 } // endif
1736
1737 // get thread state
1738 currstate= get_state();
1739 } // end while
1740
1741 if (currstate==STATE_STOP)
1742 {
1743 // start abort actions
1744 Log(INFO_LOG,LOG_NORMAL, tpparam.name, "Asked to abort, stopping all receiver threads");
1745 } // end if stopped
1746
1747 // do not accept any more messages
1748 fq->shutdown();
1749 // terminate all receiver and sender threads that are still active
1750 terminate_all_threads();
1751}
1752
1753} // end namespace protlib
1754///@}
Note: See TracBrowser for help on using the repository browser.