00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 extern "C"
00031 {
00032
00033 #include <unistd.h>
00034 #include <sys/types.h>
00035 #include <netinet/in.h>
00036 #include <netinet/tcp.h>
00037 #include <sys/socket.h>
00038 #include <arpa/inet.h>
00039
00040 #include <fcntl.h>
00041 #include <sys/poll.h>
00042 }
00043
00044 #include <iostream>
00045 #include <errno.h>
00046 #include <string>
00047 #include <sstream>
00048
00049 #include "tp_over_tcp.h"
00050 #include "threadsafe_db.h"
00051 #include "cleanuphandler.h"
00052 #include "setuid.h"
00053 #include "queuemanager.h"
00054 #include "logfile.h"
00055
00056 #include <set>
00057
00058 #define TCP_SUCCESS 0
00059 #define TCP_SEND_FAILURE 1
00060
00061 const unsigned int max_listen_queue_size= 10;
00062
00063 namespace protlib {
00064
00065 using namespace log;
00066
00072 char in6_addrstr[INET6_ADDRSTRLEN+1];
00073
00074
00075
00076
00077
00084 AssocData*
00085 TPoverTCP::get_connection_to(const appladdress& addr)
00086 {
00087
00088 struct timespec ts;
00089 get_time_of_day(ts);
00090 ts.tv_nsec+= tpparam.sleep_time * 1000000L;
00091 if (ts.tv_nsec>=1000000000L)
00092 {
00093 ts.tv_sec += ts.tv_nsec / 1000000000L;
00094 ts.tv_nsec= ts.tv_nsec % 1000000000L;
00095 }
00096
00097
00098 AssocData* assoc= NULL;
00099 int new_socket;
00100
00101 do
00102 {
00103
00104
00105 lock();
00106 assoc= connmap.lookup(addr);
00107
00108 unlock();
00109 if (assoc)
00110 {
00111
00112 if (!assoc->shutdown)
00113 {
00114 return assoc;
00115 }
00116 else
00117 {
00118
00119 ERRCLog(tpparam.name,"socket exists, but is already in mode shutdown");
00120
00121 return 0;
00122 }
00123 }
00124 else
00125 {
00126 Log(DEBUG_LOG,LOG_UNIMP,tpparam.name,"No existing connection to "
00127 << addr.get_ip_str() << " port #" << addr.get_port() << " found, creating a new one.");
00128 }
00129
00130
00131 new_socket = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
00132 if (new_socket == -1)
00133 {
00134 ERRCLog(tpparam.name,"Couldn't create a new socket: " << strerror(errno));
00135
00136 return 0;
00137 }
00138
00139
00140 int nodelayflag= 1;
00141 int status= setsockopt(new_socket,
00142 IPPROTO_TCP,
00143 TCP_NODELAY,
00144 &nodelayflag,
00145 sizeof(nodelayflag));
00146 if (status)
00147 {
00148 ERRLog(tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
00149 }
00150
00151
00152 int socketreuseflag= 1;
00153 status= setsockopt(new_socket,
00154 SOL_SOCKET,
00155 SO_REUSEADDR,
00156 (const char *) &socketreuseflag,
00157 sizeof(socketreuseflag));
00158 if (status)
00159 {
00160 ERRLog(tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
00161 }
00162
00163 struct sockaddr_in6 dest_address;
00164 dest_address.sin6_flowinfo= 0;
00165 dest_address.sin6_scope_id= 0;
00166 addr.get_sockaddr(dest_address);
00167
00168
00169 int connect_status = connect(new_socket,
00170 reinterpret_cast<const struct sockaddr*>(&dest_address),
00171 sizeof(dest_address));
00172
00173
00174 if (connect_status != 0)
00175 {
00176 ERRLog(tpparam.name,"Connect to " << addr.get_ip_str() << " port #" << addr.get_port()
00177 << " failed: [" << color[red] << strerror(errno) << color[off] << "]");
00178
00179 return 0;
00180 }
00181
00182
00183 struct sockaddr_in6 own_address;
00184 socklen_t own_address_len= sizeof(own_address);
00185 getsockname(new_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
00186
00187
00188
00189 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,">>--Connect-->> to " << addr.get_ip_str() << " port #" << addr.get_port()
00190 << " from " << inet_ntop(AF_INET6,&own_address.sin6_addr,in6_addrstr,INET6_ADDRSTRLEN)
00191 << " port #" << ntohs(own_address.sin6_port));
00192
00193
00194
00195
00196
00197 assoc = new(nothrow) AssocData(new_socket, addr, appladdress(own_address,IPPROTO_TCP));
00198
00199
00200 if (assoc)
00201 {
00202 bool insert_success= false;
00203
00204 lock();
00205
00206 insert_success= connmap.insert(assoc);
00207
00208 unlock();
00209
00210 if (insert_success == true)
00211 {
00212 #ifdef _DEBUG
00213 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Connected to " << addr.get_ip_str() << ", port #" << addr.get_port()
00214 << " via socket " << new_socket);
00215
00216
00217 #endif
00218
00219
00220 TPoverTCPMsg* newmsg= new(nothrow)TPoverTCPMsg(assoc, tpparam.source, TPoverTCPMsg::start);
00221 if (newmsg)
00222 {
00223 newmsg->send_to(tpparam.source);
00224 return assoc;
00225 }
00226 else
00227 ERRCLog(tpparam.name,"get_connection_to: could not get memory for internal msg");
00228 }
00229 else
00230 {
00231
00232 ERRCLog(tpparam.name, "Cannot insert AssocData for socket " << new_socket << ", "<< addr.get_ip_str()
00233 <<", port #" << addr.get_port() << " into connection map, aborting connection");
00234
00235
00236 close (new_socket);
00237 if (assoc)
00238 {
00239 delete assoc;
00240 assoc= 0;
00241 }
00242 return assoc;
00243 }
00244
00245 }
00246 }
00247 while (wait_cond(ts)!=ETIMEDOUT);
00248
00249 return assoc;
00250 }
00251
00252
00257 void
00258 TPoverTCP::terminate(const address& in_addr)
00259 {
00260 #ifndef _NO_LOGGING
00261 const char *const thisproc="terminate() - ";
00262 #endif
00263
00264 appladdress* addr = NULL;
00265 addr = dynamic_cast<appladdress*>(in_addr.copy());
00266
00267 if (!addr) return;
00268
00269
00270 AssocData* assoc = NULL;
00271
00272
00273
00274
00275
00276
00277
00278 lock();
00279 assoc= connmap.lookup(*addr);
00280 if (assoc)
00281 {
00282 EVLog(tpparam.name,thisproc<<"got request to shutdown connection for peer " << addr);
00283
00284 if (!assoc->shutdown)
00285 {
00286 if (assoc->socketfd)
00287 {
00288
00289 if (shutdown(assoc->socketfd,SHUT_WR))
00290 {
00291 ERRLog(tpparam.name,thisproc<<"shutdown (write) on socket for peer " << addr << " returned error:" << strerror(errno));
00292 }
00293 else
00294 {
00295 EVLog(tpparam.name,thisproc<<"initiated closing of connection for peer " << addr << ". Shutdown (write) on socket "<< assoc->socketfd );
00296 }
00297 }
00298 assoc->shutdown= true;
00299 }
00300 else
00301 EVLog(tpparam.name,thisproc<<"connection for peer " << addr << " is already in mode shutdown");
00302
00303 }
00304 else
00305 WLog(tpparam.name,thisproc<<"could not find a connection for peer " << *addr);
00306
00307 stop_receiver_thread(assoc);
00308
00309
00310 unlock();
00311
00312 if (addr) delete addr;
00313 }
00314
00315
00323 void
00324 TPoverTCP::send(NetMsg* netmsg, const address& in_addr, bool use_existing_connection)
00325 {
00326 if (netmsg == NULL) {
00327 ERRCLog(tpparam.name,"send() - called without valid NetMsg buffer (NULL)");
00328 return;
00329 }
00330
00331 appladdress* addr = NULL;
00332 addr= dynamic_cast<appladdress*>(in_addr.copy());
00333
00334 if (!addr)
00335 {
00336 ERRCLog(tpparam.name,"send() - given destination address is not of expected type (appladdress), has type " << (int) in_addr.get_type());
00337 return;
00338 }
00339
00340
00341 lock();
00342
00343
00344 sender_thread_queuemap_t::const_iterator it= senderthread_queuemap.find(*addr);
00345
00346 FastQueue* destqueue= 0;
00347
00348 if (it == senderthread_queuemap.end())
00349 {
00350
00351
00352
00353
00354 const AssocData* assoc = connmap.lookup(*addr);
00355
00356
00357
00358 if (use_existing_connection==false || (assoc && assoc->shutdown==false && assoc->socketfd>0))
00359 {
00360
00361 FastQueue* sender_thread_queue= new FastQueue;
00362 create_new_sender_thread(sender_thread_queue);
00363
00364
00365
00366 senderthread_queuemap.insert( pair<appladdress,FastQueue*> (*addr,sender_thread_queue) );
00367
00368 destqueue= sender_thread_queue;
00369 }
00370 }
00371 else
00372 {
00373 destqueue= it->second;
00374 }
00375
00376 unlock();
00377
00378
00379 if (destqueue)
00380 {
00381
00382
00383 TPoverTCPMsg* internalmsg= new TPoverTCPMsg(netmsg,new appladdress(*addr));
00384 if (internalmsg)
00385 {
00386
00387 internalmsg->send(tpparam.source,destqueue);
00388 }
00389 }
00390 else
00391 {
00392 if (!use_existing_connection)
00393 WLog(tpparam.name,"send() - found entry for address, but no active sender thread available for peer addr:" << *addr << " - dropping data");
00394 else
00395 {
00396 DLog(tpparam.name,"no active sender thread found for peer " << *addr << " - but policy forbids to set up a new connection, will drop data");
00397
00398 }
00399
00400 delete netmsg;
00401 }
00402
00403 if (addr) delete addr;
00404 }
00405
00414 void
00415 TPoverTCP::tcpsend(NetMsg* netmsg, appladdress* addr)
00416 {
00417 #ifndef _NO_LOGGING
00418 const char *const thisproc="sender - ";
00419 #endif
00420
00421
00422
00423 int result = TCP_SUCCESS;
00424 int saved_errno= 0;
00425 int ret= 0;
00426
00427
00428 AssocData* assoc = NULL;
00429
00430
00431
00432
00433 if (addr) {
00434 addr->convert_to_ipv6();
00435 check_send_args(*netmsg,*addr);
00436 }
00437 else
00438 {
00439 ERRCLog(tpparam.name, thisproc << "address pointer is NULL");
00440 result= TCP_SEND_FAILURE;
00441
00442 delete netmsg;
00443 delete addr;
00444
00445 throw TPErrorInternal();
00446 }
00447
00448
00449
00450
00451
00452
00453
00454
00455 assoc= get_connection_to(*addr);
00456
00457 if (assoc==NULL || assoc->socketfd<=0)
00458 {
00459 ERRCLog(tpparam.name, color[red] << thisproc << "no valid assoc/socket data - dropping packet");
00460
00461 delete netmsg;
00462 delete addr;
00463 return;
00464 }
00465
00466 if (assoc->shutdown)
00467 {
00468 Log(WARNING_LOG, LOG_ALERT, tpparam.name, thisproc << "should send message although connection already half closed");
00469 delete netmsg;
00470 delete addr;
00471
00472 throw TPErrorSendFailed();
00473 }
00474
00475 uint32 msgsize= netmsg->get_size();
00476 #ifdef DEBUG_HARD
00477 cerr << thisproc << "message size=" << netmsg->get_size() << endl;
00478 #endif
00479
00480 const uint32 retry_send_max = 3;
00481 uint32 retry_count = 0;
00482
00483
00484 for (uint32 bytes_sent= 0;
00485 bytes_sent < msgsize;
00486 bytes_sent+= ret)
00487 {
00488
00489 #ifdef _DEBUG_HARD
00490 for (uint32 i=0;i<msgsize;i++)
00491 {
00492 cout << "send_buf: " << i << " : ";
00493 if ( isalnum(*(netmsg->get_buffer()+i)) )
00494 cout << "'" << *(netmsg->get_buffer()+i) << "' (0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec << ")" ;
00495 else
00496 cout << "0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec;
00497 cout << endl;
00498 }
00499
00500 cout << endl;
00501 cout << "bytes_sent: " << bytes_sent << endl;
00502 cout << "Message size: " << msgsize << endl;
00503 cout << "Send-Socket: " << assoc->socketfd << endl;
00504 cout << "pointer-Offset. " << netmsg->get_pos() << endl;
00505 cout << "vor send " << endl;
00506 #endif
00507
00508 retry_count= 0;
00509 do
00510 {
00511
00512 ret= ::send(assoc->socketfd,
00513 netmsg->get_buffer() + bytes_sent,
00514 msgsize - bytes_sent,
00515 MSG_NOSIGNAL);
00516
00517
00518
00519
00520
00521 if (ret < 0)
00522 {
00523 saved_errno= errno;
00524 switch(saved_errno)
00525 {
00526 case EAGAIN:
00527 case EINTR:
00528 case ENOBUFS:
00529 retry_count++;
00530 ERRLog(tpparam.name,"Temporary failure while calling send(): " << strerror(saved_errno) << ", errno: " << saved_errno
00531 << " - retry sending, retry #" << retry_count);
00532
00533 sleep(1);
00534 break;
00535
00536
00537 default:
00538 retry_count= retry_send_max;
00539 break;
00540 }
00541 }
00542 else
00543 break;
00544 }
00545 while(retry_count < retry_send_max);
00546
00547 if (ret < 0)
00548 {
00549
00550 result= TCP_SEND_FAILURE;
00551 break;
00552 }
00553 else
00554 {
00555 if (debug_pdu)
00556 {
00557 ostringstream hexdump;
00558 netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_sent);
00559 DLog(tpparam.name,"PDU debugging enabled - Sent:" << hexdump.str());
00560 }
00561 }
00562
00563
00564 }
00565
00566
00567 delete netmsg;
00568
00569
00570
00571
00572 if (result != TCP_SUCCESS)
00573 {
00574 ERRLog(tpparam.name, thisproc << "TCP error, returns " << ret << ", error : " << strerror(errno));
00575 delete addr;
00576
00577 throw TPErrorSendFailed(saved_errno);
00578
00579 }
00580 else
00581 EVLog(tpparam.name, thisproc << ">>----Sent---->> message (" << msgsize << " bytes) using socket " << assoc->socketfd << " to " << *addr);
00582
00583 if (!assoc) {
00584
00585
00586 ERRLog(tpparam.name, thisproc << "cannot get connection to " << addr->get_ip_str()
00587 << ", port #" << addr->get_port());
00588
00589 delete addr;
00590
00591 throw TPErrorUnreachable();
00592 }
00593
00594
00595 delete addr;
00596 }
00597
00598
00599
00600
00601
00602
00603 void
00604 TPoverTCP::sender_thread(void *argp)
00605 {
00606 #ifndef _NO_LOGGING
00607 const char *const methodname="senderthread - ";
00608 #endif
00609
00610 message* internal_thread_msg = NULL;
00611
00612 EVLog(tpparam.name, methodname << "starting as thread <" << pthread_self() << ">");
00613
00614 FastQueue* fq= reinterpret_cast<FastQueue*>(argp);
00615 if (!fq)
00616 {
00617 ERRLog(tpparam.name, methodname << "thread <" << pthread_self() << "> no valid pointer to msg queue. Stop.");
00618 return;
00619 }
00620
00621 bool terminate= false;
00622 TPoverTCPMsg* internalmsg= 0;
00623 while (terminate==false && (internal_thread_msg= fq->dequeue()) != 0 )
00624 {
00625 internalmsg= dynamic_cast<TPoverTCPMsg*>(internal_thread_msg);
00626
00627 if (internalmsg == 0)
00628 {
00629 ERRLog(tpparam.name, methodname << "received not an TPoverTCPMsg but a" << internal_thread_msg->get_type_name());
00630 }
00631 else
00632 if (internalmsg->get_msgtype() == TPoverTCPMsg::send_data)
00633 {
00634
00635 if (internalmsg->get_netmsg() && internalmsg->get_appladdr())
00636 {
00637 try
00638 {
00639 tcpsend(internalmsg->get_netmsg(),internalmsg->get_appladdr());
00640 }
00641 catch(TPErrorSendFailed& err)
00642 {
00643 ERRLog(tpparam.name, methodname << "TCP send call failed - " << err.what()
00644 << " cause: (" << err.get_reason() << ") " << strerror(err.get_reason()) );
00645 }
00646 catch(TPError& err)
00647 {
00648 ERRLog(tpparam.name, methodname << "TCP send call failed - reason: " << err.what());
00649 }
00650 catch(...)
00651 {
00652 ERRLog(tpparam.name, methodname << "TCP send call failed - unknown exception");
00653 }
00654 }
00655 else
00656 {
00657 ERRLog(tpparam.name, methodname << "problem with passed arguments references, they point to 0");
00658 }
00659 }
00660 else
00661 if (internalmsg->get_msgtype() == TPoverTCPMsg::stop)
00662 {
00663 terminate= true;
00664 }
00665 }
00666
00667 EVLog(tpparam.name, methodname << "<" << pthread_self() << "> terminated connection.");
00668 }
00669
00670
00678 void
00679 TPoverTCP::receiver_thread(void *argp)
00680 {
00681 #ifndef _NO_LOGGING
00682 const char *const methodname="receiver - ";
00683 #endif
00684
00685 receiver_thread_arg_t *receiver_thread_argp= static_cast<receiver_thread_arg_t *>(argp);
00686 const appladdress* peer_addr = NULL;
00687 const appladdress* own_addr = NULL;
00688 uint32 bytes_received = 0;
00689 TPMsg* tpmsg= NULL;
00690
00691
00692 if (receiver_thread_argp == 0)
00693 {
00694 ERRCLog(tpparam.name, methodname << "No arguments given at start of receiver thread <" << pthread_self() << ">, exiting.");
00695
00696 return;
00697 }
00698 else
00699 {
00700
00701 receiver_thread_argp->terminated= false;
00702
00703 #ifdef _DEBUG
00704 DLog(tpparam.name, methodname << "New receiver thread <" << pthread_self() << "> started. ");
00705 #endif
00706 }
00707
00708 int conn_socket= 0;
00709 if (receiver_thread_argp->peer_assoc)
00710 {
00711
00712 conn_socket = receiver_thread_argp->peer_assoc->socketfd;
00713
00714 peer_addr= &receiver_thread_argp->peer_assoc->peer;
00715 own_addr= &receiver_thread_argp->peer_assoc->ownaddr;
00716 }
00717 else
00718 {
00719 ERRCLog(tpparam.name, methodname << "No peer assoc available - pointer is NULL");
00720
00721 return;
00722 }
00723
00724 if (peer_addr == 0)
00725 {
00726 ERRCLog(tpparam.name, methodname << "No peer address available for socket " << conn_socket << ", exiting.");
00727
00728 return;
00729 }
00730
00731 #ifdef _DEBUG
00732 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname <<
00733 "Preparing to wait for data at socket "
00734 << conn_socket << " from " << receiver_thread_argp->peer_assoc->peer);
00735 #endif
00736
00737 int ret= 0;
00738 uint32 msgcontentlength= 0;
00739 bool msgcontentlength_known= false;
00740 bool pdu_complete= false;
00741
00742
00744
00745
00746
00747
00748
00749
00750
00751
00752 const unsigned int number_poll_sockets= 1;
00753 struct pollfd poll_fd;
00754
00755 poll_fd.fd = conn_socket;
00756 poll_fd.events = POLLIN | POLLPRI;
00757 poll_fd.revents = 0;
00758
00759 int poll_status;
00760 bool recv_error= false;
00761
00762 NetMsg* netmsg= 0;
00763 NetMsg* remainbuf= 0;
00764 size_t buffer_bytes_left= 0;
00765 size_t trailingbytes= 0;
00766 bool skiprecv= false;
00767
00768
00769 while( receiver_thread_argp->sig_terminate == false )
00770 {
00771
00772 ret= 0;
00773 msgcontentlength= 0;
00774 msgcontentlength_known= false;
00775 pdu_complete= false;
00776 netmsg= 0;
00777
00778
00779 if (remainbuf != 0)
00780 {
00781 netmsg= remainbuf;
00782 remainbuf= 0;
00783 buffer_bytes_left= netmsg->get_size()-trailingbytes;
00784 bytes_received= trailingbytes;
00785 trailingbytes= 0;
00786 skiprecv= true;
00787 }
00788 else
00789 if ( (netmsg= new NetMsg(NetMsg::max_size)) != 0 )
00790 {
00791 buffer_bytes_left= netmsg->get_size();
00792 bytes_received= 0;
00793 skiprecv= false;
00794 }
00795 else
00796 {
00797 bytes_received= 0;
00798 buffer_bytes_left= 0;
00799 recv_error= true;
00800 }
00801
00802
00803
00804 while (!pdu_complete &&
00805 !recv_error &&
00806 !receiver_thread_argp->sig_terminate)
00807 {
00808 if (!skiprecv)
00809 {
00810
00811 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
00812
00813 if (receiver_thread_argp->sig_terminate)
00814 {
00815 Log(EVENT_LOG,LOG_UNIMP,tpparam.name,methodname << "Thread <" << pthread_self() << "> found terminate signal after poll");
00816
00817 AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
00818 if (myassoc->shutdown == false)
00819 {
00820 myassoc->shutdown= true;
00821 if (shutdown(myassoc->socketfd,SHUT_WR))
00822 {
00823 if ( errno != ENOTCONN )
00824 Log(ERROR_LOG,LOG_UNIMP,tpparam.name,methodname <<"shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
00825 }
00826 }
00827
00828 if (poll_status == 0)
00829 {
00830 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
00831 }
00832 }
00833
00834 if (poll_fd.revents & POLLERR)
00835 {
00836 if (errno == 0 || errno == EINTR)
00837 {
00838 EVLog(tpparam.name, methodname << "poll(): " << strerror(errno));
00839 }
00840 else
00841 {
00842 ERRCLog(tpparam.name, methodname << "Poll indicates error: " << strerror(errno));
00843 recv_error= true;
00844 }
00845 }
00846
00847 if (poll_fd.revents & POLLHUP)
00848 {
00849 Log(EVENT_LOG,LOG_CRIT, tpparam.name, methodname << "Poll hung up");
00850 recv_error= true;
00851 }
00852
00853 if (poll_fd.revents & POLLNVAL)
00854 {
00855 EVLog(tpparam.name, methodname << "Poll Invalid request: fd not open");
00856 recv_error= true;
00857 }
00858
00859
00860 switch (poll_status)
00861 {
00862 case -1:
00863 if (errno == 0 || errno == EINTR)
00864 {
00865 EVLog(tpparam.name, methodname << "Poll status: " << strerror(errno));
00866 }
00867 else
00868 {
00869 ERRCLog(tpparam.name, methodname << "Poll status indicates error: " << strerror(errno) << "- aborting");
00870 recv_error= true;
00871 }
00872
00873 continue;
00874 break;
00875
00876 case 0:
00877 #ifdef DEBUG_HARD
00878 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll timed out after " << tpparam.sleep_time << " ms.");
00879 #endif
00880 continue;
00881 break;
00882
00883 default:
00884 #ifdef DEBUG_HARD
00885 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
00886 #endif
00887 break;
00888 }
00889
00890
00892 ret = recv(conn_socket,
00893 netmsg->get_buffer() + bytes_received,
00894 buffer_bytes_left,
00895 MSG_DONTWAIT);
00896
00897 if ( ret < 0 )
00898 {
00899 if (errno!=EAGAIN && errno!=EWOULDBLOCK)
00900 {
00901 ERRCLog(tpparam.name, methodname << "Receive at socket " << conn_socket << " failed, error: " << strerror(errno));
00902 recv_error= true;
00903 continue;
00904 }
00905 else
00906 {
00907
00908 continue;
00909 }
00910 }
00911 else
00912 {
00913 if (ret == 0)
00914 {
00915
00916
00917 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Other side (" << *peer_addr << ") closed connection for socket " << conn_socket);
00918
00919 AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
00920 if (myassoc->shutdown == false)
00921 {
00922 myassoc->shutdown= true;
00923 if (shutdown(myassoc->socketfd,SHUT_WR))
00924 {
00925 if ( errno != ENOTCONN )
00926 Log(ERROR_LOG,LOG_UNIMP,tpparam.name, methodname << "shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
00927 }
00928 }
00929
00930 recv_error= true;
00931 }
00932 else
00933 {
00934
00935 Log(EVENT_LOG,LOG_UNIMP, tpparam.name, methodname << "<<--Received--<< packet (" << ret << " bytes) at socket " << conn_socket << " from " << *peer_addr);
00936
00937 bytes_received+= ret;
00938 buffer_bytes_left-= ret;
00939 }
00940 }
00941 }
00942
00943 if (buffer_bytes_left < 0)
00944 {
00945 recv_error= true;
00946 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "during receive buffer space exhausted");
00947 }
00948
00949 if (!msgcontentlength_known)
00950 {
00951
00952 if (bytes_received >= common_header_length)
00953 {
00954
00955 if (getmsglength(*netmsg, msgcontentlength))
00956 msgcontentlength_known= true;
00957 else
00958 {
00959 ERRCLog(tpparam.name, methodname << "Not a valid protocol header - discarding received packet. received size " << msgcontentlength);
00960
00961 ostringstream hexdumpstr;
00962 netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
00963 DLog(tpparam.name,"dumping received bytes:" << hexdumpstr.str());
00964
00965
00966 msgcontentlength= 0;
00967 msgcontentlength_known= false;
00968 bytes_received= 0;
00969 pdu_complete= false;
00970 continue;
00971 }
00972 }
00973 }
00974
00975
00976 DLog(tpparam.name, "bytes_received-common_header_length=" << bytes_received-common_header_length << " msgcontentlength: " << msgcontentlength);
00977 if (msgcontentlength_known)
00978 {
00979 if (bytes_received-common_header_length >= msgcontentlength )
00980 {
00981 pdu_complete= true;
00982
00983 netmsg->truncate(common_header_length+msgcontentlength);
00984
00985
00986 if (bytes_received-common_header_length > msgcontentlength)
00987 {
00988 WLog(tpparam.name,"trailing bytes - received more bytes ("<<bytes_received<<") than expected for PDU (" << common_header_length+msgcontentlength << ")");
00989 remainbuf= new NetMsg(NetMsg::max_size);
00990 trailingbytes= (bytes_received-common_header_length) - msgcontentlength;
00991 bytes_received= common_header_length+msgcontentlength;
00992 memcpy(remainbuf->get_buffer(),netmsg->get_buffer()+common_header_length+msgcontentlength, trailingbytes);
00993 }
00994 }
00995 else
00996 {
00997 skiprecv= false;
00998 }
00999 }
01000 }
01001
01002
01003
01004 if (ret == 0)
01005 {
01006 recv_error= false;
01007 }
01008
01009
01010 if (!recv_error && pdu_complete)
01011 {
01012
01013 tpmsg = new(nothrow) TPMsg(netmsg, peer_addr->copy(), own_addr->copy());
01014 if (tpmsg)
01015 {
01016 DLog(tpparam.name, methodname << "receipt of PDU now complete, sending msg#" << tpmsg->get_id()
01017 << " to module " << message::get_qaddr_name(tpparam.dest));
01018 }
01019
01020 debug_pdu=false;
01021
01022 if (debug_pdu)
01023 {
01024 ostringstream hexdump;
01025 netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_received);
01026 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"PDU debugging enabled - Received:" << hexdump.str());
01027 }
01028
01029
01030
01031 if (!tpmsg
01032 || (!tpmsg->get_peeraddress())
01033 || (!tpmsg->send(message::qaddr_tp_over_tcp, tpparam.dest)))
01034 {
01035 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Cannot allocate/send TPMsg");
01036 if (tpmsg) delete tpmsg;
01037 }
01038
01039
01040 }
01041 else
01042 {
01043 if (bytes_received>0)
01044 {
01045 Log(WARNING_LOG,LOG_NORMAL, tpparam.name, methodname << "Attention! " << (recv_error? "Receive error, " : "") << (pdu_complete ? "PDU complete" : "PDU incomplete") << "received bytes: " << bytes_received);
01046 }
01047
01048 if (!pdu_complete && bytes_received>0 && bytes_received<common_header_length)
01049 {
01050 ostringstream hexdumpstr;
01051 netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
01052 Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"Message too short to be a valid protocol header - dumping received bytes:" << hexdumpstr.str());
01053 }
01054
01055
01056 break;
01057
01058 }
01059
01060 }
01061
01062 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self()
01063 << "> shutting down and closing socket " << receiver_thread_argp->peer_assoc->peer);
01064
01065
01066 if (shutdown(conn_socket, SHUT_RD))
01067 {
01068 if ( errno != ENOTCONN )
01069 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> shutdown (read) on socket failed, reason: " << strerror(errno));
01070 }
01071
01072
01073 close(conn_socket);
01074
01075 receiver_thread_argp->terminated= true;
01076
01077 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> terminated");
01078
01079 #ifdef _DEBUG
01080 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Signaling main loop for cleanup");
01081 #endif
01082
01083 TPoverTCPMsg* newmsg= new(nothrow)TPoverTCPMsg(receiver_thread_argp->peer_assoc);
01084
01085 newmsg->send_to(tpparam.source);
01086
01087 }
01088
01089
01095 void
01096 TPoverTCP::stop_receiver_thread(AssocData* peer_assoc)
01097 {
01098
01099
01100
01101
01102
01103 if (peer_assoc == 0)
01104 return;
01105
01106 pthread_t thread_id= peer_assoc->thread_ID;
01107
01108
01109 recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
01110 receiver_thread_arg_t* recv_thread_argp=
01111 (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
01112 if (recv_thread_argp)
01113 {
01114 if (!recv_thread_argp->terminated)
01115 {
01116
01117 Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> signaled for termination");
01118
01119
01120 recv_thread_argp->sig_terminate= true;
01121
01122 pthread_join(thread_id, 0);
01123
01124 return;
01125 }
01126 }
01127 else
01128 Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> not found");
01129
01130 }
01131
01132
01138 void
01139 TPoverTCP::cleanup_receiver_thread(AssocData* peer_assoc)
01140 {
01141
01142
01143
01144
01145
01146 if (peer_assoc == 0)
01147 return;
01148
01149 pthread_t thread_id= peer_assoc->thread_ID;
01150
01151
01152 recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
01153 receiver_thread_arg_t* recv_thread_argp=
01154 (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
01155 if (recv_thread_argp)
01156 {
01157 if (!recv_thread_argp->terminated)
01158 {
01159
01160 Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> not terminated yet?!");
01161 return;
01162 }
01163 else
01164 {
01165 Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> is terminated");
01166
01167
01168 recv_thread_argmap.erase(recv_thread_arg_iter);
01169
01170
01171 delete recv_thread_argp;
01172 }
01173 }
01174
01175
01176
01177
01178
01179 terminate_sender_thread(peer_assoc);
01180
01181
01182
01183 connmap.erase(peer_assoc);
01184
01185
01186
01187 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Cleanup receiver thread <" << thread_id << ">. Done.");
01188 }
01189
01190
01191
01192
01193
01194
01195 void
01196 TPoverTCP::terminate_sender_thread(const AssocData* assoc)
01197 {
01198 if (assoc == 0)
01199 {
01200 Log(ERROR_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - assoc data == NULL");
01201 return;
01202 }
01203
01204 sender_thread_queuemap_t::iterator it= senderthread_queuemap.find(assoc->peer);
01205
01206 if (it != senderthread_queuemap.end())
01207 {
01208 FastQueue* destqueue= it->second;
01209 if (destqueue)
01210 {
01211 TPoverTCPMsg* internalmsg= new TPoverTCPMsg(assoc,tpparam.source,TPoverTCPMsg::stop);
01212 if (internalmsg)
01213 {
01214
01215 internalmsg->send(tpparam.source,destqueue);
01216 }
01217 }
01218 else
01219 {
01220 Log(WARNING_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - found entry for address, but no sender thread. addr:" << assoc->peer);
01221 }
01222
01223 senderthread_queuemap.erase(it);
01224 }
01225 }
01226
01227
01228
01229
01230
01231 void
01232 TPoverTCP::terminate_all_threads()
01233 {
01234 AssocData* assoc= 0;
01235 receiver_thread_arg_t* terminate_argp;
01236
01237 for (recv_thread_argmap_t::iterator terminate_iterator= recv_thread_argmap.begin();
01238 terminate_iterator != recv_thread_argmap.end();
01239 terminate_iterator++)
01240 {
01241 if ( (terminate_argp= terminate_iterator->second) != 0)
01242 {
01243
01244 assoc= const_cast<AssocData*>(terminate_argp->peer_assoc);
01245
01246 if (terminate_argp->terminated == false)
01247 {
01248 terminate_argp->sig_terminate= true;
01249
01250 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,
01251 "Signaled receiver thread <" << terminate_iterator->first << "> for termination");
01252
01253 pthread_join(terminate_iterator->first, 0);
01254
01255 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Thread <" << terminate_iterator->first << "> is terminated");
01256 }
01257 else
01258 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,
01259 "Receiver thread <" << terminate_iterator->first << "> already terminated");
01260
01261
01262 delete terminate_argp;
01263
01264
01265 terminate_sender_thread(assoc);
01266
01267 connmap.erase(assoc);
01268
01269 }
01270 }
01271 }
01272
01273
01281 void*
01282 TPoverTCP::sender_thread_starter(void *argp)
01283 {
01284 sender_thread_start_arg_t *sargp= static_cast<sender_thread_start_arg_t *>(argp);
01285
01286
01287
01288
01289 if (sargp != 0 && sargp->instance != 0)
01290 {
01291
01292 sargp->instance->sender_thread(sargp->sender_thread_queue);
01293
01294
01295
01296
01297 delete sargp;
01298 }
01299 else
01300 {
01301 Log(ERROR_LOG,LOG_CRIT,"sender_thread_starter","while starting sender_thread: 0 pointer to arg or object");
01302 }
01303 return 0;
01304 }
01305
01306
01307
01308
01316 void*
01317 TPoverTCP::receiver_thread_starter(void *argp)
01318 {
01319 receiver_thread_start_arg_t *rargp= static_cast<receiver_thread_start_arg_t *>(argp);
01320
01321 if (rargp != 0 && rargp->instance != 0)
01322 {
01323
01324 rargp->instance->receiver_thread(rargp->rtargp);
01325
01326
01327 delete rargp;
01328 }
01329 else
01330 {
01331 Log(ERROR_LOG,LOG_CRIT,"receiver_thread_starter","while starting receiver_thread: 0 pointer to arg or object");
01332 }
01333 return 0;
01334 }
01335
01336
01337 void
01338 TPoverTCP::create_new_sender_thread(FastQueue* senderfqueue)
01339 {
01340 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new sender thread...");
01341
01342 pthread_t senderthreadid;
01343
01344 int pthread_status= pthread_create(&senderthreadid,
01345 NULL,
01346
01347 TPoverTCP::sender_thread_starter,
01348 new sender_thread_start_arg_t(this,senderfqueue));
01349 if (pthread_status)
01350 {
01351 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " << strerror(pthread_status));
01352
01353 delete senderfqueue;
01354 }
01355 }
01356
01357
01358 void
01359 TPoverTCP::create_new_receiver_thread(AssocData* peer_assoc)
01360 {
01361 receiver_thread_arg_t* argp=
01362 new(nothrow) receiver_thread_arg(peer_assoc);
01363
01364 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new receiver thread...");
01365
01366
01367 int pthread_status= pthread_create(&peer_assoc->thread_ID,
01368 NULL,
01369
01370 receiver_thread_starter,
01371 new(nothrow) receiver_thread_start_arg_t(this,argp));
01372 if (pthread_status)
01373 {
01374 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " << strerror(pthread_status));
01375
01376 delete argp;
01377 }
01378 else
01379 {
01380 lock();
01381
01382
01383
01384 pair<recv_thread_argmap_t::iterator, bool> tmpinsiterator=
01385 recv_thread_argmap.insert( pair<pthread_t,receiver_thread_arg_t*> (peer_assoc->thread_ID,argp) );
01386 if (tmpinsiterator.second == false)
01387 {
01388 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Thread argument could not be inserted into hashmap");
01389 }
01390 unlock();
01391 }
01392 }
01393
01394
01402 void*
01403 TPoverTCP::master_listener_thread_starter(void *argp)
01404 {
01405
01406 if (argp != 0)
01407 {
01408 (static_cast<TPoverTCP*>(argp))->master_listener_thread();
01409 }
01410 return 0;
01411 }
01412
01413
01414
01420 void
01421 TPoverTCP::master_listener_thread()
01422 {
01423
01424 struct sockaddr_in6 own_address;
01425 own_address.sin6_family = AF_INET6;
01426 own_address.sin6_flowinfo= 0;
01427 own_address.sin6_port = htons(tpparam.port);
01428
01429 own_address.sin6_addr = in6addr_any;
01430 own_address.sin6_scope_id= 0;
01431
01432
01433 int master_listener_socket= socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
01434 if (master_listener_socket == -1)
01435 {
01436 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Could not create a new socket, error: " << strerror(errno));
01437 return;
01438 }
01439
01440
01441 int nodelayflag= 1;
01442 int status= setsockopt(master_listener_socket,
01443 IPPROTO_TCP,
01444 TCP_NODELAY,
01445 &nodelayflag,
01446 sizeof(nodelayflag));
01447 if (status)
01448 {
01449 Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
01450 }
01451
01452
01453 int socketreuseflag= 1;
01454 status= setsockopt(master_listener_socket,
01455 SOL_SOCKET,
01456 SO_REUSEADDR,
01457 (const char *) &socketreuseflag,
01458 sizeof(socketreuseflag));
01459 if (status)
01460 {
01461 Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
01462 }
01463
01464
01465
01466 int bind_status = bind(master_listener_socket,
01467 reinterpret_cast<struct sockaddr *>(&own_address),
01468 sizeof(own_address));
01469 if (bind_status)
01470 {
01471 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Binding to "
01472 << inet_ntop(AF_INET6, &own_address.sin6_addr, in6_addrstr, INET6_ADDRSTRLEN)
01473 << " port " << tpparam.port << " failed, error: " << strerror(errno));
01474 return;
01475 }
01476
01477
01478
01479 int listen_status = listen(master_listener_socket, max_listen_queue_size);
01480 if (listen_status)
01481 {
01482 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Listen at socket " << master_listener_socket
01483 << " failed, error: " << strerror(errno));
01484 return;
01485 }
01486 else
01487 {
01488 Log(INFO_LOG,LOG_NORMAL, tpparam.name, color[green] << "Listening at port #" << tpparam.port << color[off]);
01489 }
01490
01491
01492 fcntl(master_listener_socket,F_SETFL, O_NONBLOCK);
01493
01494
01495 struct pollfd poll_fd;
01496 poll_fd.fd = master_listener_socket;
01497 poll_fd.events = POLLIN | POLLPRI;
01498 poll_fd.revents = 0;
01499
01500
01501
01502
01503
01504
01505 bool terminate = false;
01506
01507 state_t currstate= get_state();
01508 int poll_status= 0;
01509 const unsigned int number_poll_sockets= 1;
01510 struct sockaddr_in6 peer_address;
01511 socklen_t peer_address_len;
01512 int conn_socket;
01513
01514
01515 while(! (terminate= (currstate==STATE_ABORT || currstate==STATE_STOP) ) )
01516 {
01517
01518
01519 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
01520 if (poll_fd.revents & POLLERR)
01521 {
01522 if (errno != EINTR)
01523 {
01524 Log(ERROR_LOG,LOG_CRIT, tpparam.name,
01525 "Poll caused error " << strerror(errno) << " - indicated by revents");
01526 }
01527 else
01528 {
01529 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "poll(): " << strerror(errno));
01530 }
01531
01532 }
01533 if (poll_fd.revents & POLLHUP)
01534 {
01535 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll hung up");
01536 return;
01537 }
01538 if (poll_fd.revents & POLLNVAL)
01539 {
01540 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll Invalid request: fd not open");
01541 return;
01542 }
01543
01544 switch (poll_status)
01545 {
01546 case -1:
01547 if (errno != EINTR)
01548 {
01549 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll status indicates error: " << strerror(errno));
01550 }
01551 else
01552 {
01553 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Poll status: " << strerror(errno));
01554 }
01555
01556 break;
01557
01558 case 0:
01559 #ifdef DEBUG_HARD
01560 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,
01561 "Listen Thread - Poll timed out after " << tpparam.sleep_time << " ms.");
01562 #endif
01563 currstate= get_state();
01564 continue;
01565 break;
01566
01567 default:
01568 #ifdef DEBUG_HARD
01569 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
01570 #endif
01571 break;
01572 }
01573
01574
01575
01576
01577 peer_address_len= sizeof(peer_address);
01578 conn_socket = accept (master_listener_socket,
01579 reinterpret_cast<struct sockaddr *>(&peer_address),
01580 &peer_address_len);
01581 if (conn_socket == -1)
01582 {
01583 if (errno != EWOULDBLOCK && errno != EAGAIN)
01584 {
01585 Log(ERROR_LOG,LOG_EMERG, tpparam.name, "Accept at socket " << master_listener_socket
01586 << " failed, error: " << strerror(errno));
01587 return;
01588 }
01589 }
01590 else
01591 {
01592
01593 AssocData* peer_assoc = NULL;
01594 appladdress addr(peer_address, IPPROTO_TCP);
01595
01596 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "<<--Received connect--<< request from " << addr.get_ip_str()
01597 << " port #" << addr.get_port());
01598
01599 struct sockaddr_in6 own_address;
01600 socklen_t own_address_len= sizeof(own_address);
01601 getsockname(conn_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
01602
01603
01604
01605 peer_assoc = new(nothrow) AssocData(conn_socket, addr, appladdress(own_address,IPPROTO_TCP));
01606
01607 bool insert_success= false;
01608 if (peer_assoc)
01609 {
01610
01611 lock();
01612 insert_success= connmap.insert(peer_assoc);
01613
01614 unlock();
01615 }
01616
01617
01618 if (insert_success == false)
01619 {
01620 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot insert AssocData for socket " << conn_socket
01621 << ", " << addr.get_ip_str() << ", port #"
01622 << addr.get_port() << " into connection map, aborting connection...");
01623
01624
01625 close (conn_socket);
01626 if (peer_assoc)
01627 {
01628 delete peer_assoc;
01629 peer_assoc= 0;
01630 }
01631 return;
01632
01633 }
01634
01635
01636 create_new_receiver_thread(peer_assoc);
01637 }
01638
01639
01640 currstate= get_state();
01641
01642 }
01643 return;
01644 }
01645
01646
01647 TPoverTCP::~TPoverTCP()
01648 {
01649 init= false;
01650
01651 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Destructor called");
01652
01653 QueueManager::instance()->unregister_queue(tpparam.source);
01654 }
01655
01666 void
01667 TPoverTCP::main_loop(uint32 nr)
01668 {
01669
01670
01671 FastQueue* fq = get_fqueue();
01672 if (!fq)
01673 {
01674 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot find message queue");
01675 return;
01676 }
01677
01678 QueueManager::instance()->register_queue(fq,tpparam.source);
01679
01680
01681 pthread_t master_listener_thread_ID;
01682 int pthread_status= pthread_create(&master_listener_thread_ID,
01683 NULL,
01684
01685 master_listener_thread_starter,
01686 this);
01687 if (pthread_status)
01688 {
01689 Log(ERROR_LOG,LOG_CRIT, tpparam.name,
01690 "New master listener thread could not be created: " << strerror(pthread_status));
01691 }
01692 else
01693 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Master listener thread started");
01694
01695
01696
01697 timespec wait_interval= { 0, 250000000L };
01698 message* internal_thread_msg = NULL;
01699 state_t currstate= get_state();
01700
01701
01702 while( currstate!=STATE_ABORT && currstate!=STATE_STOP )
01703 {
01704
01705 if ( (internal_thread_msg= fq->dequeue_timedwait(wait_interval)) != 0 )
01706 {
01707 TPoverTCPMsg* internalmsg= dynamic_cast<TPoverTCPMsg*>(internal_thread_msg);
01708 if (internalmsg)
01709 {
01710 if (internalmsg->get_msgtype() == TPoverTCPMsg::stop)
01711 {
01712
01713 AssocData* assocd= const_cast<AssocData*>(internalmsg->get_peer_assoc());
01714 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Got cleanup request for thread <" << assocd->thread_ID <<'>');
01715 lock();
01716 cleanup_receiver_thread( assocd );
01717 unlock();
01718 }
01719 else
01720 if (internalmsg->get_msgtype() == TPoverTCPMsg::start)
01721 {
01722
01723 create_new_receiver_thread( const_cast<AssocData*>(internalmsg->get_peer_assoc()) );
01724 }
01725 else
01726 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "unexpected internal message:" << internalmsg->get_msgtype());
01727
01728 delete internalmsg;
01729 }
01730 else
01731 {
01732 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Dynamic_cast failed - received unexpected and unknown internal message source "
01733 << internal_thread_msg->get_source());
01734 }
01735 }
01736
01737
01738 currstate= get_state();
01739 }
01740
01741 if (currstate==STATE_STOP)
01742 {
01743
01744 Log(INFO_LOG,LOG_NORMAL, tpparam.name, "Asked to abort, stopping all receiver threads");
01745 }
01746
01747
01748 fq->shutdown();
01749
01750 terminate_all_threads();
01751 }
01752
01753 }