00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 extern "C"
00031 {
00032
00033 #include <unistd.h>
00034 #include <sys/types.h>
00035 #include <netinet/ip.h>
00036 #include <netinet/ip6.h>
00037 #include <netinet/in.h>
00038 #include <netinet/tcp.h>
00039 #include <netinet/udp.h>
00040 #include <sys/socket.h>
00041 #include <arpa/inet.h>
00042
00043 #include <fcntl.h>
00044 #include <sys/poll.h>
00045 }
00046
00047 #include <iostream>
00048 #include <errno.h>
00049 #include <string>
00050 #include <sstream>
00051
00052 #include "tp_over_udp.h"
00053 #include "threadsafe_db.h"
00054 #include "cleanuphandler.h"
00055 #include "setuid.h"
00056 #include "logfile.h"
00057 #include "linux/netfilter.h"
00058
00059 #include <set>
00060
00061 #define UDP_SUCCESS 0
00062 #define UDP_SEND_FAILURE 1
00063
00064 #define BUFSIZE 2048000
00065
00066
00067 const unsigned int max_listen_queue_size = 10;
00068
00069 namespace protlib
00070 {
00071
00072 using namespace log;
00073
00079 char in6_addrstr_loc[INET6_ADDRSTRLEN+1];
00080
00081
00082
00083
00090 void TPoverUDP::send (NetMsg * netmsg, const address & in_addr, bool use_existing_connection)
00091 {
00092
00093 appladdress* addr = NULL;
00094 addr= dynamic_cast<appladdress*>(in_addr.copy());
00095
00096 if (!addr) return;
00097
00098
00099 udpsend(netmsg, addr);
00100
00101 }
00102
00110 void
00111 TPoverUDP::udpsend (NetMsg * netmsg, appladdress * addr)
00112 {
00113 #ifndef _NO_LOGGING
00114 const char *const thisproc = "sender - ";
00115 #endif
00116
00117
00118
00119 int result = UDP_SUCCESS;
00120 int ret = 0;
00121
00122
00123 if (addr)
00124 check_send_args (*netmsg, *addr);
00125 else
00126 {
00127 ERRCLog (tpparam.name, thisproc << "address pointer is NULL");
00128 result = UDP_SEND_FAILURE;
00129 throw TPErrorInternal();
00130 }
00131
00132
00133 addr->convert_to_ipv6();
00134 in6_addr ip6addr;
00135
00136
00137 addr->get_ip(ip6addr);
00138
00139
00140
00141
00142
00143
00144 struct msghdr header;
00145
00146
00147 struct cmsghdr *ancillary = NULL;
00148
00149
00150 struct iovec iov;
00151 iov.iov_base = netmsg->get_buffer();
00152 iov.iov_len = netmsg->get_size();
00153
00154
00155 struct sockaddr_in6 dest_address;
00156 dest_address.sin6_family= AF_INET6;
00157 dest_address.sin6_port = htons(addr->get_port());
00158 dest_address.sin6_addr = ip6addr;
00159 dest_address.sin6_flowinfo = 0;
00160 dest_address.sin6_scope_id = 0;
00161
00162
00163 header.msg_iov = &iov;
00164 header.msg_iovlen = 1;
00165 header.msg_name = &dest_address;
00166 header.msg_namelen=sizeof(dest_address);
00167 header.msg_control=NULL;
00168 header.msg_controllen=0;
00169
00170
00171
00172 in6_pktinfo pktinfo;
00173
00174
00175
00176
00177
00178
00179 uint32 buflength = 0;
00180 if (addr->get_if_index()) {
00181 buflength = CMSG_SPACE(sizeof(pktinfo));
00182
00183 }
00184
00185 int hlim = addr->get_ip_ttl();
00186
00187 if (hlim) {
00188 buflength = buflength + CMSG_SPACE(sizeof(int));
00189
00190 }
00191
00192 if ((addr->get_if_index()) || hlim) {
00193 header.msg_control = malloc(buflength);
00194 if (header.msg_control == 0)
00195 ERRCLog(tpparam.name, thisproc << " malloc failed for ancillary data of size " << buflength);
00196 }
00197
00198
00199 if (addr->get_if_index()) {
00200
00201 DLog(tpparam.name, thisproc << " UDP send via Interface " << addr->get_if_index() << " requested.");
00202
00203
00204 ancillary = (cmsghdr*) header.msg_control;
00205
00206 ancillary->cmsg_level=IPPROTO_IPV6;
00207 ancillary->cmsg_type=IPV6_PKTINFO;
00208 ancillary->cmsg_len=CMSG_LEN(sizeof(pktinfo));
00209
00210
00211
00212 pktinfo.ipi6_addr = in6addr_any;
00213 pktinfo.ipi6_ifindex = addr->get_if_index();
00214
00215 memcpy (CMSG_DATA(ancillary), &pktinfo, sizeof(pktinfo));
00216
00217
00218
00219
00220 header.msg_controllen = CMSG_SPACE(sizeof(pktinfo));
00221
00222 }
00223
00224
00225 if (hlim) {
00226 DLog(tpparam.name, thisproc << " UDP send with IP TTL of " << hlim << " requested.");
00227
00228
00229 cmsghdr* ancillary2 = NULL;
00230
00231 if (ancillary) {
00232 ancillary2 = (cmsghdr*) (ancillary + CMSG_SPACE(sizeof(pktinfo)));
00233 } else {
00234 ancillary2 = (cmsghdr*) header.msg_control;
00235 }
00236
00237 ancillary2->cmsg_level=IPPROTO_IPV6;
00238 ancillary2->cmsg_type=IPV6_HOPLIMIT;
00239 ancillary2->cmsg_len = CMSG_LEN(sizeof(int));
00240
00241 memcpy(CMSG_DATA(ancillary2), &hlim, sizeof(int));
00242
00243
00244 header.msg_controllen = header.msg_controllen + ancillary2->cmsg_len;
00245
00246 }
00247
00248 #ifndef _NO_LOGGING
00249 uint32 msgsize = netmsg->get_size();
00250 #endif
00251
00252
00253
00254
00255
00256 while (master_listener_socket == -1)
00257 {
00258 const unsigned int sleeptime= 1;
00259 DLog(tpparam.name, "socket not yet ready for sending - sending deferred (" << sleeptime << " s)");
00260 sleep(sleeptime);
00261 DLog(tpparam.name, "retrying to send");
00262 }
00263
00264 ret = setsockopt(master_listener_socket, SOL_IP, IP_OPTIONS, 0, 0);
00265 if ( ret != 0 )
00266 ERRLog(tpparam.name, "unsetting IP options for IPv4 failed");
00267
00268
00269 DLog(tpparam.name, "SEND to " << *addr);
00270 ret= sendmsg(master_listener_socket,&header,MSG_DONTWAIT);
00271
00272 if (ret<0)
00273 ERRCLog(tpparam.name, "Socket Send failed! - error (" << errno << "):" << strerror(errno));
00274 if (debug_pdu)
00275 {
00276 ostringstream hexdump;
00277 netmsg->hexdump (hexdump);
00278 Log (DEBUG_LOG, LOG_NORMAL, tpparam.name,
00279 "PDU debugging enabled - Sent:" << hexdump.str ());
00280 }
00281
00282 if (ret < 0)
00283 {
00284 result = UDP_SEND_FAILURE;
00285
00286 }
00287
00288
00289
00290 delete netmsg;
00291
00292
00293
00294
00295
00296 if (result != UDP_SUCCESS)
00297 {
00298 ERRLog(tpparam.name, thisproc << "UDP error, returns " << ret << ", error : " << strerror (errno));
00299 delete addr;
00300
00301 throw TPErrorSendFailed();
00302
00303 }
00304 else
00305 Log (EVENT_LOG, LOG_NORMAL, tpparam.name,
00306 thisproc << ">>----Sent---->> message (" << msgsize <<
00307 " bytes) using socket " << master_listener_socket << " to " << *addr);
00308
00309
00310 delete addr;
00311 }
00312
00313
00314
00322 void *
00323 TPoverUDP::listener_thread_starter (void *argp)
00324 {
00325
00326 if (argp != 0)
00327 {
00328 (static_cast < TPoverUDP * >(argp))->listener_thread ();
00329 }
00330 return 0;
00331 }
00332
00333
00334
00335
00336
00341 void TPoverUDP::listener_thread ()
00342 {
00343
00344 struct sockaddr_in6 own_address;
00345 own_address.sin6_family = AF_INET6;
00346 own_address.sin6_flowinfo= 0;
00347 own_address.sin6_port = htons(tpparam.port);
00348
00349 own_address.sin6_addr = in6addr_any;
00350 own_address.sin6_scope_id= 0;
00351
00352
00353 master_listener_socket= socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP);
00354 if (master_listener_socket == -1)
00355 {
00356 ERRCLog(tpparam.name, "Could not create a new socket, error: " << strerror(errno));
00357 return;
00358 }
00359
00360 int socketreuseflag= 1;
00361 int status= setsockopt(master_listener_socket,
00362 SOL_SOCKET,
00363 SO_REUSEADDR,
00364 (const char *) &socketreuseflag,
00365 sizeof(socketreuseflag));
00366 if (status)
00367 {
00368 ERRCLog(tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
00369 }
00370
00371
00372
00373
00374 int bind_status = bind(master_listener_socket,
00375 reinterpret_cast<struct sockaddr *>(&own_address),
00376 sizeof(own_address));
00377 if (bind_status)
00378 {
00379 ERRCLog(tpparam.name, "Binding to "
00380 << inet_ntop(AF_INET6, &own_address.sin6_addr, in6_addrstr_loc, INET6_ADDRSTRLEN)
00381 << " port " << tpparam.port << " failed, error: " << strerror(errno));
00382 return;
00383 }
00384
00385
00386
00387 struct pollfd poll_fd;
00388 poll_fd.fd = master_listener_socket;
00389 poll_fd.events = POLLIN | POLLPRI;
00390 poll_fd.revents = 0;
00391
00392
00393
00394
00395
00396
00397 bool terminate = false;
00398
00399 state_t currstate= get_state();
00400 int poll_status= 0;
00401 const unsigned int number_poll_sockets= 1;
00402 struct sockaddr_in6 peer_address;
00403 socklen_t peer_address_len;
00404
00405
00406
00407 while(! (terminate= (currstate==STATE_ABORT || currstate==STATE_STOP) ) )
00408 {
00409
00410
00411
00412
00413 poll_status= poll(&poll_fd, number_poll_sockets, 250);
00414 if (poll_fd.revents & POLLERR)
00415 {
00416 if (errno != EINTR)
00417 {
00418 ERRCLog(tpparam.name, "Poll caused error " << strerror(errno) << " - indicated by revents");
00419 }
00420 else
00421 {
00422 EVLog(tpparam.name, "poll(): " << strerror(errno));
00423 }
00424
00425 }
00426 if (poll_fd.revents & POLLHUP)
00427 {
00428 ERRCLog(tpparam.name, "Poll hung up");
00429 return;
00430 }
00431 if (poll_fd.revents & POLLNVAL)
00432 {
00433 ERRCLog(tpparam.name, "Poll Invalid request: fd not open");
00434 return;
00435 }
00436
00437 switch (poll_status)
00438 {
00439 case -1:
00440 if (errno != EINTR)
00441 {
00442 ERRCLog(tpparam.name, "Poll status indicates error: " << strerror(errno));
00443 }
00444 else
00445 {
00446 EVLog(tpparam.name, "Poll status: " << strerror(errno));
00447 }
00448
00449 break;
00450
00451 case 0:
00452 #ifdef DEBUG_HARD
00453 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,
00454 "Listen Thread - Poll timed out after " << tpparam.sleep_time << " ms.");
00455 #endif
00456 currstate= get_state();
00457 continue;
00458 break;
00459
00460 default:
00461 #ifdef DEBUG_HARD
00462 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
00463 #endif
00464 break;
00465 }
00466
00467
00468
00469
00470
00471 if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
00472
00473
00474
00475 peer_address_len= sizeof(peer_address);
00476
00477
00478 NetMsg *netmsg=0;
00479 netmsg = new NetMsg (NetMsg::max_size);
00480
00481
00482
00484 int ret = recvfrom (master_listener_socket,
00485 netmsg->get_buffer (), NetMsg::max_size, 0, reinterpret_cast<struct sockaddr *>(&peer_address),
00486 &peer_address_len);
00487
00488 if (ret)
00489 {
00490 DLog(tpparam.name, "Yankeedoo, we received " << ret << " bytes of DATA!!");
00491
00492
00493 netmsg->truncate(ret);
00494 }
00495
00496
00497
00498
00499
00500
00501
00502
00503
00504
00505
00506 appladdress* peer_addr = new appladdress;
00507 peer_addr->set_ip(peer_address.sin6_addr);
00508 peer_addr->set_port(peer_address.sin6_port);
00509 appladdress* own_addr = new appladdress();
00510
00511
00512 char source_addr[INET6_ADDRSTRLEN+1];
00513 inet_ntop(AF_INET6, &peer_address.sin6_addr, source_addr, INET6_ADDRSTRLEN);
00514
00515
00516 peer_addr->set_port(htons(peer_address.sin6_port));
00517 peer_addr->set_ip(peer_address.sin6_addr);
00518 peer_addr->set_protocol(get_underlying_protocol());
00519
00520 DLog(tpparam.name, "Peer: [" << *peer_addr << "]");
00521
00522
00523
00524 TPMsg *tpmsg=
00525 new (nothrow) TPMsg (netmsg, peer_addr, own_addr);
00526
00527 Log (DEBUG_LOG, LOG_NORMAL, tpparam.name,
00528 "recvthread - receipt of GIST PDU now complete, sending msg#" << tpmsg->get_id() << " to signaling module");
00529
00530
00531 if (tpmsg == NULL || !tpmsg->send(tpparam.source, tpparam.dest))
00532 {
00533 ERRLog(tpparam.name, "rcvthread" << "Cannot allocate/send TPMsg");
00534 if (tpmsg)
00535 delete tpmsg;
00536 if (netmsg)
00537 delete netmsg;
00538
00539 }
00540
00541 }
00542
00543
00544 currstate= get_state();
00545
00546 }
00547
00548 return;
00549
00550 }
00551
00552
00553 TPoverUDP::~TPoverUDP ()
00554 {
00555 init = false;
00556
00557 Log (DEBUG_LOG, LOG_NORMAL, tpparam.name, "Destructor called");
00558
00559 }
00560
00569 void
00570 TPoverUDP::main_loop (uint32 nr)
00571 {
00572
00573 int pthread_status = 0;
00574
00575
00576
00577 pthread_t listener_thread_ID;
00578 pthread_status = pthread_create (&listener_thread_ID, NULL,
00579 listener_thread_starter, this);
00580 if (pthread_status)
00581 {
00582 ERRCLog(tpparam.name,
00583 "UDP listening thread could not be created: " <<
00584 strerror (pthread_status));
00585 }
00586 else
00587
00588 Log(INFO_LOG,LOG_NORMAL, tpparam.name, color[green] << "Listening at port #" << tpparam.port << color[off]);
00589
00590
00591
00592
00593 state_t currstate = get_state ();
00594
00595
00596 while (currstate != STATE_ABORT && currstate != STATE_STOP)
00597 {
00598
00599
00600 currstate = get_state ();
00601
00602 sleep(4);
00603
00604 }
00605
00606 if (currstate == STATE_STOP)
00607 {
00608
00609 Log (INFO_LOG, LOG_NORMAL, tpparam.name,
00610 "Asked to abort, stopping all receiver threads");
00611 }
00612
00613
00614
00615
00616 }
00617
00618
00619 void
00620 TPoverUDP::terminate(const address& addr)
00621 {
00622
00623 }
00624
00625 }
00626