00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 extern "C"
00031 {
00032
00033 #include <unistd.h>
00034 #include <sys/types.h>
00035 #include <netinet/in.h>
00036 #include <netinet/tcp.h>
00037 #include <sys/socket.h>
00038 #include <arpa/inet.h>
00039
00040 #include <fcntl.h>
00041 #include <sys/poll.h>
00042 }
00043
00044 #include <iostream>
00045 #include <errno.h>
00046 #include <string>
00047 #include <sstream>
00048
00049 #include "tp_over_tcp.h"
00050 #include "threadsafe_db.h"
00051 #include "cleanuphandler.h"
00052 #include "setuid.h"
00053 #include "queuemanager.h"
00054 #include "logfile.h"
00055
00056 #include <set>
00057
00058 #define TCP_SUCCESS 0
00059 #define TCP_SEND_FAILURE 1
00060
00061 const unsigned int max_listen_queue_size= 10;
00062
00063 #define IPV6_ADDR_INT32_SMP 0x0000ffff
00064
00065 namespace protlib {
00066
00067 void v6_to_v4(struct sockaddr_in *sin, struct sockaddr_in6 *sin6) {
00068 bzero(sin, sizeof(*sin));
00069 sin->sin_family = AF_INET;
00070 sin->sin_port = sin6->sin6_port;
00071 memcpy(&sin->sin_addr, &sin6->sin6_addr.s6_addr[12], sizeof(struct in_addr));
00072 }
00073
00074
00075 void v4_to_v6(struct sockaddr_in *sin, struct sockaddr_in6 *sin6) {
00076 bzero(sin6, sizeof(*sin6));
00077 sin6->sin6_family = AF_INET6;
00078 sin6->sin6_port = sin->sin_port;
00079 *(uint32_t *)&sin6->sin6_addr.s6_addr[0] = 0;
00080 *(uint32_t *)&sin6->sin6_addr.s6_addr[4] = 0;
00081 *(uint32_t *)&sin6->sin6_addr.s6_addr[8] = IPV6_ADDR_INT32_SMP;
00082 *(uint32_t *)&sin6->sin6_addr.s6_addr[12] = sin->sin_addr.s_addr;
00083 }
00084
00085 using namespace log;
00086
00092 char in6_addrstr[INET6_ADDRSTRLEN+1];
00093
00094
00095
00096
00097
00104 AssocData*
00105 TPoverTCP::get_connection_to(const appladdress& addr)
00106 {
00107
00108 struct timespec ts;
00109 get_time_of_day(ts);
00110 ts.tv_nsec+= tpparam.sleep_time * 1000000L;
00111 if (ts.tv_nsec>=1000000000L)
00112 {
00113 ts.tv_sec += ts.tv_nsec / 1000000000L;
00114 ts.tv_nsec= ts.tv_nsec % 1000000000L;
00115 }
00116
00117
00118 AssocData* assoc= NULL;
00119 int new_socket;
00120
00121 do
00122 {
00123
00124
00125 lock();
00126 assoc= connmap.lookup(addr);
00127
00128 unlock();
00129 if (assoc)
00130 {
00131
00132 if (!assoc->shutdown)
00133 {
00134 return assoc;
00135 }
00136 else
00137 {
00138
00139 ERRCLog(tpparam.name,"socket exists, but is already in mode shutdown");
00140
00141 return 0;
00142 }
00143 }
00144 else
00145 {
00146 Log(DEBUG_LOG,LOG_UNIMP,tpparam.name,"No existing connection to "
00147 << addr.get_ip_str() << " port #" << addr.get_port() << " found, creating a new one.");
00148 }
00149
00150
00151 new_socket = socket( v4_mode ? AF_INET : AF_INET6, SOCK_STREAM, IPPROTO_TCP);
00152 if (new_socket == -1)
00153 {
00154 ERRCLog(tpparam.name,"Couldn't create a new socket: " << strerror(errno));
00155
00156 return 0;
00157 }
00158
00159
00160 int nodelayflag= 1;
00161 int status= setsockopt(new_socket,
00162 IPPROTO_TCP,
00163 TCP_NODELAY,
00164 &nodelayflag,
00165 sizeof(nodelayflag));
00166 if (status)
00167 {
00168 ERRLog(tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
00169 }
00170
00171
00172 int socketreuseflag= 1;
00173 status= setsockopt(new_socket,
00174 SOL_SOCKET,
00175 SO_REUSEADDR,
00176 (const char *) &socketreuseflag,
00177 sizeof(socketreuseflag));
00178 if (status)
00179 {
00180 ERRLog(tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
00181 }
00182
00183 struct sockaddr_in6 dest_address;
00184 dest_address.sin6_flowinfo= 0;
00185 dest_address.sin6_scope_id= 0;
00186 addr.get_sockaddr(dest_address);
00187
00188
00189 int connect_status = 0;
00190 if (v4_mode) {
00191 struct sockaddr_in dest_address_v4;
00192 v6_to_v4( &dest_address_v4, &dest_address );
00193 connect_status = connect(new_socket,
00194 reinterpret_cast<const struct sockaddr*>(&dest_address_v4),
00195 sizeof(dest_address));
00196 } else {
00197 connect_status = connect(new_socket,
00198 reinterpret_cast<const struct sockaddr*>(&dest_address),
00199 sizeof(dest_address));
00200 }
00201
00202
00203 if (connect_status != 0)
00204 {
00205 ERRLog(tpparam.name,"Connect to " << addr.get_ip_str() << " port #" << addr.get_port()
00206 << " failed: [" << color[red] << strerror(errno) << color[off] << "]");
00207
00208 return 0;
00209 }
00210
00211
00212 struct sockaddr_in6 own_address;
00213 if (v4_mode) {
00214 struct sockaddr_in own_address_v4;
00215 socklen_t own_address_len_v4 = sizeof(own_address_v4);
00216 getsockname(new_socket, reinterpret_cast<struct sockaddr*>(&own_address_v4), &own_address_len_v4);
00217 v4_to_v6(&own_address_v4, &own_address);
00218 } else {
00219 socklen_t own_address_len= sizeof(own_address);
00220 getsockname(new_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
00221 }
00222
00223 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,">>--Connect-->> to " << addr.get_ip_str() << " port #" << addr.get_port()
00224 << " from " << inet_ntop(AF_INET6,&own_address.sin6_addr,in6_addrstr,INET6_ADDRSTRLEN)
00225 << " port #" << ntohs(own_address.sin6_port));
00226
00227
00228
00229 assoc = new(nothrow) AssocData(new_socket, addr, appladdress(own_address,IPPROTO_TCP));
00230
00231
00232 if (assoc)
00233 {
00234 bool insert_success= false;
00235
00236 lock();
00237
00238 insert_success= connmap.insert(assoc);
00239
00240 unlock();
00241
00242 if (insert_success == true)
00243 {
00244 #ifdef _DEBUG
00245 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Connected to " << addr.get_ip_str() << ", port #" << addr.get_port()
00246 << " via socket " << new_socket);
00247
00248
00249 #endif
00250
00251
00252 TPoverTCPMsg* newmsg= new(nothrow)TPoverTCPMsg(assoc, tpparam.source, TPoverTCPMsg::start);
00253 if (newmsg)
00254 {
00255 bool ret = newmsg->send_to(tpparam.source);
00256 if(!ret) delete newmsg;
00257 return assoc;
00258 }
00259 else
00260 ERRCLog(tpparam.name,"get_connection_to: could not get memory for internal msg");
00261 }
00262 else
00263 {
00264
00265 ERRCLog(tpparam.name, "Cannot insert AssocData for socket " << new_socket << ", "<< addr.get_ip_str()
00266 <<", port #" << addr.get_port() << " into connection map, aborting connection");
00267
00268
00269 close (new_socket);
00270 if (assoc)
00271 {
00272 delete assoc;
00273 assoc= 0;
00274 }
00275 return assoc;
00276 }
00277
00278 }
00279 }
00280 while (wait_cond(ts)!=ETIMEDOUT);
00281
00282 return assoc;
00283 }
00284
00285
00290 void
00291 TPoverTCP::terminate(const address& in_addr)
00292 {
00293 #ifndef _NO_LOGGING
00294 const char *const thisproc="terminate() - ";
00295 #endif
00296
00297 appladdress* addr = NULL;
00298 addr = dynamic_cast<appladdress*>(in_addr.copy());
00299
00300 if (!addr) return;
00301
00302
00303 AssocData* assoc = NULL;
00304
00305
00306
00307
00308
00309
00310
00311 lock();
00312 assoc= connmap.lookup(*addr);
00313 if (assoc)
00314 {
00315 EVLog(tpparam.name,thisproc<<"got request to shutdown connection for peer " << addr);
00316
00317 if (!assoc->shutdown)
00318 {
00319 if (assoc->socketfd)
00320 {
00321
00322 if (shutdown(assoc->socketfd,SHUT_WR))
00323 {
00324 ERRLog(tpparam.name,thisproc<<"shutdown (write) on socket for peer " << addr << " returned error:" << strerror(errno));
00325 }
00326 else
00327 {
00328 EVLog(tpparam.name,thisproc<<"initiated closing of connection for peer " << addr << ". Shutdown (write) on socket "<< assoc->socketfd );
00329 }
00330 }
00331 assoc->shutdown= true;
00332 }
00333 else
00334 EVLog(tpparam.name,thisproc<<"connection for peer " << addr << " is already in mode shutdown");
00335
00336 }
00337 else
00338 WLog(tpparam.name,thisproc<<"could not find a connection for peer " << *addr);
00339
00340 stop_receiver_thread(assoc);
00341
00342
00343 unlock();
00344
00345 if (addr) delete addr;
00346 }
00347
00348
00356 void
00357 TPoverTCP::send(NetMsg* netmsg, const address& in_addr, bool use_existing_connection)
00358 {
00359 if (netmsg == NULL) {
00360 ERRCLog(tpparam.name,"send() - called without valid NetMsg buffer (NULL)");
00361 return;
00362 }
00363
00364 appladdress* addr = NULL;
00365 addr= dynamic_cast<appladdress*>(in_addr.copy());
00366
00367 if (!addr)
00368 {
00369 ERRCLog(tpparam.name,"send() - given destination address is not of expected type (appladdress), has type " << (int) in_addr.get_type());
00370 return;
00371 }
00372
00373
00374 lock();
00375
00376
00377 sender_thread_queuemap_t::const_iterator it= senderthread_queuemap.find(*addr);
00378
00379 FastQueue* destqueue= 0;
00380
00381 if (it == senderthread_queuemap.end())
00382 {
00383
00384
00385
00386
00387 const AssocData* assoc = connmap.lookup(*addr);
00388
00389
00390
00391 if (use_existing_connection==false || (assoc && assoc->shutdown==false && assoc->socketfd>0))
00392 {
00393
00394 FastQueue* sender_thread_queue= new FastQueue;
00395 create_new_sender_thread(sender_thread_queue);
00396
00397
00398
00399 senderthread_queuemap.insert( pair<appladdress,FastQueue*> (*addr,sender_thread_queue) );
00400
00401 destqueue= sender_thread_queue;
00402 }
00403 }
00404 else
00405 {
00406 destqueue= it->second;
00407 }
00408
00409 unlock();
00410
00411
00412 if (destqueue)
00413 {
00414
00415
00416 appladdress* apl=new appladdress(*addr);
00417 TPoverTCPMsg* internalmsg= new TPoverTCPMsg(netmsg,apl);
00418 if (internalmsg)
00419 {
00420
00421 bool sent = internalmsg->send(tpparam.source,destqueue);
00422 if (!sent) {
00423 delete internalmsg->get_appladdr();
00424 delete internalmsg;
00425 internalmsg = NULL;
00426 }
00427 }
00428 }
00429 else
00430 {
00431 if (!use_existing_connection)
00432 WLog(tpparam.name,"send() - found entry for address, but no active sender thread available for peer addr:" << *addr << " - dropping data");
00433 else
00434 {
00435 DLog(tpparam.name,"no active sender thread found for peer " << *addr << " - but policy forbids to set up a new connection, will drop data");
00436
00437 }
00438
00439 delete netmsg;
00440 }
00441
00442 if (addr) delete addr;
00443 }
00444
00453 void
00454 TPoverTCP::tcpsend(NetMsg* netmsg, appladdress* addr)
00455 {
00456 #ifndef _NO_LOGGING
00457 const char *const thisproc="sender - ";
00458 #endif
00459
00460
00461
00462 int result = TCP_SUCCESS;
00463 int saved_errno= 0;
00464 int ret= 0;
00465
00466
00467 AssocData* assoc = NULL;
00468
00469
00470
00471
00472 if (addr) {
00473 addr->convert_to_ipv6();
00474 check_send_args(*netmsg,*addr);
00475 }
00476 else
00477 {
00478 ERRCLog(tpparam.name, thisproc << "address pointer is NULL");
00479 result= TCP_SEND_FAILURE;
00480
00481 delete netmsg;
00482 delete addr;
00483
00484 throw TPErrorInternal();
00485 }
00486
00487
00488
00489
00490
00491
00492
00493
00494 assoc= get_connection_to(*addr);
00495
00496 if (assoc==NULL || assoc->socketfd<=0)
00497 {
00498 ERRCLog(tpparam.name, color[red] << thisproc << "no valid assoc/socket data - dropping packet");
00499
00500 delete netmsg;
00501 delete addr;
00502 return;
00503 }
00504
00505 if (assoc->shutdown)
00506 {
00507 Log(WARNING_LOG, LOG_ALERT, tpparam.name, thisproc << "should send message although connection already half closed");
00508 delete netmsg;
00509 delete addr;
00510
00511 throw TPErrorSendFailed();
00512 }
00513
00514 uint32 msgsize= netmsg->get_size();
00515 #ifdef DEBUG_HARD
00516 cerr << thisproc << "message size=" << netmsg->get_size() << endl;
00517 #endif
00518
00519 const uint32 retry_send_max = 3;
00520 uint32 retry_count = 0;
00521
00522
00523 for (uint32 bytes_sent= 0;
00524 bytes_sent < msgsize;
00525 bytes_sent+= ret)
00526 {
00527
00528 #ifdef _DEBUG_HARD
00529 for (uint32 i=0;i<msgsize;i++)
00530 {
00531 cout << "send_buf: " << i << " : ";
00532 if ( isalnum(*(netmsg->get_buffer()+i)) )
00533 cout << "'" << *(netmsg->get_buffer()+i) << "' (0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec << ")" ;
00534 else
00535 cout << "0x" << hex << (unsigned short) *(netmsg->get_buffer()+i) << dec;
00536 cout << endl;
00537 }
00538
00539 cout << endl;
00540 cout << "bytes_sent: " << bytes_sent << endl;
00541 cout << "Message size: " << msgsize << endl;
00542 cout << "Send-Socket: " << assoc->socketfd << endl;
00543 cout << "pointer-Offset. " << netmsg->get_pos() << endl;
00544 cout << "vor send " << endl;
00545 #endif
00546
00547 retry_count= 0;
00548 do
00549 {
00550
00551 ret= ::send(assoc->socketfd,
00552 netmsg->get_buffer() + bytes_sent,
00553 msgsize - bytes_sent,
00554 MSG_NOSIGNAL);
00555
00556
00557
00558
00559
00560 if (ret < 0)
00561 {
00562 saved_errno= errno;
00563 switch(saved_errno)
00564 {
00565 case EAGAIN:
00566 case EINTR:
00567 case ENOBUFS:
00568 retry_count++;
00569 ERRLog(tpparam.name,"Temporary failure while calling send(): " << strerror(saved_errno) << ", errno: " << saved_errno
00570 << " - retry sending, retry #" << retry_count);
00571
00572 sleep(1);
00573 break;
00574
00575
00576 default:
00577 retry_count= retry_send_max;
00578 break;
00579 }
00580 }
00581 else
00582 break;
00583 }
00584 while(retry_count < retry_send_max);
00585
00586 if (ret < 0)
00587 {
00588
00589 result= TCP_SEND_FAILURE;
00590 break;
00591 }
00592 else
00593 {
00594 if (debug_pdu)
00595 {
00596 ostringstream hexdump;
00597 netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_sent);
00598 DLog(tpparam.name,"PDU debugging enabled - Sent:" << hexdump.str());
00599 }
00600 }
00601
00602
00603 }
00604
00605
00606 delete netmsg;
00607
00608
00609
00610
00611 if (result != TCP_SUCCESS)
00612 {
00613 ERRLog(tpparam.name, thisproc << "TCP error, returns " << ret << ", error : " << strerror(errno));
00614 delete addr;
00615
00616 throw TPErrorSendFailed(saved_errno);
00617
00618 }
00619 else
00620 EVLog(tpparam.name, thisproc << ">>----Sent---->> message (" << msgsize << " bytes) using socket " << assoc->socketfd << " to " << *addr);
00621
00622 if (!assoc) {
00623
00624
00625 ERRLog(tpparam.name, thisproc << "cannot get connection to " << addr->get_ip_str()
00626 << ", port #" << addr->get_port());
00627
00628 delete addr;
00629
00630 throw TPErrorUnreachable();
00631 }
00632
00633
00634 delete addr;
00635 }
00636
00637
00638
00639
00640
00641
00642 void
00643 TPoverTCP::sender_thread(void *argp)
00644 {
00645 #ifndef _NO_LOGGING
00646 const char *const methodname="senderthread - ";
00647 #endif
00648
00649 message* internal_thread_msg = NULL;
00650
00651 EVLog(tpparam.name, methodname << "starting as thread <" << pthread_self() << ">");
00652
00653 FastQueue* fq= reinterpret_cast<FastQueue*>(argp);
00654 if (!fq)
00655 {
00656 ERRLog(tpparam.name, methodname << "thread <" << pthread_self() << "> no valid pointer to msg queue. Stop.");
00657 return;
00658 }
00659
00660 bool terminate= false;
00661 TPoverTCPMsg* internalmsg= 0;
00662 while (terminate==false && (internal_thread_msg= fq->dequeue()) != 0 )
00663 {
00664 internalmsg= dynamic_cast<TPoverTCPMsg*>(internal_thread_msg);
00665
00666 if (internalmsg == 0)
00667 {
00668 ERRLog(tpparam.name, methodname << "received not an TPoverTCPMsg but a" << internal_thread_msg->get_type_name());
00669 }
00670 else
00671 if (internalmsg->get_msgtype() == TPoverTCPMsg::send_data)
00672 {
00673
00674 if (internalmsg->get_netmsg() && internalmsg->get_appladdr())
00675 {
00676 try
00677 {
00678 tcpsend(internalmsg->get_netmsg(),internalmsg->get_appladdr());
00679 }
00680 catch(TPErrorSendFailed& err)
00681 {
00682 ERRLog(tpparam.name, methodname << "TCP send call failed - " << err.what()
00683 << " cause: (" << err.get_reason() << ") " << strerror(err.get_reason()) );
00684 }
00685 catch(TPError& err)
00686 {
00687 ERRLog(tpparam.name, methodname << "TCP send call failed - reason: " << err.what());
00688 }
00689 catch(...)
00690 {
00691 ERRLog(tpparam.name, methodname << "TCP send call failed - unknown exception");
00692 }
00693 }
00694 else
00695 {
00696 ERRLog(tpparam.name, methodname << "problem with passed arguments references, they point to 0");
00697 }
00698 }
00699 else
00700 if (internalmsg->get_msgtype() == TPoverTCPMsg::stop)
00701 {
00702 terminate= true;
00703 }
00704
00705 delete internalmsg;
00706 }
00707
00708 EVLog(tpparam.name, methodname << "<" << pthread_self() << "> terminated connection.");
00709 }
00710
00711
00719 void
00720 TPoverTCP::receiver_thread(void *argp)
00721 {
00722 #ifndef _NO_LOGGING
00723 const char *const methodname="receiver - ";
00724 #endif
00725
00726 receiver_thread_arg_t *receiver_thread_argp= static_cast<receiver_thread_arg_t *>(argp);
00727 const appladdress* peer_addr = NULL;
00728 const appladdress* own_addr = NULL;
00729 uint32 bytes_received = 0;
00730 TPMsg* tpmsg= NULL;
00731
00732
00733 if (receiver_thread_argp == 0)
00734 {
00735 ERRCLog(tpparam.name, methodname << "No arguments given at start of receiver thread <" << pthread_self() << ">, exiting.");
00736
00737 return;
00738 }
00739 else
00740 {
00741
00742 receiver_thread_argp->terminated= false;
00743
00744 #ifdef _DEBUG
00745 DLog(tpparam.name, methodname << "New receiver thread <" << pthread_self() << "> started. ");
00746 #endif
00747 }
00748
00749 int conn_socket= 0;
00750 if (receiver_thread_argp->peer_assoc)
00751 {
00752
00753 conn_socket = receiver_thread_argp->peer_assoc->socketfd;
00754
00755 peer_addr= &receiver_thread_argp->peer_assoc->peer;
00756 own_addr= &receiver_thread_argp->peer_assoc->ownaddr;
00757 }
00758 else
00759 {
00760 ERRCLog(tpparam.name, methodname << "No peer assoc available - pointer is NULL");
00761
00762 return;
00763 }
00764
00765 if (peer_addr == 0)
00766 {
00767 ERRCLog(tpparam.name, methodname << "No peer address available for socket " << conn_socket << ", exiting.");
00768
00769 return;
00770 }
00771
00772 #ifdef _DEBUG
00773 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname <<
00774 "Preparing to wait for data at socket "
00775 << conn_socket << " from " << receiver_thread_argp->peer_assoc->peer);
00776 #endif
00777
00778 int ret= 0;
00779 uint32 msgcontentlength= 0;
00780 bool msgcontentlength_known= false;
00781 bool pdu_complete= false;
00782
00783
00785
00786
00787
00788
00789
00790
00791
00792
00793 const unsigned int number_poll_sockets= 1;
00794 struct pollfd poll_fd;
00795
00796 poll_fd.fd = conn_socket;
00797 poll_fd.events = POLLIN | POLLPRI;
00798 poll_fd.revents = 0;
00799
00800 int poll_status;
00801 bool recv_error= false;
00802
00803 NetMsg* netmsg= 0;
00804 NetMsg* remainbuf= 0;
00805 size_t buffer_bytes_left= 0;
00806 size_t trailingbytes= 0;
00807 bool skiprecv= false;
00808
00809
00810 while( receiver_thread_argp->sig_terminate == false )
00811 {
00812
00813 ret= 0;
00814 msgcontentlength= 0;
00815 msgcontentlength_known= false;
00816 pdu_complete= false;
00817 netmsg= 0;
00818
00819
00820 if (remainbuf != 0)
00821 {
00822 netmsg= remainbuf;
00823 remainbuf= 0;
00824 buffer_bytes_left= netmsg->get_size()-trailingbytes;
00825 bytes_received= trailingbytes;
00826 trailingbytes= 0;
00827 skiprecv= true;
00828 }
00829 else
00830 if ( (netmsg= new NetMsg(NetMsg::max_size)) != 0 )
00831 {
00832 buffer_bytes_left= netmsg->get_size();
00833 bytes_received= 0;
00834 skiprecv= false;
00835 }
00836 else
00837 {
00838 bytes_received= 0;
00839 buffer_bytes_left= 0;
00840 recv_error= true;
00841 }
00842
00843
00844
00845 while (!pdu_complete &&
00846 !recv_error &&
00847 !receiver_thread_argp->sig_terminate)
00848 {
00849 if (!skiprecv)
00850 {
00851
00852 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
00853
00854 if (receiver_thread_argp->sig_terminate)
00855 {
00856 Log(EVENT_LOG,LOG_UNIMP,tpparam.name,methodname << "Thread <" << pthread_self() << "> found terminate signal after poll");
00857
00858 AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
00859 if (myassoc->shutdown == false)
00860 {
00861 myassoc->shutdown= true;
00862 if (shutdown(myassoc->socketfd,SHUT_WR))
00863 {
00864 if ( errno != ENOTCONN )
00865 Log(ERROR_LOG,LOG_UNIMP,tpparam.name,methodname <<"shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
00866 }
00867 }
00868
00869 if (poll_status == 0)
00870 {
00871 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
00872 }
00873 }
00874
00875 if (poll_fd.revents & POLLERR)
00876 {
00877 if (errno == 0 || errno == EINTR)
00878 {
00879 EVLog(tpparam.name, methodname << "poll(): " << strerror(errno));
00880 }
00881 else
00882 {
00883 ERRCLog(tpparam.name, methodname << "Poll indicates error: " << strerror(errno));
00884 recv_error= true;
00885 }
00886 }
00887
00888 if (poll_fd.revents & POLLHUP)
00889 {
00890 Log(EVENT_LOG,LOG_CRIT, tpparam.name, methodname << "Poll hung up");
00891 recv_error= true;
00892 }
00893
00894 if (poll_fd.revents & POLLNVAL)
00895 {
00896 EVLog(tpparam.name, methodname << "Poll Invalid request: fd not open");
00897 recv_error= true;
00898 }
00899
00900
00901 switch (poll_status)
00902 {
00903 case -1:
00904 if (errno == 0 || errno == EINTR)
00905 {
00906 EVLog(tpparam.name, methodname << "Poll status: " << strerror(errno));
00907 }
00908 else
00909 {
00910 ERRCLog(tpparam.name, methodname << "Poll status indicates error: " << strerror(errno) << "- aborting");
00911 recv_error= true;
00912 }
00913
00914 continue;
00915 break;
00916
00917 case 0:
00918 #ifdef DEBUG_HARD
00919 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll timed out after " << tpparam.sleep_time << " ms.");
00920 #endif
00921 continue;
00922 break;
00923
00924 default:
00925 #ifdef DEBUG_HARD
00926 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
00927 #endif
00928 break;
00929 }
00930
00931
00933 ret = recv(conn_socket,
00934 netmsg->get_buffer() + bytes_received,
00935 buffer_bytes_left,
00936 MSG_DONTWAIT);
00937
00938 if ( ret < 0 )
00939 {
00940 delete netmsg;
00941 if (errno!=EAGAIN && errno!=EWOULDBLOCK)
00942 {
00943 ERRCLog(tpparam.name, methodname << "Receive at socket " << conn_socket << " failed, error: " << strerror(errno));
00944 recv_error= true;
00945 continue;
00946 }
00947 else
00948 {
00949
00950 continue;
00951 }
00952 }
00953 else
00954 {
00955 if (ret == 0)
00956 {
00957
00958
00959 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, methodname << "Other side (" << *peer_addr << ") closed connection for socket " << conn_socket);
00960
00961 AssocData* myassoc=const_cast<AssocData *>(receiver_thread_argp->peer_assoc);
00962 if (myassoc->shutdown == false)
00963 {
00964 myassoc->shutdown= true;
00965 if (shutdown(myassoc->socketfd,SHUT_WR))
00966 {
00967 if ( errno != ENOTCONN )
00968 Log(ERROR_LOG,LOG_UNIMP,tpparam.name, methodname << "shutdown (write) on socket " << conn_socket << " returned error:" << strerror(errno));
00969 }
00970 }
00971
00972 recv_error= true;
00973 }
00974 else
00975 {
00976
00977 Log(EVENT_LOG,LOG_UNIMP, tpparam.name, methodname << "<<--Received--<< packet (" << ret << " bytes) at socket " << conn_socket << " from " << *peer_addr);
00978
00979 bytes_received+= ret;
00980 buffer_bytes_left-= ret;
00981 }
00982 }
00983 }
00984
00985 if (buffer_bytes_left < 0)
00986 {
00987 recv_error= true;
00988 Log(ERROR_LOG,LOG_CRIT, tpparam.name, methodname << "during receive buffer space exhausted");
00989 }
00990
00991 if (!msgcontentlength_known)
00992 {
00993
00994 if (bytes_received >= common_header_length)
00995 {
00996
00997 if (getmsglength(*netmsg, msgcontentlength))
00998 msgcontentlength_known= true;
00999 else
01000 {
01001 ERRCLog(tpparam.name, methodname << "Not a valid protocol header - discarding received packet. received size " << msgcontentlength);
01002
01003 ostringstream hexdumpstr;
01004 netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
01005 DLog(tpparam.name,"dumping received bytes:" << hexdumpstr.str());
01006
01007
01008 msgcontentlength= 0;
01009 msgcontentlength_known= false;
01010 bytes_received= 0;
01011 pdu_complete= false;
01012 continue;
01013 }
01014 }
01015 }
01016
01017
01018 DLog(tpparam.name, "bytes_received-common_header_length=" << bytes_received-common_header_length << " msgcontentlength: " << msgcontentlength);
01019 if (msgcontentlength_known)
01020 {
01021 if (bytes_received-common_header_length >= msgcontentlength )
01022 {
01023 pdu_complete= true;
01024
01025 netmsg->truncate(common_header_length+msgcontentlength);
01026
01027
01028 if (bytes_received-common_header_length > msgcontentlength)
01029 {
01030 WLog(tpparam.name,"trailing bytes - received more bytes ("<<bytes_received<<") than expected for PDU (" << common_header_length+msgcontentlength << ")");
01031 remainbuf= new NetMsg(NetMsg::max_size);
01032 trailingbytes= (bytes_received-common_header_length) - msgcontentlength;
01033 bytes_received= common_header_length+msgcontentlength;
01034 memcpy(remainbuf->get_buffer(),netmsg->get_buffer()+common_header_length+msgcontentlength, trailingbytes);
01035 }
01036 }
01037 else
01038 {
01039 skiprecv= false;
01040 }
01041 }
01042 }
01043
01044
01045
01046 if (ret == 0)
01047 {
01048 recv_error= false;
01049 }
01050
01051
01052 if (!recv_error && pdu_complete)
01053 {
01054
01055 tpmsg = new(nothrow) TPMsg(netmsg, peer_addr->copy(), own_addr->copy());
01056 if (tpmsg)
01057 {
01058 DLog(tpparam.name, methodname << "receipt of PDU now complete, sending msg#" << tpmsg->get_id()
01059 << " to module " << message::get_qaddr_name(tpparam.dest));
01060 }
01061
01062 debug_pdu=false;
01063
01064 if (debug_pdu)
01065 {
01066 ostringstream hexdump;
01067 netmsg->hexdump(hexdump,netmsg->get_buffer(),bytes_received);
01068 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"PDU debugging enabled - Received:" << hexdump.str());
01069 }
01070
01071
01072
01073 if (!tpmsg
01074 || (!tpmsg->get_peeraddress())
01075 || (!tpmsg->send(message::qaddr_tp_over_tcp, tpparam.dest)))
01076 {
01077 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Cannot allocate/send TPMsg");
01078 if (tpmsg) delete tpmsg;
01079 }
01080
01081
01082 }
01083 else
01084 {
01085 if (bytes_received>0)
01086 {
01087 Log(WARNING_LOG,LOG_NORMAL, tpparam.name, methodname << "Attention! " << (recv_error? "Receive error, " : "") << (pdu_complete ? "PDU complete" : "PDU incomplete") << "received bytes: " << bytes_received);
01088 }
01089
01090 if (!pdu_complete && bytes_received>0 && bytes_received<common_header_length)
01091 {
01092 ostringstream hexdumpstr;
01093 netmsg->hexdump(hexdumpstr,netmsg->get_buffer(),bytes_received);
01094 Log(DEBUG_LOG,LOG_NORMAL,tpparam.name,"Message too short to be a valid protocol header - dumping received bytes:" << hexdumpstr.str());
01095 }
01096
01097
01098 break;
01099
01100 }
01101
01102 }
01103
01104 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self()
01105 << "> shutting down and closing socket " << receiver_thread_argp->peer_assoc->peer);
01106
01107
01108 if (shutdown(conn_socket, SHUT_RD))
01109 {
01110 if ( errno != ENOTCONN )
01111 Log(ERROR_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> shutdown (read) on socket failed, reason: " << strerror(errno));
01112 }
01113
01114
01115 close(conn_socket);
01116
01117 receiver_thread_argp->terminated= true;
01118
01119
01120 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Thread <" << pthread_self() << "> terminated");
01121
01122 #ifdef _DEBUG
01123 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, methodname << "Signaling main loop for cleanup");
01124 #endif
01125
01126 TPoverTCPMsg* newmsg= new(nothrow)TPoverTCPMsg(receiver_thread_argp->peer_assoc);
01127
01128 newmsg->send_to(tpparam.source);
01129
01130 }
01131
01132
01138 void
01139 TPoverTCP::stop_receiver_thread(AssocData* peer_assoc)
01140 {
01141
01142
01143
01144
01145
01146 if (peer_assoc == 0)
01147 return;
01148
01149 pthread_t thread_id= peer_assoc->thread_ID;
01150
01151
01152 recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
01153 receiver_thread_arg_t* recv_thread_argp=
01154 (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
01155 if (recv_thread_argp)
01156 {
01157 if (!recv_thread_argp->terminated)
01158 {
01159
01160 Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> signaled for termination");
01161
01162
01163 recv_thread_argp->sig_terminate= true;
01164
01165 pthread_join(thread_id, 0);
01166
01167 return;
01168 }
01169 }
01170 else
01171 Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"stop_receiver_thread() - Receiver thread <" << thread_id << "> not found");
01172
01173 }
01174
01175
01181 void
01182 TPoverTCP::cleanup_receiver_thread(AssocData* peer_assoc)
01183 {
01184
01185
01186
01187
01188
01189 if (peer_assoc == 0)
01190 return;
01191
01192 pthread_t thread_id= peer_assoc->thread_ID;
01193
01194
01195 recv_thread_argmap_t::iterator recv_thread_arg_iter= recv_thread_argmap.find(thread_id);
01196 receiver_thread_arg_t* recv_thread_argp=
01197 (recv_thread_arg_iter != recv_thread_argmap.end()) ? recv_thread_arg_iter->second : 0;
01198 if (recv_thread_argp)
01199 {
01200 if (!recv_thread_argp->terminated)
01201 {
01202
01203 Log(ERROR_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> not terminated yet?!");
01204 return;
01205 }
01206 else
01207 {
01208 Log(EVENT_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Receiver thread <" << thread_id << "> is terminated");
01209
01210
01211 recv_thread_argmap.erase(recv_thread_arg_iter);
01212
01213
01214 delete recv_thread_argp;
01215 }
01216 }
01217
01218
01219
01220
01221
01222 terminate_sender_thread(peer_assoc);
01223
01224
01225
01226 connmap.erase(peer_assoc);
01227
01228
01229
01230 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,"cleanup_receiver_thread() - Cleanup receiver thread <" << thread_id << ">. Done.");
01231 }
01232
01233
01234
01235
01236
01237
01238 void
01239 TPoverTCP::terminate_sender_thread(const AssocData* assoc)
01240 {
01241 if (assoc == 0)
01242 {
01243 Log(ERROR_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - assoc data == NULL");
01244 return;
01245 }
01246
01247 sender_thread_queuemap_t::iterator it= senderthread_queuemap.find(assoc->peer);
01248
01249 if (it != senderthread_queuemap.end())
01250 {
01251 FastQueue* destqueue= it->second;
01252 if (destqueue)
01253 {
01254 TPoverTCPMsg* internalmsg= new TPoverTCPMsg(assoc,tpparam.source,TPoverTCPMsg::stop);
01255 if (internalmsg)
01256 {
01257
01258 internalmsg->send(tpparam.source,destqueue);
01259 }
01260 }
01261 else
01262 {
01263 Log(WARNING_LOG,LOG_NORMAL,tpparam.name,"terminate_sender_thread() - found entry for address, but no sender thread. addr:" << assoc->peer);
01264 }
01265
01266 senderthread_queuemap.erase(it);
01267 }
01268 }
01269
01270
01271
01272
01273
01274 void
01275 TPoverTCP::terminate_all_threads()
01276 {
01277 AssocData* assoc= 0;
01278 receiver_thread_arg_t* terminate_argp;
01279
01280 for (recv_thread_argmap_t::iterator terminate_iterator= recv_thread_argmap.begin();
01281 terminate_iterator != recv_thread_argmap.end();
01282 terminate_iterator++)
01283 {
01284 if ( (terminate_argp= terminate_iterator->second) != 0)
01285 {
01286
01287 assoc= const_cast<AssocData*>(terminate_argp->peer_assoc);
01288
01289 if (terminate_argp->terminated == false)
01290 {
01291 terminate_argp->sig_terminate= true;
01292
01293 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,
01294 "Signaled receiver thread <" << terminate_iterator->first << "> for termination");
01295
01296 pthread_join(terminate_iterator->first, 0);
01297
01298 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Thread <" << terminate_iterator->first << "> is terminated");
01299 }
01300 else
01301 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name,
01302 "Receiver thread <" << terminate_iterator->first << "> already terminated");
01303
01304
01305 delete terminate_argp;
01306
01307
01308 terminate_sender_thread(assoc);
01309
01310 connmap.erase(assoc);
01311
01312 }
01313 }
01314 }
01315
01316
01324 void*
01325 TPoverTCP::sender_thread_starter(void *argp)
01326 {
01327 sender_thread_start_arg_t *sargp= static_cast<sender_thread_start_arg_t *>(argp);
01328
01329
01330
01331
01332 if (sargp != 0 && sargp->instance != 0)
01333 {
01334
01335 sargp->instance->sender_thread(sargp->sender_thread_queue);
01336
01337
01338
01339
01340 delete sargp;
01341 }
01342 else
01343 {
01344 Log(ERROR_LOG,LOG_CRIT,"sender_thread_starter","while starting sender_thread: 0 pointer to arg or object");
01345 }
01346 return 0;
01347 }
01348
01349
01350
01351
01359 void*
01360 TPoverTCP::receiver_thread_starter(void *argp)
01361 {
01362 receiver_thread_start_arg_t *rargp= static_cast<receiver_thread_start_arg_t *>(argp);
01363
01364 if (rargp != 0 && rargp->instance != 0)
01365 {
01366
01367 rargp->instance->receiver_thread(rargp->rtargp);
01368
01369
01370 delete rargp;
01371 }
01372 else
01373 {
01374 Log(ERROR_LOG,LOG_CRIT,"receiver_thread_starter","while starting receiver_thread: 0 pointer to arg or object");
01375 }
01376 return 0;
01377 }
01378
01379
01380 void
01381 TPoverTCP::create_new_sender_thread(FastQueue* senderfqueue)
01382 {
01383 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new sender thread...");
01384
01385 pthread_t senderthreadid;
01386
01387 int pthread_status= pthread_create(&senderthreadid,
01388 NULL,
01389
01390 TPoverTCP::sender_thread_starter,
01391 new sender_thread_start_arg_t(this,senderfqueue));
01392 if (pthread_status)
01393 {
01394 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " << strerror(pthread_status));
01395
01396 delete senderfqueue;
01397 }
01398 }
01399
01400
01401 void
01402 TPoverTCP::create_new_receiver_thread(AssocData* peer_assoc)
01403 {
01404 receiver_thread_arg_t* argp=
01405 new(nothrow) receiver_thread_arg(peer_assoc);
01406
01407 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Starting new receiver thread...");
01408
01409
01410 int pthread_status= pthread_create(&peer_assoc->thread_ID,
01411 NULL,
01412
01413 receiver_thread_starter,
01414 new(nothrow) receiver_thread_start_arg_t(this,argp));
01415 if (pthread_status)
01416 {
01417 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "A new thread could not be created: " << strerror(pthread_status));
01418
01419 delete argp;
01420 }
01421 else
01422 {
01423 lock();
01424
01425
01426
01427 pair<recv_thread_argmap_t::iterator, bool> tmpinsiterator=
01428 recv_thread_argmap.insert( pair<pthread_t,receiver_thread_arg_t*> (peer_assoc->thread_ID,argp) );
01429 if (tmpinsiterator.second == false)
01430 {
01431 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Thread argument could not be inserted into hashmap");
01432 }
01433 unlock();
01434 }
01435 }
01436
01437
01445 void*
01446 TPoverTCP::master_listener_thread_starter(void *argp)
01447 {
01448
01449 if (argp != 0)
01450 {
01451 (static_cast<TPoverTCP*>(argp))->master_listener_thread();
01452 }
01453 return 0;
01454 }
01455
01456
01457
01463 void
01464 TPoverTCP::master_listener_thread()
01465 {
01466
01467 struct sockaddr_in6 own_address_v6;
01468 own_address_v6.sin6_family = AF_INET6;
01469 own_address_v6.sin6_flowinfo= 0;
01470 own_address_v6.sin6_port = htons(tpparam.port);
01471
01472 own_address_v6.sin6_addr = in6addr_any;
01473 own_address_v6.sin6_scope_id= 0;
01474
01475
01476 struct sockaddr_in own_address_v4;
01477 own_address_v4.sin_family = AF_INET;
01478 own_address_v4.sin_port = htons(tpparam.port);
01479
01480 own_address_v4.sin_addr.s_addr = INADDR_ANY;
01481
01482
01483 int master_listener_socket= socket( AF_INET6, SOCK_STREAM, IPPROTO_TCP);
01484 if (master_listener_socket!=-1) v4_mode = false;
01485 if (master_listener_socket == -1) {
01486 master_listener_socket= socket( AF_INET, SOCK_STREAM, IPPROTO_TCP);
01487 if (master_listener_socket!=-1) v4_mode = true;
01488 }
01489 if (master_listener_socket == -1)
01490 {
01491 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Could not create a new socket, error: " << strerror(errno));
01492 return;
01493 }
01494
01495
01496 int nodelayflag= 1;
01497 int status= setsockopt(master_listener_socket,
01498 IPPROTO_TCP,
01499 TCP_NODELAY,
01500 &nodelayflag,
01501 sizeof(nodelayflag));
01502 if (status)
01503 {
01504 Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option TCP_NODELAY:" << strerror(errno));
01505 }
01506
01507
01508 int socketreuseflag= 1;
01509 status= setsockopt(master_listener_socket,
01510 SOL_SOCKET,
01511 SO_REUSEADDR,
01512 (const char *) &socketreuseflag,
01513 sizeof(socketreuseflag));
01514 if (status)
01515 {
01516 Log(ERROR_LOG,LOG_NORMAL,tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
01517 }
01518
01519
01520
01521 int bind_status = bind(master_listener_socket, v4_mode ?
01522 reinterpret_cast<struct sockaddr *>(&own_address_v4) :
01523 reinterpret_cast<struct sockaddr *>(&own_address_v6),
01524 v4_mode ? sizeof(own_address_v4) : sizeof(own_address_v6));
01525 if (bind_status)
01526 {
01527 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Binding to "
01528 << (v4_mode ? inet_ntop(AF_INET, &own_address_v4.sin_addr, in_addrstr, INET_ADDRSTRLEN) :
01529 inet_ntop(AF_INET6, &own_address_v6.sin6_addr, in6_addrstr, INET6_ADDRSTRLEN))
01530 << " port " << tpparam.port << " failed, error: " << strerror(errno));
01531 return;
01532 }
01533
01534
01535
01536 int listen_status = listen(master_listener_socket, max_listen_queue_size);
01537 if (listen_status)
01538 {
01539 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Listen at socket " << master_listener_socket
01540 << " failed, error: " << strerror(errno));
01541 return;
01542 }
01543 else
01544 {
01545 Log(INFO_LOG,LOG_NORMAL, tpparam.name, color[green] << "Listening at port #" << tpparam.port << color[off]);
01546 }
01547
01548
01549 fcntl(master_listener_socket,F_SETFL, O_NONBLOCK);
01550
01551
01552 struct pollfd poll_fd;
01553 poll_fd.fd = master_listener_socket;
01554 poll_fd.events = POLLIN | POLLPRI;
01555 poll_fd.revents = 0;
01556
01557
01558
01559
01560
01561
01562 bool terminate = false;
01563
01564 state_t currstate= get_state();
01565 int poll_status= 0;
01566 const unsigned int number_poll_sockets= 1;
01567 struct sockaddr_in6 peer_address;
01568 socklen_t peer_address_len;
01569 int conn_socket;
01570
01571
01572 while(! (terminate= (currstate==STATE_ABORT || currstate==STATE_STOP) ) )
01573 {
01574
01575
01576 poll_status= poll(&poll_fd, number_poll_sockets, tpparam.sleep_time);
01577 if (poll_fd.revents & POLLERR)
01578 {
01579 if (errno != EINTR)
01580 {
01581 Log(ERROR_LOG,LOG_CRIT, tpparam.name,
01582 "Poll caused error " << strerror(errno) << " - indicated by revents");
01583 }
01584 else
01585 {
01586 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "poll(): " << strerror(errno));
01587 }
01588
01589 }
01590 if (poll_fd.revents & POLLHUP)
01591 {
01592 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll hung up");
01593 return;
01594 }
01595 if (poll_fd.revents & POLLNVAL)
01596 {
01597 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll Invalid request: fd not open");
01598 return;
01599 }
01600
01601 switch (poll_status)
01602 {
01603 case -1:
01604 if (errno != EINTR)
01605 {
01606 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Poll status indicates error: " << strerror(errno));
01607 }
01608 else
01609 {
01610 Log(EVENT_LOG,LOG_NORMAL, tpparam.name, "Poll status: " << strerror(errno));
01611 }
01612
01613 break;
01614
01615 case 0:
01616 #ifdef DEBUG_HARD
01617 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,
01618 "Listen Thread - Poll timed out after " << tpparam.sleep_time << " ms.");
01619 #endif
01620 currstate= get_state();
01621 continue;
01622 break;
01623
01624 default:
01625 #ifdef DEBUG_HARD
01626 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
01627 #endif
01628 break;
01629 }
01630
01631
01632
01633
01634 peer_address_len= sizeof(peer_address);
01635 conn_socket = accept (master_listener_socket,
01636 reinterpret_cast<struct sockaddr *>(&peer_address),
01637 &peer_address_len);
01638 if (conn_socket == -1)
01639 {
01640 if (errno != EWOULDBLOCK && errno != EAGAIN)
01641 {
01642 Log(ERROR_LOG,LOG_EMERG, tpparam.name, "Accept at socket " << master_listener_socket
01643 << " failed, error: " << strerror(errno));
01644 return;
01645 }
01646 }
01647 else
01648 {
01649
01650 AssocData* peer_assoc = NULL;
01651 appladdress addr(peer_address, IPPROTO_TCP);
01652
01653 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "<<--Received connect--<< request from " << addr.get_ip_str()
01654 << " port #" << addr.get_port());
01655
01656 struct sockaddr_in6 own_address;
01657 if (v4_mode) {
01658 struct sockaddr_in own_address_v4;
01659 socklen_t own_address_len_v4 = sizeof(own_address_v4);
01660 getsockname(conn_socket, reinterpret_cast<struct sockaddr*>(&own_address_v4), &own_address_len_v4);
01661 v4_to_v6(&own_address_v4, &own_address);
01662 } else {
01663 socklen_t own_address_len= sizeof(own_address);
01664 getsockname(conn_socket, reinterpret_cast<struct sockaddr*>(&own_address), &own_address_len);
01665 }
01666
01667
01668
01669 peer_assoc = new(nothrow) AssocData(conn_socket, addr, appladdress(own_address,IPPROTO_TCP));
01670
01671 bool insert_success= false;
01672 if (peer_assoc)
01673 {
01674
01675 lock();
01676 insert_success= connmap.insert(peer_assoc);
01677
01678 unlock();
01679 }
01680
01681
01682 if (insert_success == false)
01683 {
01684 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot insert AssocData for socket " << conn_socket
01685 << ", " << addr.get_ip_str() << ", port #"
01686 << addr.get_port() << " into connection map, aborting connection...");
01687
01688
01689 close (conn_socket);
01690 if (peer_assoc)
01691 {
01692 delete peer_assoc;
01693 peer_assoc= 0;
01694 }
01695 return;
01696
01697 }
01698
01699
01700 create_new_receiver_thread(peer_assoc);
01701 }
01702
01703
01704 currstate= get_state();
01705
01706 }
01707 return;
01708 }
01709
01710
01711 TPoverTCP::~TPoverTCP()
01712 {
01713 init= false;
01714 this->connmap.clear();
01715
01716 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Destructor called");
01717
01718 QueueManager::instance()->unregister_queue(tpparam.source);
01719 }
01720
01731 void
01732 TPoverTCP::main_loop(uint32 nr)
01733 {
01734
01735
01736 FastQueue* fq = get_fqueue();
01737 if (!fq)
01738 {
01739 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Cannot find message queue");
01740 return;
01741 }
01742
01743 QueueManager::instance()->register_queue(fq,tpparam.source);
01744
01745
01746 pthread_t master_listener_thread_ID;
01747 int pthread_status= pthread_create(&master_listener_thread_ID,
01748 NULL,
01749
01750 master_listener_thread_starter,
01751 this);
01752 if (pthread_status)
01753 {
01754 Log(ERROR_LOG,LOG_CRIT, tpparam.name,
01755 "New master listener thread could not be created: " << strerror(pthread_status));
01756 }
01757 else
01758 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Master listener thread started");
01759
01760
01761
01762 timespec wait_interval= { 0, 250000000L };
01763 message* internal_thread_msg = NULL;
01764 state_t currstate= get_state();
01765
01766
01767 while( currstate!=STATE_ABORT && currstate!=STATE_STOP )
01768 {
01769
01770 if ( (internal_thread_msg= fq->dequeue_timedwait(wait_interval)) != 0 )
01771 {
01772 TPoverTCPMsg* internalmsg= dynamic_cast<TPoverTCPMsg*>(internal_thread_msg);
01773 if (internalmsg)
01774 {
01775 if (internalmsg->get_msgtype() == TPoverTCPMsg::stop)
01776 {
01777
01778 AssocData* assocd= const_cast<AssocData*>(internalmsg->get_peer_assoc());
01779 Log(DEBUG_LOG,LOG_NORMAL, tpparam.name, "Got cleanup request for thread <" << assocd->thread_ID <<'>');
01780 lock();
01781 cleanup_receiver_thread( assocd );
01782 unlock();
01783 }
01784 else
01785 if (internalmsg->get_msgtype() == TPoverTCPMsg::start)
01786 {
01787
01788 create_new_receiver_thread( const_cast<AssocData*>(internalmsg->get_peer_assoc()) );
01789 }
01790 else
01791 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "unexpected internal message:" << internalmsg->get_msgtype());
01792
01793 delete internalmsg;
01794 }
01795 else
01796 {
01797 Log(ERROR_LOG,LOG_CRIT, tpparam.name, "Dynamic_cast failed - received unexpected and unknown internal message source "
01798 << internal_thread_msg->get_source());
01799 }
01800 }
01801
01802
01803 currstate= get_state();
01804 }
01805
01806 if (currstate==STATE_STOP)
01807 {
01808
01809 Log(INFO_LOG,LOG_NORMAL, tpparam.name, "Asked to abort, stopping all receiver threads");
01810 }
01811
01812
01813 fq->shutdown();
01814
01815 terminate_all_threads();
01816 }
01817
01818 }