source: source/ariba/utility/transport/tcpip/protlib/tp_over_udp.cpp@ 10570

Last change on this file since 10570 was 10431, checked in by bless@…, 13 years ago
  • added close() statement for listener socket
File size: 16.9 KB
Line 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file tp_over_udp.cpp
3/// UDP-based transport module
4/// ----------------------------------------------------------
5/// $Id: tp_over_udp.cpp 2872 2008-02-18 10:58:03Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/src/tp_over_udp.cpp $
7// ===========================================================
8//
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29
30extern "C"
31{
32 //#define _SOCKADDR_LEN /* use BSD 4.4 sockets */
33#include <unistd.h> /* gethostname */
34#include <sys/types.h> /* network socket interface */
35#include <netinet/ip.h> /* iphdr */
36#include <netinet/ip6.h> /* ip6_hdr */
37#include <netinet/in.h> /* network socket interface */
38#include <netinet/tcp.h> /* for TCP Socket Option */
39#include <netinet/udp.h> /* for UDP header */
40#include <sys/socket.h>
41#include <arpa/inet.h> /* inet_addr */
42
43#ifndef IPPROTO_IPV6
44 // v6 structs not defined in netinet/in.h, pull in here
45 #include <linux/ipv6.h>
46#endif
47
48#include <fcntl.h>
49#include <sys/poll.h>
50}
51
52#include <iostream>
53#include <errno.h>
54#include <string>
55#include <sstream>
56
57#include "tp_over_udp.h"
58#include "threadsafe_db.h"
59#include "cleanuphandler.h"
60#include "setuid.h"
61#include "logfile.h"
62#include "linux/netfilter.h"
63
64#include <set>
65
// Result codes used inside TPoverUDP::udpsend(): the result starts out as
// UDP_SUCCESS and is switched to UDP_SEND_FAILURE when sendmsg() fails.
#define UDP_SUCCESS      0
#define UDP_SEND_FAILURE 1

// Buffer size constant; not referenced in the part of the file shown here.
#define BUFSIZE 2048000

