source: source/ariba/communication/modules/transport/protlib/tp_over_udp.cpp@ 4608

Last change on this file since 4608 was 4608, checked in by Christoph Mayer, 15 years ago

unspec identifier handling

File size: 16.8 KB
Line 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file tp_over_udp.cpp
3/// UDP-based transport module
4/// ----------------------------------------------------------
5/// $Id: tp_over_udp.cpp 2872 2008-02-18 10:58:03Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/src/tp_over_udp.cpp $
7// ===========================================================
8//
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29
30extern "C"
31{
32 //#define _SOCKADDR_LEN /* use BSD 4.4 sockets */
33#include <unistd.h> /* gethostname */
34#include <sys/types.h> /* network socket interface */
35#include <netinet/ip.h> /* iphdr */
36#include <netinet/ip6.h> /* ip6_hdr */
37#include <netinet/in.h> /* network socket interface */
38#include <netinet/tcp.h> /* for TCP Socket Option */
39#include <netinet/udp.h> /* for UDP header */
40#include <sys/socket.h>
41#include <arpa/inet.h> /* inet_addr */
42
43#include <fcntl.h>
44#include <sys/poll.h>
45}
46
47#include <iostream>
48#include <errno.h>
49#include <string>
50#include <sstream>
51
52#include "tp_over_udp.h"
53#include "threadsafe_db.h"
54#include "cleanuphandler.h"
55#include "setuid.h"
56#include "logfile.h"
57#include "linux/netfilter.h"
58
59#include <set>
60
61#define UDP_SUCCESS 0
62#define UDP_SEND_FAILURE 1
63
64#define BUFSIZE 2048000
65
66
67const unsigned int max_listen_queue_size = 10;
68
69namespace protlib
70{
71
72 using namespace log;
73
74/** @defgroup tpudp TP over UDP
75 * @ingroup network
76 * @{
77 */
78
79char in6_addrstr_loc[INET6_ADDRSTRLEN+1];
80
81/******* class TPoverUDP *******/
82
83
84/** generates an internal TPoverUDP message to send a NetMsg to the network
85 *
86 * - the sending thread will call TPoverUDP::udpsend()
87 * - since UDP is connectionless we can safely ignore the use_existing_connection attribute
88 * @note the netmsg is deleted by the send() method when it is not used anymore
89 */
90void TPoverUDP::send (NetMsg * netmsg, const address & in_addr, bool use_existing_connection)
91{
92
93 appladdress* addr = NULL;
94 addr= dynamic_cast<appladdress*>(in_addr.copy());
95
96 if (!addr) return;
97
98 // Do it independently from master thread
99 udpsend(netmsg, addr);
100
101}
102
103/** sends a NetMsg to the network.
104 *
105 * @param netmsg message to send
106 * @param addr transport endpoint address
107 *
108 * @note both parameters are deleted after the message was sent
109 */
110void
111TPoverUDP::udpsend (NetMsg * netmsg, appladdress * addr)
112{
113#ifndef _NO_LOGGING
114 const char *const thisproc = "sender - ";
115#endif
116
117 // set result initially to success, set it to failure
118 // in places where these failures occur
119 int result = UDP_SUCCESS;
120 int ret = 0;
121
122
123 if (addr)
124 check_send_args (*netmsg, *addr);
125 else
126 {
127 ERRCLog (tpparam.name, thisproc << "address pointer is NULL");
128 result = UDP_SEND_FAILURE;
129 throw TPErrorInternal();
130 }
131
132
133 addr->convert_to_ipv6();
134 in6_addr ip6addr;
135
136 //convert to v4-mapped address if necessary! (we use dual-stack IPv4/IPv6 socket)
137 addr->get_ip(ip6addr);
138
139
140 // *********************************** revised socket code *********************************
141
142
143 // msghdr for sendmsg
144 struct msghdr header;
145
146 // pointer for ancillary data
147 struct cmsghdr *ancillary = NULL;
148
149 // iovec for sendmsg
150 struct iovec iov;
151 iov.iov_base = netmsg->get_buffer();
152 iov.iov_len = netmsg->get_size();
153
154 // destination address
155 struct sockaddr_in6 dest_address;
156 dest_address.sin6_family= AF_INET6;
157 dest_address.sin6_port = htons(addr->get_port());
158 dest_address.sin6_addr = ip6addr;
159 dest_address.sin6_flowinfo = 0;
160 dest_address.sin6_scope_id = 0;
161
162 // fill msghdr
163 header.msg_iov = &iov;
164 header.msg_iovlen = 1;
165 header.msg_name = &dest_address;
166 header.msg_namelen=sizeof(dest_address);
167 header.msg_control=NULL;
168 header.msg_controllen=0;
169
170
171 // pktinfo
172 in6_pktinfo pktinfo;
173
174 //addr->set_if_index(1);
175
176
177 // we have to add up to 2 ancillary data objects (for interface and hop limit)
178
179 uint32 buflength = 0;
180 if (addr->get_if_index()) {
181 buflength = CMSG_SPACE(sizeof(pktinfo));
182 //cout << "PKTINFO data object, total buffer size: " << buflength << "byte" << endl;
183 }
184
185 int hlim = addr->get_ip_ttl();
186
187 if (hlim) {
188 buflength = buflength + CMSG_SPACE(sizeof(int));
189 //cout << "HOPLIMIT data object, total buffer size: " << buflength << "byte" << endl;
190 }
191 // create the buffer
192 if ((addr->get_if_index()) || hlim) {
193 header.msg_control = malloc(buflength);
194 if (header.msg_control == 0)
195 ERRCLog(tpparam.name, thisproc << " malloc failed for ancillary data of size " << buflength);
196 }
197
198 // are we to set the outgoing interface?
199 if (addr->get_if_index()) {
200
201 DLog(tpparam.name, thisproc << " UDP send via Interface " << addr->get_if_index() << " requested.");
202
203 // first cmsghdr at beginning of buffer
204 ancillary = (cmsghdr*) header.msg_control;
205
206 ancillary->cmsg_level=IPPROTO_IPV6;
207 ancillary->cmsg_type=IPV6_PKTINFO;
208 ancillary->cmsg_len=CMSG_LEN(sizeof(pktinfo));
209
210 //cout << "Set up properties of ancillary data object 1" << endl;
211
212 pktinfo.ipi6_addr = in6addr_any;
213 pktinfo.ipi6_ifindex = addr->get_if_index();
214
215 memcpy (CMSG_DATA(ancillary), &pktinfo, sizeof(pktinfo));
216
217 //cout << "Set up data of ancillary data object 1" << endl;
218
219 // update msghdr controllen
220 header.msg_controllen = CMSG_SPACE(sizeof(pktinfo));
221
222 }
223
224 // should we set an explicit Hop Limit?
225 if (hlim) {
226 DLog(tpparam.name, thisproc << " UDP send with IP TTL of " << hlim << " requested.");
227
228 // second cmsghdr after first one
229 cmsghdr* ancillary2 = NULL;
230
231 if (ancillary) {
232 ancillary2 = (cmsghdr*) (ancillary + CMSG_SPACE(sizeof(pktinfo)));
233 } else {
234 ancillary2 = (cmsghdr*) header.msg_control;
235 }
236
237 ancillary2->cmsg_level=IPPROTO_IPV6;
238 ancillary2->cmsg_type=IPV6_HOPLIMIT;
239 ancillary2->cmsg_len = CMSG_LEN(sizeof(int));
240
241 memcpy(CMSG_DATA(ancillary2), &hlim, sizeof(int));
242
243 // update msghdr controllen
244 header.msg_controllen = header.msg_controllen + ancillary2->cmsg_len;
245
246 }
247
248#ifndef _NO_LOGGING
249 uint32 msgsize = netmsg->get_size(); // only used for logging below
250#endif
251
252 // check whether socket is already up and initialized by listener thread
253 // otherwise we may have a race condition, i.e., trying to send before socket is created
254 // FIXME: it may be the case that the socket is already created, but not bound
255 // I'm not sure what happens, when we try to send...
256 while (master_listener_socket == -1)
257 {
258 const unsigned int sleeptime= 1;
259 DLog(tpparam.name, "socket not yet ready for sending - sending deferred (" << sleeptime << " s)");
260 sleep(sleeptime);
261 DLog(tpparam.name, "retrying to send");
262 }
263 // reset IP RAO option
264 ret = setsockopt(master_listener_socket, SOL_IP, IP_OPTIONS, 0, 0);
265 if ( ret != 0 )
266 ERRLog(tpparam.name, "unsetting IP options for IPv4 failed");
267
268 // send UDP packet
269 DLog(tpparam.name, "SEND to " << *addr);
270 ret= sendmsg(master_listener_socket,&header,MSG_DONTWAIT);
271
272 if (ret<0)
273 ERRCLog(tpparam.name, "Socket Send failed! - error (" << errno << "):" << strerror(errno));
274 if (debug_pdu)
275 {
276 ostringstream hexdump;
277 netmsg->hexdump (hexdump);
278 Log (DEBUG_LOG, LOG_NORMAL, tpparam.name,
279 "PDU debugging enabled - Sent:" << hexdump.str ());
280 }
281
282 if (ret < 0)
283 {
284 result = UDP_SEND_FAILURE;
285 // break;
286 } // end if (ret < 0)
287
288
289 // *** note: netmsg is deleted here ***
290 delete netmsg;
291
292
293 // Throwing an exception within a critical section does not
294 // unlock the mutex.
295
296 if (result != UDP_SUCCESS)
297 {
298 ERRLog(tpparam.name, thisproc << "UDP error, returns " << ret << ", error : " << strerror (errno));
299 delete addr;
300
301 throw TPErrorSendFailed();
302
303 }
304 else
305 Log (EVENT_LOG, LOG_NORMAL, tpparam.name,
306 thisproc << ">>----Sent---->> message (" << msgsize <<
307 " bytes) using socket " << master_listener_socket << " to " << *addr);
308
309 // *** delete address ***
310 delete addr;
311} // end TPoverUDP::udpsend
312
313
314
315/**
316 * IPv4 catcher thread starter:
317 * just a static starter method to allow starting the
318 * actual master_listener_thread() method.
319 *
320 * @param argp - pointer to the current TPoverUDP object instance
321 */
322void *
323TPoverUDP::listener_thread_starter (void *argp)
324{
325 // invoke listener thread method
326 if (argp != 0)
327 {
328 (static_cast < TPoverUDP * >(argp))->listener_thread ();
329 }
330 return 0;
331}
332
333
334
335
336
337/**
338 * UDP master receiver thread: waits for incoming connections at the well-known udp port
339 *
340 */
341void TPoverUDP::listener_thread ()
342{
343 // create a new address-structure for the listening masterthread
344 struct sockaddr_in6 own_address;
345 own_address.sin6_family = AF_INET6;
346 own_address.sin6_flowinfo= 0;
347 own_address.sin6_port = htons(tpparam.port); // use port number in param structure
348 // accept incoming connections on all interfaces
349 own_address.sin6_addr = in6addr_any;
350 own_address.sin6_scope_id= 0;
351
352 // create a listening socket
353 master_listener_socket= socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP);
354 if (master_listener_socket == -1)
355 {
356 ERRCLog(tpparam.name, "Could not create a new socket, error: " << strerror(errno));
357 return;
358 }
359
360 int socketreuseflag= 1;
361 int status= setsockopt(master_listener_socket,
362 SOL_SOCKET,
363 SO_REUSEADDR,
364 (const char *) &socketreuseflag,
365 sizeof(socketreuseflag));
366 if (status)
367 {
368 ERRCLog(tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
369 }
370
371 // TODO: insert multicast socket options/calls here
372
373 // bind the newly created socket to a specific address
374 int bind_status = bind(master_listener_socket,
375 reinterpret_cast<struct sockaddr *>(&own_address),
376 sizeof(own_address));
377 if (bind_status)
378 {
379 ERRCLog(tpparam.name, "Binding to "
380 << inet_ntop(AF_INET6, &own_address.sin6_addr, in6_addrstr_loc, INET6_ADDRSTRLEN)
381 << " port " << tpparam.port << " failed, error: " << strerror(errno));
382 return;
383 }
384
385
386 // create a pollfd struct for use in the mainloop
387 struct pollfd poll_fd;
388 poll_fd.fd = master_listener_socket;
389 poll_fd.events = POLLIN | POLLPRI;
390 poll_fd.revents = 0;
391 /*
392 #define POLLIN 0x001 // There is data to read.
393 #define POLLPRI 0x002 // There is urgent data to read.
394 #define POLLOUT 0x004 // Writing now will not block.
395 */
396
397 bool terminate = false;
398 // check for thread terminate condition using get_state()
399 state_t currstate= get_state();
400 int poll_status= 0;
401 const unsigned int number_poll_sockets= 1;
402 struct sockaddr_in6 peer_address;
403 socklen_t peer_address_len;
404 // int conn_socket;
405
406 // check whether this thread is signaled for termination
407 while(! (terminate= (currstate==STATE_ABORT || currstate==STATE_STOP) ) )
408 {
409
410
411 // wait on number_poll_sockets (main drm socket)
412 // for the events specified above for sleep_time (in ms) tpparam.sleep_time
413 poll_status= poll(&poll_fd, number_poll_sockets, 250);
414 if (poll_fd.revents & POLLERR) // Error condition
415 {
416 if (errno != EINTR)
417 {
418 ERRCLog(tpparam.name, "Poll caused error " << strerror(errno) << " - indicated by revents");
419 }
420 else
421 {
422 EVLog(tpparam.name, "poll(): " << strerror(errno));
423 }
424
425 }
426 if (poll_fd.revents & POLLHUP) // Hung up
427 {
428 ERRCLog(tpparam.name, "Poll hung up");
429 return;
430 }
431 if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
432 {
433 ERRCLog(tpparam.name, "Poll Invalid request: fd not open");
434 return;
435 }
436
437 switch (poll_status)
438 {
439 case -1:
440 if (errno != EINTR)
441 {
442 ERRCLog(tpparam.name, "Poll status indicates error: " << strerror(errno));
443 }
444 else
445 {
446 EVLog(tpparam.name, "Poll status: " << strerror(errno));
447 }
448
449 break;
450
451 case 0:
452#ifdef DEBUG_HARD
453 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,
454 "Listen Thread - Poll timed out after " << tpparam.sleep_time << " ms.");
455#endif
456 currstate= get_state();
457 continue;
458 break;
459
460 default:
461#ifdef DEBUG_HARD
462 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
463#endif
464 break;
465 } // end switch
466
467
468
469 //if there is data to read, do it
470
471 if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
472
473
474 // in peer_address and the size of its address in addrlen
475 peer_address_len= sizeof(peer_address);
476
477 //Build us a NetMsg
478 NetMsg *netmsg=0;
479 netmsg = new NetMsg (NetMsg::max_size);
480
481
482
483 /// receive data from socket buffer (recv will not block)
484 int ret = recvfrom (master_listener_socket,
485 netmsg->get_buffer (), NetMsg::max_size, 0, reinterpret_cast<struct sockaddr *>(&peer_address),
486 &peer_address_len);
487
488 if (ret)
489 {
490 DLog(tpparam.name, "Yankeedoo, we received " << ret << " bytes of DATA!!");
491
492 // truncate netmsg buffer
493 netmsg->truncate(ret);
494 }
495
496 /**************************************************************
497 * The following restrictions should apply: *
498 * *
499 * This is UDP, messages are contained in ONE datagram *
500 * datagrams CANNOT fragment, as otherwise TCP is used *
501 * so we now build a TPMsg, send it to signaling and *
502 * all should be well. At least until now. *
503 **************************************************************/
504
505 // Build peer_adr and own_addr
506 appladdress* peer_addr = new appladdress;
507 peer_addr->set_ip(peer_address.sin6_addr);
508 peer_addr->set_port(peer_address.sin6_port);
509 appladdress* own_addr = new appladdress();
510
511 // Log the sender peer and write to peer_addr
512 char source_addr[INET6_ADDRSTRLEN+1];
513 inet_ntop(AF_INET6, &peer_address.sin6_addr, source_addr, INET6_ADDRSTRLEN);
514
515
516 peer_addr->set_port(htons(peer_address.sin6_port));
517 peer_addr->set_ip(peer_address.sin6_addr);
518 peer_addr->set_protocol(get_underlying_protocol());
519
520 DLog(tpparam.name, "Peer: [" << *peer_addr << "]");
521
522 // create TPMsg and send it to the signaling thread
523 //fprintf (stderr, "Before TPMsg creation\n");
524 TPMsg *tpmsg=
525 new (nothrow) TPMsg (netmsg, peer_addr, own_addr);
526
527 Log (DEBUG_LOG, LOG_NORMAL, tpparam.name,
528 "recvthread - receipt of GIST PDU now complete, sending msg#" << tpmsg->get_id() << " to signaling module");
529
530
531 if (tpmsg == NULL || !tpmsg->send(tpparam.source, tpparam.dest))
532 {
533 ERRLog(tpparam.name, "rcvthread" << "Cannot allocate/send TPMsg");
534 if (tpmsg)
535 delete tpmsg;
536 if (netmsg)
537 delete netmsg;
538
539 }
540
541 }
542
543 // get new thread state
544 currstate= get_state();
545
546 } // end while(!terminate)
547
548 return;
549
550}
551
552
553TPoverUDP::~TPoverUDP ()
554{
555 init = false;
556
557 Log (DEBUG_LOG, LOG_NORMAL, tpparam.name, "Destructor called");
558
559}
560
561/** TPoverUDP Thread main loop.
562 * This loop checks for internal messages of either
563 * a send() call to start a new receiver thread, or,
564 * of a receiver_thread() that signals its own termination
565 * for proper cleanup of control structures.
566 *
567 * @param nr number of current thread instance
568 */
569void
570TPoverUDP::main_loop (uint32 nr)
571{
572
573 int pthread_status = 0;
574
575
576 // start UDP listener thread
577 pthread_t listener_thread_ID;
578 pthread_status = pthread_create (&listener_thread_ID, NULL, //NULL: default attributes
579 listener_thread_starter, this);
580 if (pthread_status)
581 {
582 ERRCLog(tpparam.name,
583 "UDP listening thread could not be created: " <<
584 strerror (pthread_status));
585 }
586 else
587
588 Log(INFO_LOG,LOG_NORMAL, tpparam.name, color[green] << "Listening at port #" << tpparam.port << color[off]);
589
590
591
592 // define max latency for thread reaction on termination/stop signal
593 state_t currstate = get_state ();
594
595 // check whether this thread is signaled for termination
596 while (currstate != STATE_ABORT && currstate != STATE_STOP)
597 {
598
599 // get thread state
600 currstate = get_state ();
601
602 sleep(4);
603
604 } // end while
605
606 if (currstate == STATE_STOP)
607 {
608 // start abort actions
609 Log (INFO_LOG, LOG_NORMAL, tpparam.name,
610 "Asked to abort, stopping all receiver threads");
611 } // end if stopped
612
613 // do not accept any more messages
614 // terminate all receiver and sender threads that are still active
615 //terminate_all_threads ();
616}
617
618
619void
620TPoverUDP::terminate(const address& addr)
621{
622 // no connection oriented protocol, nothing to terminate
623}
624
625} // end namespace protlib
626
627///@}
Note: See TracBrowser for help on using the repository browser.