source: source/ariba/communication/modules/transport/protlib/tp_over_udp.cpp@ 5638

Last change on this file since 5638 was 5638, checked in by Christoph Mayer, 15 years ago

address detection cleaned up, network info for Bluetooth, data stream (hopeful crash fix), logging on Maemo only at warn level, ...

File size: 16.8 KB
Line 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file tp_over_udp.cpp
3/// UDP-based transport module
4/// ----------------------------------------------------------
5/// $Id: tp_over_udp.cpp 2872 2008-02-18 10:58:03Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/src/tp_over_udp.cpp $
7// ===========================================================
8//
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29
30extern "C"
31{
32 //#define _SOCKADDR_LEN /* use BSD 4.4 sockets */
33#include <unistd.h> /* gethostname */
34#include <sys/types.h> /* network socket interface */
35#include <netinet/ip.h> /* iphdr */
36#include <netinet/ip6.h> /* ip6_hdr */
37#include <netinet/in.h> /* network socket interface */
38#include <netinet/tcp.h> /* for TCP Socket Option */
39#include <netinet/udp.h> /* for UDP header */
40#include <sys/socket.h>
41#include <arpa/inet.h> /* inet_addr */
42
43#include <fcntl.h>
44#include <sys/poll.h>
45}
46
47#include <iostream>
48#include <errno.h>
49#include <string>
50#include <sstream>
51
52#include "tp_over_udp.h"
53#include "threadsafe_db.h"
54#include "cleanuphandler.h"
55#include "setuid.h"
56#include "logfile.h"
57#include "linux/netfilter.h"
58
59#include <set>
60
61#define UDP_SUCCESS 0
62#define UDP_SEND_FAILURE 1
63
64#define BUFSIZE 2048000
65
66
67const unsigned int max_listen_queue_size = 10;
68
69namespace protlib
70{
71
72 using namespace log;
73
74/** @defgroup tpudp TP over UDP
75 * @ingroup network
76 * @{
77 */
78
79char in6_addrstr_loc[INET6_ADDRSTRLEN+1];
80
81/******* class TPoverUDP *******/
82
83
84/** generates an internal TPoverUDP message to send a NetMsg to the network
85 *
86 * - the sending thread will call TPoverUDP::udpsend()
87 * - since UDP is connectionless we can safely ignore the use_existing_connection attribute
88 * @note the netmsg is deleted by the send() method when it is not used anymore
89 */
90void TPoverUDP::send (NetMsg * netmsg, const address & in_addr, bool use_existing_connection)
91{
92
93 appladdress* addr = NULL;
94 addr= dynamic_cast<appladdress*>(in_addr.copy());
95
96 if (!addr) return;
97
98 // Do it independently from master thread
99 udpsend(netmsg, addr);
100
101}
102
103/** sends a NetMsg to the network.
104 *
105 * @param netmsg message to send
106 * @param addr transport endpoint address
107 *
108 * @note both parameters are deleted after the message was sent
109 */
110void
111TPoverUDP::udpsend (NetMsg * netmsg, appladdress * addr)
112{
113#ifndef _NO_LOGGING
114 const char *const thisproc = "sender - ";
115#endif
116
117 // set result initially to success, set it to failure
118 // in places where these failures occur
119 int result = UDP_SUCCESS;
120 int ret = 0;
121
122
123 if (addr)
124 check_send_args (*netmsg, *addr);
125 else
126 {
127 ERRCLog (tpparam.name, thisproc << "address pointer is NULL");
128 result = UDP_SEND_FAILURE;
129 throw TPErrorInternal();
130 }
131
132
133 addr->convert_to_ipv6();
134 in6_addr ip6addr;
135
136 //convert to v4-mapped address if necessary! (we use dual-stack IPv4/IPv6 socket)
137 addr->get_ip(ip6addr);
138
139
140 // *********************************** revised socket code *********************************
141
142
143 // msghdr for sendmsg
144 struct msghdr header;
145
146 // pointer for ancillary data
147 struct cmsghdr *ancillary = NULL;
148
149 // iovec for sendmsg
150 struct iovec iov;
151 iov.iov_base = netmsg->get_buffer();
152 iov.iov_len = netmsg->get_size();
153
154 // destination address
155 struct sockaddr_in6 dest_address;
156 dest_address.sin6_family= AF_INET6;
157 dest_address.sin6_port = htons(addr->get_port());
158 dest_address.sin6_addr = ip6addr;
159 dest_address.sin6_flowinfo = 0;
160 dest_address.sin6_scope_id = 0;
161
162 // fill msghdr
163 header.msg_iov = &iov;
164 header.msg_iovlen = 1;
165 header.msg_name = &dest_address;
166 header.msg_namelen=sizeof(dest_address);
167 header.msg_control=NULL;
168 header.msg_controllen=0;
169
170
171 // pktinfo
172 in6_pktinfo pktinfo;
173
174 //addr->set_if_index(1);
175
176
177 // we have to add up to 2 ancillary data objects (for interface and hop limit)
178
179 uint32 buflength = 0;
180 if (addr->get_if_index()) {
181 buflength = CMSG_SPACE(sizeof(pktinfo));
182 //cout << "PKTINFO data object, total buffer size: " << buflength << "byte" << endl;
183 }
184
185 int hlim = addr->get_ip_ttl();
186
187 if (hlim) {
188 buflength = buflength + CMSG_SPACE(sizeof(int));
189 //cout << "HOPLIMIT data object, total buffer size: " << buflength << "byte" << endl;
190 }
191 // create the buffer
192 if ((addr->get_if_index()) || hlim) {
193 header.msg_control = malloc(buflength);
194 if (header.msg_control == 0)
195 ERRCLog(tpparam.name, thisproc << " malloc failed for ancillary data of size " << buflength);
196 }
197
198 // are we to set the outgoing interface?
199 if (addr->get_if_index()) {
200
201 DLog(tpparam.name, thisproc << " UDP send via Interface " << addr->get_if_index() << " requested.");
202
203 // first cmsghdr at beginning of buffer
204 ancillary = (cmsghdr*) header.msg_control;
205
206 ancillary->cmsg_level=IPPROTO_IPV6;
207 ancillary->cmsg_type=IPV6_PKTINFO;
208 ancillary->cmsg_len=CMSG_LEN(sizeof(pktinfo));
209
210 //cout << "Set up properties of ancillary data object 1" << endl;
211
212 pktinfo.ipi6_addr = in6addr_any;
213 pktinfo.ipi6_ifindex = addr->get_if_index();
214
215 memcpy (CMSG_DATA(ancillary), &pktinfo, sizeof(pktinfo));
216
217 //cout << "Set up data of ancillary data object 1" << endl;
218
219 // update msghdr controllen
220 header.msg_controllen = CMSG_SPACE(sizeof(pktinfo));
221
222 }
223
224 // should we set an explicit Hop Limit?
225 if (hlim) {
226 DLog(tpparam.name, thisproc << " UDP send with IP TTL of " << hlim << " requested.");
227
228 // second cmsghdr after first one
229 cmsghdr* ancillary2 = NULL;
230
231 if (ancillary) {
232 ancillary2 = (cmsghdr*) (ancillary + CMSG_SPACE(sizeof(pktinfo)));
233 } else {
234 ancillary2 = (cmsghdr*) header.msg_control;
235 }
236
237 ancillary2->cmsg_level=IPPROTO_IPV6;
238 ancillary2->cmsg_type=IPV6_HOPLIMIT;
239 ancillary2->cmsg_len = CMSG_LEN(sizeof(int));
240
241 memcpy(CMSG_DATA(ancillary2), &hlim, sizeof(int));
242
243 // update msghdr controllen
244 header.msg_controllen = header.msg_controllen + ancillary2->cmsg_len;
245
246 }
247
248#ifndef _NO_LOGGING
249 uint32 msgsize = netmsg->get_size(); // only used for logging below
250#endif
251
252 // check whether socket is already up and initialized by listener thread
253 // otherwise we may have a race condition, i.e., trying to send before socket is created
254 // FIXME: it may be the case that the socket is already created, but not bound
255 // I'm not sure what happens, when we try to send...
256 while (master_listener_socket == -1)
257 {
258 const unsigned int sleeptime= 1;
259 DLog(tpparam.name, "socket not yet ready for sending - sending deferred (" << sleeptime << " s)");
260 sleep(sleeptime);
261 DLog(tpparam.name, "retrying to send");
262 }
263 // reset IP RAO option
264 ret = setsockopt(master_listener_socket, SOL_IP, IP_OPTIONS, 0, 0);
265 if ( ret != 0 )
266 ERRLog(tpparam.name, "unsetting IP options for IPv4 failed");
267
268 // send UDP packet
269 DLog(tpparam.name, "SEND to " << *addr);
270 ret= sendmsg(master_listener_socket,&header,MSG_DONTWAIT);
271
272 if (ret<0)
273 ERRCLog(tpparam.name, "Socket Send failed! - error (" << errno << "):" << strerror(errno));
274 if (debug_pdu)
275 {
276 ostringstream hexdump;
277 netmsg->hexdump (hexdump);
278 Log (DEBUG_LOG, LOG_NORMAL, tpparam.name,
279 "PDU debugging enabled - Sent:" << hexdump.str ());
280 }
281
282 if (ret < 0)
283 {
284 result = UDP_SEND_FAILURE;
285 // break;
286 } // end if (ret < 0)
287
288
289 // *** note: netmsg is deleted here ***
290 delete netmsg;
291
292
293 // Throwing an exception within a critical section does not
294 // unlock the mutex.
295
296 if (result != UDP_SUCCESS)
297 {
298 ERRLog(tpparam.name, thisproc << "UDP error, returns " << ret << ", error : " << strerror (errno));
299 delete addr;
300
301 throw TPErrorSendFailed();
302
303 }
304 else
305 Log (EVENT_LOG, LOG_NORMAL, tpparam.name,
306 thisproc << ">>----Sent---->> message (" << msgsize <<
307 " bytes) using socket " << master_listener_socket << " to " << *addr);
308
309 // *** delete address ***
310 delete addr;
311} // end TPoverUDP::udpsend
312
313
314
315/**
316 * IPv4 catcher thread starter:
317 * just a static starter method to allow starting the
318 * actual master_listener_thread() method.
319 *
320 * @param argp - pointer to the current TPoverUDP object instance
321 */
322void *
323TPoverUDP::listener_thread_starter (void *argp)
324{
325 // invoke listener thread method
326 if (argp != 0)
327 {
328 (static_cast < TPoverUDP * >(argp))->listener_thread ();
329 }
330 return 0;
331}
332
333
334
335
336
337/**
338 * UDP master receiver thread: waits for incoming connections at the well-known udp port
339 *
340 */
341void TPoverUDP::listener_thread ()
342{
343 // create a new address-structure for the listening masterthread
344 struct sockaddr_in6 own_address;
345 own_address.sin6_family = AF_INET6;
346 own_address.sin6_flowinfo= 0;
347 own_address.sin6_port = htons(tpparam.port); // use port number in param structure
348 // accept incoming connections on all interfaces
349 own_address.sin6_addr = in6addr_any;
350 own_address.sin6_scope_id= 0;
351
352 // create a listening socket
353 master_listener_socket= socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP);
354 if (master_listener_socket == -1)
355 {
356 ERRCLog(tpparam.name, "Could not create a new socket, error: " << strerror(errno));
357 return;
358 }
359
360 int socketreuseflag= 1;
361 int status= setsockopt(master_listener_socket,
362 SOL_SOCKET,
363 SO_REUSEADDR,
364 (const char *) &socketreuseflag,
365 sizeof(socketreuseflag));
366 if (status)
367 {
368 ERRCLog(tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
369 }
370
371 // TODO: insert multicast socket options/calls here
372
373 // bind the newly created socket to a specific address
374 int bind_status = bind(master_listener_socket,
375 reinterpret_cast<struct sockaddr *>(&own_address),
376 sizeof(own_address));
377 if (bind_status)
378 {
379 ERRCLog(tpparam.name, "Binding to "
380 << inet_ntop(AF_INET6, &own_address.sin6_addr, in6_addrstr_loc, INET6_ADDRSTRLEN)
381 << " port " << tpparam.port << " failed, error: " << strerror(errno));
382 return;
383 }
384
385
386 // create a pollfd struct for use in the mainloop
387 struct pollfd poll_fd;
388 poll_fd.fd = master_listener_socket;
389 poll_fd.events = POLLIN | POLLPRI;
390 poll_fd.revents = 0;
391 /*
392 #define POLLIN 0x001 // There is data to read.
393 #define POLLPRI 0x002 // There is urgent data to read.
394 #define POLLOUT 0x004 // Writing now will not block.
395 */
396
397 bool terminate = false;
398 // check for thread terminate condition using get_state()
399 state_t currstate= get_state();
400 int poll_status= 0;
401 const unsigned int number_poll_sockets= 1;
402 struct sockaddr_in6 peer_address;
403 socklen_t peer_address_len;
404 // int conn_socket;
405
406 // check whether this thread is signaled for termination
407 while(! (terminate= (currstate==STATE_ABORT || currstate==STATE_STOP) ) )
408 {
409
410
411 // wait on number_poll_sockets (main drm socket)
412 // for the events specified above for sleep_time (in ms) tpparam.sleep_time
413 poll_status= poll(&poll_fd, number_poll_sockets, 250);
414 if (poll_fd.revents & POLLERR) // Error condition
415 {
416 if (errno != EINTR)
417 {
418 ERRCLog(tpparam.name, "Poll caused error " << strerror(errno) << " - indicated by revents");
419 }
420 else
421 {
422 EVLog(tpparam.name, "poll(): " << strerror(errno));
423 }
424
425 }
426 if (poll_fd.revents & POLLHUP) // Hung up
427 {
428 ERRCLog(tpparam.name, "Poll hung up");
429 return;
430 }
431 if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
432 {
433 ERRCLog(tpparam.name, "Poll Invalid request: fd not open");
434 return;
435 }
436
437 switch (poll_status)
438 {
439 case -1:
440 if (errno != EINTR)
441 {
442 ERRCLog(tpparam.name, "Poll status indicates error: " << strerror(errno));
443 }
444 else
445 {
446 EVLog(tpparam.name, "Poll status: " << strerror(errno));
447 }
448
449 break;
450
451 case 0:
452#ifdef DEBUG_HARD
453 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,
454 "Listen Thread - Poll timed out after " << tpparam.sleep_time << " ms.");
455#endif
456 currstate= get_state();
457 continue;
458 break;
459
460 default:
461#ifdef DEBUG_HARD
462 Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
463#endif
464 break;
465 } // end switch
466
467
468
469 //if there is data to read, do it
470
471 if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
472
473
474 // in peer_address and the size of its address in addrlen
475 peer_address_len= sizeof(peer_address);
476
477 //Build us a NetMsg
478 NetMsg *netmsg=0;
479 netmsg = new NetMsg (NetMsg::max_size);
480
481
482
483 /// receive data from socket buffer (recv will not block)
484 int ret = recvfrom (master_listener_socket,
485 netmsg->get_buffer (), NetMsg::max_size, 0, reinterpret_cast<struct sockaddr *>(&peer_address),
486 &peer_address_len);
487
488 if (ret)
489 {
490 DLog(tpparam.name, "Yankeedoo, we received " << ret << " bytes of DATA!!");
491
492 // truncate netmsg buffer
493 netmsg->truncate(ret);
494 }
495
496 /**************************************************************
497 * The following restrictions should apply: *
498 * *
499 * This is UDP, messages are contained in ONE datagram *
500 * datagrams CANNOT fragment, as otherwise TCP is used *
501 * so we now build a TPMsg, send it to signaling and *
502 * all should be well. At least until now. *
503 **************************************************************/
504
505 // Build peer_adr and own_addr
506 appladdress* peer_addr = new appladdress;
507 peer_addr->set_ip(peer_address.sin6_addr);
508 peer_addr->set_port(peer_address.sin6_port);
509 appladdress* own_addr = new appladdress();
510
511 // Log the sender peer and write to peer_addr
512 char source_addr[INET6_ADDRSTRLEN+1];
513 inet_ntop(AF_INET6, &peer_address.sin6_addr, source_addr, INET6_ADDRSTRLEN);
514
515
516 peer_addr->set_port(htons(peer_address.sin6_port));
517 peer_addr->set_ip(peer_address.sin6_addr);
518 peer_addr->set_protocol(get_underlying_protocol());
519
520 DLog(tpparam.name, "Peer: [" << *peer_addr << "]");
521
522 // create TPMsg and send it to the signaling thread
523 //fprintf (stderr, "Before TPMsg creation\n");
524 TPMsg *tpmsg=
525 new (nothrow) TPMsg (netmsg, peer_addr, own_addr);
526
527 Log (DEBUG_LOG, LOG_NORMAL, tpparam.name,
528 "recvthread - receipt of GIST PDU now complete, sending msg#" << tpmsg->get_id() << " to signaling module");
529
530
531 if (tpmsg == NULL || !tpmsg->send(tpparam.source, tpparam.dest))
532 {
533 ERRLog(tpparam.name, "rcvthread" << "Cannot allocate/send TPMsg");
534 if (tpmsg)
535 delete tpmsg;
536 if (netmsg)
537 delete netmsg;
538
539 }
540
541 }
542
543 // get new thread state
544 currstate= get_state();
545
546 } // end while(!terminate)
547
548 return;
549
550}
551
552
553TPoverUDP::~TPoverUDP ()
554{
555 init = false;
556
557 Log (DEBUG_LOG, LOG_NORMAL, tpparam.name, "Destructor called");
558
559}
560
561/** TPoverUDP Thread main loop.
562 * This loop checks for internal messages of either
563 * a send() call to start a new receiver thread, or,
564 * of a receiver_thread() that signals its own termination
565 * for proper cleanup of control structures.
566 *
567 * @param nr number of current thread instance
568 */
569void
570TPoverUDP::main_loop (uint32 nr)
571{
572
573 int pthread_status = 0;
574
575
576 // start UDP listener thread
577 pthread_t listener_thread_ID;
578 pthread_status = pthread_create (&listener_thread_ID, NULL, //NULL: default attributes
579 listener_thread_starter, this);
580 if (pthread_status)
581 {
582 ERRCLog(tpparam.name,
583 "UDP listening thread could not be created: " <<
584 strerror (pthread_status));
585 }
586 else
587
588 Log(INFO_LOG,LOG_NORMAL, tpparam.name, color[green] << "Listening at port #" << tpparam.port << color[off]);
589
590
591
592 // define max latency for thread reaction on termination/stop signal
593 state_t currstate = get_state ();
594
595 // check whether this thread is signaled for termination
596 while (currstate != STATE_ABORT && currstate != STATE_STOP)
597 {
598
599 // get thread state
600 currstate = get_state ();
601
602 sleep(4);
603
604 } // end while
605
606 if (currstate == STATE_STOP)
607 {
608 // start abort actions
609 Log (INFO_LOG, LOG_NORMAL, tpparam.name,
610 "Asked to abort, stopping all receiver threads");
611 } // end if stopped
612
613 // do not accept any more messages
614 // terminate all receiver and sender threads that are still active
615 //terminate_all_threads ();
616}
617
618
619void
620TPoverUDP::terminate(const address& addr)
621{
622 // no connection oriented protocol, nothing to terminate
623}
624
625} // end namespace protlib
626
627///@}
Note: See TracBrowser for help on using the repository browser.