// Listen queue length; not referenced in the part of the file shown here.
const unsigned int max_listen_queue_size = 10;
73
74namespace protlib
75{
76
77 using namespace log;
78
79/** @defgroup protlib
80 * @ingroup protlib
81 * @{
82 */
83
/// Text buffer handed to inet_ntop() when the listener logs the local
/// address of a failed bind(); large enough for any IPv6 address string.
char in6_addrstr_loc[INET6_ADDRSTRLEN + 1];
85
86/******* class TPoverUDP *******/
87
88
89/** generates an internal TPoverUDP message to send a NetMsg to the network
90 *
91 * - the sending thread will call TPoverUDP::udpsend()
92 * - since UDP is connectionless we can safely ignore the use_existing_connection attribute
93 * @note the netmsg is deleted by the send() method when it is not used anymore
94 */
95void TPoverUDP::send (NetMsg * netmsg, const address & in_addr, bool use_existing_connection)
96{
97
98 appladdress* addr = NULL;
99 addr= dynamic_cast<appladdress*>(in_addr.copy());
100
101 if (!addr) return;
102
103 // Do it independently from master thread
104 udpsend(netmsg, addr);
105
106}
107
108/** sends a NetMsg to the network.
109 *
110 * @param netmsg message to send
111 * @param addr transport endpoint address
112 *
113 * @note both parameters are deleted after the message was sent
114 */
115void
116TPoverUDP::udpsend (NetMsg * netmsg, appladdress * addr)
117{
118#ifndef _NO_LOGGING
119 const char *const thisproc = "sender - ";
120#endif
121
122 // set result initially to success, set it to failure
123 // in places where these failures occur
124 int result = UDP_SUCCESS;
125 int ret = 0;
126
127
128 if (addr)
129 check_send_args (*netmsg, *addr);
130 else
131 {
132 ERRCLog (tpparam.name, thisproc << "address pointer is NULL");
133 result = UDP_SEND_FAILURE;
134 throw TPErrorInternal();
135 }
136
137
138 addr->convert_to_ipv6();
139 in6_addr ip6addr;
140
141 //convert to v4-mapped address if necessary! (we use dual-stack IPv4/IPv6 socket)
142 addr->get_ip(ip6addr);
143
144
145 // *********************************** revised socket code *********************************
146
147
148 // msghdr for sendmsg
149 struct msghdr header;
150
151 // pointer for ancillary data
152 struct cmsghdr *ancillary = NULL;
153
154 // iovec for sendmsg
155 struct iovec iov;
156 iov.iov_base = netmsg->get_buffer();
157 iov.iov_len = netmsg->get_size();
158
159 // destination address
160 struct sockaddr_in6 dest_address;
161 dest_address.sin6_family= AF_INET6;
162 dest_address.sin6_port = htons(addr->get_port());
163 dest_address.sin6_addr = ip6addr;
164 dest_address.sin6_flowinfo = 0;
165 dest_address.sin6_scope_id = 0;
166
167 // fill msghdr
168 header.msg_iov = &iov;
169 header.msg_iovlen = 1;
170 header.msg_name = &dest_address;
171 header.msg_namelen=sizeof(dest_address);
172 header.msg_control=NULL;
173 header.msg_controllen=0;
174
175
176 // pktinfo
177 in6_pktinfo pktinfo;
178
179 //addr->set_if_index(1);
180
181
182 // we have to add up to 2 ancillary data objects (for interface and hop limit)
183
184 uint32 buflength = 0;
185 if (addr->get_if_index()) {
186 buflength = CMSG_SPACE(sizeof(pktinfo));
187 //cout << "PKTINFO data object, total buffer size: " << buflength << "byte" << endl;
188 }
189
190 int hlim = addr->get_ip_ttl();
191
192 if (hlim) {
193 buflength = buflength + CMSG_SPACE(sizeof(int));
194 //cout << "HOPLIMIT data object, total buffer size: " << buflength << "byte" << endl;
195 }
196 // create the buffer
197 if ((addr->get_if_index()) || hlim) {
198 header.msg_control = malloc(buflength);
199 if (header.msg_control == 0)
200 ERRCLog(tpparam.name, thisproc << " malloc failed for ancillary data of size " << buflength);
201 }
202
203 // are we to set the outgoing interface?
204 if (addr->get_if_index()) {
205
206 DLog(tpparam.name, thisproc << " UDP send via Interface " << addr->get_if_index() << " requested.");
207
208 // first cmsghdr at beginning of buffer
209 ancillary = (cmsghdr*) header.msg_control;
210
211 ancillary->cmsg_level=IPPROTO_IPV6;
212 ancillary->cmsg_type=IPV6_PKTINFO;
213 ancillary->cmsg_len=CMSG_LEN(sizeof(pktinfo));
214
215 //cout << "Set up properties of ancillary data object 1" << endl;
216
217 pktinfo.ipi6_addr = in6addr_any;
218 pktinfo.ipi6_ifindex = addr->get_if_index();
219
220 memcpy (CMSG_DATA(ancillary), &pktinfo, sizeof(pktinfo));
221
222 //cout << "Set up data of ancillary data object 1" << endl;
223
224 // update msghdr controllen
225 header.msg_controllen = CMSG_SPACE(sizeof(pktinfo));
226
227 }
228
229 // should we set an explicit Hop Limit?
230 if (hlim) {
231 DLog(tpparam.name, thisproc << " UDP send with IP TTL of " << hlim << " requested.");
232
233 // second cmsghdr after first one
234 cmsghdr* ancillary2 = NULL;
235
236 if (ancillary) {
237 ancillary2 = (cmsghdr*) (ancillary + CMSG_SPACE(sizeof(pktinfo)));
238 } else {
239 ancillary2 = (cmsghdr*) header.msg_control;
240 }
241
242 ancillary2->cmsg_level=IPPROTO_IPV6;
243 ancillary2->cmsg_type=IPV6_HOPLIMIT;
244 ancillary2->cmsg_len = CMSG_LEN(sizeof(int));
245
246 memcpy(CMSG_DATA(ancillary2), &hlim, sizeof(int));
247
248 // update msghdr controllen
249 header.msg_controllen = header.msg_controllen + ancillary2->cmsg_len;
250
251 }
252
253#ifndef _NO_LOGGING
254 uint32 msgsize = netmsg->get_size(); // only used for logging below
255#endif
256
257 // check whether socket is already up and initialized by listener thread
258 // otherwise we may have a race condition, i.e., trying to send before socket is created
259 // FIXME: it may be the case that the socket is already created, but not bound
260 // I'm not sure what happens, when we try to send...
261 while (master_listener_socket == -1)
262 {
263 const unsigned int sleeptime= 1;
264 DLog(tpparam.name, "socket not yet ready for sending - sending deferred (" << sleeptime << " s)");
265 sleep(sleeptime);
266 DLog(tpparam.name, "retrying to send");
267 }
268 // reset IP RAO option
269 ret = setsockopt(master_listener_socket, SOL_IP, IP_OPTIONS, 0, 0);
270 if ( ret != 0 )
271 ERRLog(tpparam.name, "unsetting IP options for IPv4 failed");
272
273 // send UDP packet
274 DLog(tpparam.name, "SEND to " << *addr);
275 ret= sendmsg(master_listener_socket,&header,MSG_DONTWAIT);
276
277 if (ret<0)
278 ERRCLog(tpparam.name, "Socket Send failed! - error (" << errno << "):" << strerror(errno));
279 if (debug_pdu)
280 {
281 ostringstream hexdump;
282 netmsg->hexdump (hexdump);
283 Log (DEBUG_LOG, LOG_NORMAL, tpparam.name,
284 "PDU debugging enabled - Sent:" << hexdump.str ());
285 }
286
287 if (ret < 0)
288 {
289 result = UDP_SEND_FAILURE;
290 // break;
291 } // end if (ret < 0)
292
293
294 // *** note: netmsg is deleted here ***
295 delete netmsg;
296
297
298 // Throwing an exception within a critical section does not
299 // unlock the mutex.
300
301 if (result != UDP_SUCCESS)
302 {
303 ERRLog(tpparam.name, thisproc << "UDP error, returns " << ret << ", error : " << strerror (errno));
304 delete addr;
305
306 throw TPErrorSendFailed();
307
308 }
309 else
310 Log (EVENT_LOG, LOG_NORMAL, tpparam.name,
311 thisproc << ">>----Sent---->> message (" << msgsize <<
312 " bytes) using socket " << master_listener_socket << " to " << *addr);
313
314 // *** delete address ***
315 delete addr;
316} // end TPoverUDP::udpsend
317
318
319
320/**
321 * IPv4 catcher thread starter:
322 * just a static starter method to allow starting the
323 * actual master_listener_thread() method.
324 *
325 * @param argp - pointer to the current TPoverUDP object instance
326 */
327void *
328TPoverUDP::listener_thread_starter (void *argp)
329{
330 // invoke listener thread method
331 if (argp != 0)
332 {
333 (static_cast < TPoverUDP * >(argp))->listener_thread ();
334 }
335 return 0;
336}
337
338
339
340
341
342/**
343 * UDP master receiver thread: waits for incoming connections at the well-known udp port
344 *
345 */
346void TPoverUDP::listener_thread ()
347{
348 // create a new address-structure for the listening masterthread
349 struct sockaddr_in6 own_address;
350 own_address.sin6_family = AF_INET6;
351 own_address.sin6_flowinfo= 0;
352 own_address.sin6_port = htons(tpparam.port); // use port number in param structure
353 // accept incoming connections on all interfaces
354 own_address.sin6_addr = in6addr_any;
355 own_address.sin6_scope_id= 0;
356
357 // create a listening socket
358 master_listener_socket= socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP);
359 if (master_listener_socket == -1)
360 {
361 ERRCLog(tpparam.name, "Could not create a new socket, error: " << strerror(errno));
362 return;
363 }
364
365 int socketreuseflag= 1;
366 int status= setsockopt(master_listener_socket,
367 SOL_SOCKET,
368 SO_REUSEADDR,
369 (const char *) &socketreuseflag,
370 sizeof(socketreuseflag));
371 if (status)
372 {
373 ERRCLog(tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
374 }
375
376 // TODO: insert multicast socket options/calls here
377
378 // bind the newly created socket to a specific address
379 int bind_status = bind(master_listener_socket,
380 reinterpret_cast<struct sockaddr *>(&own_address),
381 sizeof(own_address));
382 if (bind_status)
383 {
384 ERRCLog(tpparam.name, "Binding to "
385 << inet_ntop(AF_INET6, &own_address.sin6_addr, in6_addrstr_loc, INET6_ADDRSTRLEN)
386 << " port " << tpparam.port << " failed, error: " << strerror(errno));
387 return;
388 }
389
390
391 // create a pollfd struct for use in the mainloop
392 struct pollfd poll_fd;
393 poll_fd.fd = master_listener_socket;
394 poll_fd.events = POLLIN | POLLPRI;
395 poll_fd.revents = 0;
396 /*
397 #define POLLIN 0x001 // There is data to read.
398 #define POLLPRI 0x002 // There is urgent data to read.
399 #define POLLOUT 0x004 // Writing now will not block.
400 */
401
402 bool terminate = false;
403 // check for thread terminate condition using get_state()
404 state_t currstate= get_state();
405 int poll_status= 0;
406 const unsigned int number_poll_sockets= 1;
407 struct sockaddr_in6 peer_address;
408 socklen_t peer_address_len;
409 // int conn_socket;
410
411 // check whether this thread is signaled for termination
412 while(! (terminate= (currstate==STATE_ABORT || currstate==STATE_STOP) ) )
413 {
414
415
416 // wait on number_poll_sockets (main drm socket)
417 // for the events specified above for sleep_time (in ms) tpparam.sleep_time
418 poll_status= poll(&poll_fd, number_poll_sockets, 250);
419 if (poll_fd.revents & POLLERR) // Error condition
420 {
421 if (errno != EINTR)
422 {
423 ERRCLog(tpparam.name, "Poll caused error " << strerror(errno) << " - indicated by revents");
424 }
425 else
426 {
427 EVLog(tpparam.name, "poll(): " << strerror(errno));
428 }
429
430 }
431 if (poll_fd.revents & POLLHUP) // Hung up
432 {
433 ERRCLog(tpparam.name, "Poll hung up");
434 return;
435 }
436 if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
437 {
438 ERRCLog(tpparam.name, "Poll Invalid request: fd not open");
439 return;
440 }
441
442 switch (poll_status)
443 {
444 case -1:
445 if (errno != EINTR)
446 {
447 ERRCLog(tpparam.name, "Poll status indicates error: " << strerror(errno));
448 }
449 else
450 {
451 EVLog(tpparam.name, "Poll status: " << strerror(errno));
452 }
453
454 break;
455
456 case 0:
457#ifdef DEBUG_HARD
458 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,
459 "Listen Thread - Poll timed out after " << tpparam.sleep_time << " ms.");
460#endif
461 currstate= get_state();
462 continue;
463 break;
464
465 default:
466#ifdef DEBUG_HARD
467 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
468#endif
469 break;
470 } // end switch
471
472
473
474 //if there is data to read, do it
475
476 if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
477
478
479 // in peer_address and the size of its address in addrlen
480 peer_address_len= sizeof(peer_address);
481
482 //Build us a NetMsg
483 NetMsg *netmsg=0;
484 netmsg = new NetMsg (NetMsg::max_size);
485
486
487
488 /// receive data from socket buffer (recv will not block)
489 int ret = recvfrom (master_listener_socket,
490 netmsg->get_buffer (), NetMsg::max_size, 0, reinterpret_cast<struct sockaddr *>(&peer_address),
491 &peer_address_len);
492
493 if (ret)
494 {
495 //DLog(tpparam.name, "Yankeedoo, we received " << ret << " bytes of DATA!!");
496
497 // truncate netmsg buffer
498 netmsg->truncate(ret);
499 }
500
501 /**************************************************************
502 * The following restrictions should apply: *
503 * *
504 * This is UDP, messages are contained in ONE datagram *
505 * datagrams CANNOT fragment, as otherwise TCP is used *
506 * so we now build a TPMsg, send it to signaling and *
507 * all should be well. At least until now. *
508 **************************************************************/
509
510 // Build peer_adr and own_addr
511 appladdress* peer_addr = new appladdress;
512 peer_addr->set_ip(peer_address.sin6_addr);
513 peer_addr->set_port(peer_address.sin6_port);
514 appladdress* own_addr = new appladdress();
515
516 // Log the sender peer and write to peer_addr
517 char source_addr[INET6_ADDRSTRLEN+1];
518 inet_ntop(AF_INET6, &peer_address.sin6_addr, source_addr, INET6_ADDRSTRLEN);
519
520
521 peer_addr->set_port(htons(peer_address.sin6_port));
522 peer_addr->set_ip(peer_address.sin6_addr);
523 peer_addr->set_protocol(get_underlying_protocol());
524
525 DLog(tpparam.name, "Peer: [" << *peer_addr << "]");
526
527 // create TPMsg and send it to the signaling thread
528 //fprintf (stderr, "Before TPMsg creation\n");
529 TPMsg *tpmsg=
530 new (nothrow) TPMsg (netmsg, peer_addr, own_addr);
531
532 Log (DEBUG_LOG, LOG_NORMAL, tpparam.name,
533 "recvthread - receipt of GIST PDU now complete, sending msg#" << tpmsg->get_id() << " to signaling module");
534
535
536 if (tpmsg == NULL || !tpmsg->send(tpparam.source, tpparam.dest))
537 {
538 ERRLog(tpparam.name, "rcvthread" << "Cannot allocate/send TPMsg");
539 if (tpmsg)
540 delete tpmsg;
541 if (netmsg)
542 delete netmsg;
543
544 }
545
546 }
547
548 // get new thread state
549 currstate= get_state();
550
551 } // end while(!terminate)
552
553 close (master_listener_socket);
554 return;
555
556}
557
558
559TPoverUDP::~TPoverUDP ()
560{
561 init = false;
562
563 Log (DEBUG_LOG, LOG_NORMAL, tpparam.name, "Destructor called");
564
565}
566
567/** TPoverUDP Thread main loop.
568 * This loop checks for internal messages of either
569 * a send() call to start a new receiver thread, or,
570 * of a receiver_thread() that signals its own termination
571 * for proper cleanup of control structures.
572 *
573 * @param nr number of current thread instance
574 */
575void
576TPoverUDP::main_loop (uint32 nr)
577{
578
579 int pthread_status = 0;
580
581
582 // start UDP listener thread
583 pthread_t listener_thread_ID;
584 pthread_status = pthread_create (&listener_thread_ID, NULL, //NULL: default attributes
585 listener_thread_starter, this);
586 if (pthread_status)
587 {
588 ERRCLog(tpparam.name,
589 "UDP listening thread could not be created: " <<
590 strerror (pthread_status));
591 }
592 else
593
594 Log(INFO_LOG,LOG_NORMAL, tpparam.name, color[green] << "Listening at port #" << tpparam.port << color[off]);
595
596
597
598 // define max latency for thread reaction on termination/stop signal
599 state_t currstate = get_state ();
600
601 // check whether this thread is signaled for termination
602 while (currstate != STATE_ABORT && currstate != STATE_STOP)
603 {
604
605 // get thread state
606 currstate = get_state ();
607
608 sleep(4);
609
610 } // end while
611
612 if (currstate == STATE_STOP)
613 {
614 // start abort actions
615 Log (INFO_LOG, LOG_NORMAL, tpparam.name,
616 "Asked to abort, stopping all receiver threads");
617 } // end if stopped
618
619 // do not accept any more messages
620 // terminate all receiver and sender threads that are still active
621 //terminate_all_threads ();
622}
623
624
625void
626TPoverUDP::terminate(const address& addr)
627{
628 // no connection oriented protocol, nothing to terminate
629}
630
631} // end namespace protlib
632
633///@}
Note: See TracBrowser for help on using the repository browser.