An Overlay-based
Virtual Network Substrate
SpoVNet

source: trash/old-modules/transport/protlib/tp_over_udp.cpp @ 5641

Last change on this file since 5641 was 5641, checked in by Christoph Mayer, 14 years ago
File size: 16.8 KB
Line 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file tp_over_udp.cpp
3/// UDP-based transport module
4/// ----------------------------------------------------------
5/// $Id: tp_over_udp.cpp 2872 2008-02-18 10:58:03Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/src/tp_over_udp.cpp $
7// ===========================================================
8//
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29
30extern "C"
31{
32  //#define _SOCKADDR_LEN    /* use BSD 4.4 sockets */
33#include <unistd.h>             /* gethostname */
34#include <sys/types.h>          /* network socket interface */
35#include <netinet/ip.h>         /* iphdr */
36#include <netinet/ip6.h>        /* ip6_hdr */
37#include <netinet/in.h>         /* network socket interface */
38#include <netinet/tcp.h>        /* for TCP Socket Option */
39#include <netinet/udp.h>        /* for UDP header */
40#include <sys/socket.h>
41#include <arpa/inet.h>          /* inet_addr */
42
43#include <fcntl.h>
44#include <sys/poll.h>
45}
46
47#include <iostream>
48#include <errno.h>
49#include <string>
50#include <sstream>
51
52#include "tp_over_udp.h"
53#include "threadsafe_db.h"
54#include "cleanuphandler.h"
55#include "setuid.h"
56#include "logfile.h"
57#include "linux/netfilter.h"
58
59#include <set>
60
/// Result code used by TPoverUDP::udpsend(): the send attempt succeeded.
#define UDP_SUCCESS 0
/// Result code used by TPoverUDP::udpsend(): the send attempt failed.
#define UDP_SEND_FAILURE 1

/// Buffer size in bytes.
/// NOTE(review): not referenced anywhere in the visible part of this file - confirm whether it is still needed.
#define BUFSIZE 2048000


/// Listen backlog length.
/// NOTE(review): not referenced in the visible part of this file (UDP sockets are never listen()ed on) - confirm.
const unsigned int max_listen_queue_size = 10;
68
69namespace protlib
70{
71
72  using namespace log;
73
74/** @defgroup tpudp TP over UDP
75 * @ingroup network
76 * @{
77 */
78
/// Scratch buffer for the textual form of an IPv6 address; listener_thread()
/// passes it to inet_ntop() when logging a bind failure.
/// It is a single namespace-scope buffer, i.e. shared by everything that uses it.
char in6_addrstr_loc[INET6_ADDRSTRLEN+1];
80
81/******* class TPoverUDP *******/
82
83
84/** generates an internal TPoverUDP message to send a NetMsg to the network
85 *
86 *  - the sending thread will call TPoverUDP::udpsend()
87 *  - since UDP is connectionless we can safely ignore the use_existing_connection attribute
88 *  @note the netmsg is deleted by the send() method when it is not used anymore
89 */
90void TPoverUDP::send (NetMsg * netmsg, const address & in_addr, bool use_existing_connection)
91{
92
93  appladdress* addr = NULL;
94  addr= dynamic_cast<appladdress*>(in_addr.copy());
95
96  if (!addr) return;
97
98  // Do it independently from master thread
99  udpsend(netmsg, addr);
100
101}
102
103/** sends a NetMsg to the network.
104 *
105 * @param netmsg   message to send
106 * @param addr     transport endpoint address
107 *
108 * @note           both parameters are deleted after the message was sent
109 */
110void
111TPoverUDP::udpsend (NetMsg * netmsg, appladdress * addr)
112{
113#ifndef _NO_LOGGING
114  const char *const thisproc = "sender   - ";
115#endif
116
117  // set result initially to success, set it to failure
118  // in places where these failures occur
119  int result = UDP_SUCCESS;
120  int ret = 0;
121
122
123  if (addr)
124    check_send_args (*netmsg, *addr);
125  else
126    {
127      ERRCLog (tpparam.name, thisproc << "address pointer is NULL");
128      result = UDP_SEND_FAILURE;
129      throw TPErrorInternal();
130    }
131
132
133  addr->convert_to_ipv6();
134  in6_addr ip6addr;
135
136  //convert to v4-mapped address if necessary! (we use dual-stack IPv4/IPv6 socket)
137  addr->get_ip(ip6addr);
138
139
140  // *********************************** revised socket code *********************************
141
142
143  // msghdr for sendmsg
144  struct msghdr header;
145
146  // pointer for ancillary data
147  struct cmsghdr *ancillary = NULL;
148
149  // iovec for sendmsg
150  struct iovec iov;
151  iov.iov_base = netmsg->get_buffer();
152  iov.iov_len = netmsg->get_size();
153
154  // destination address
155  struct sockaddr_in6 dest_address;
156  dest_address.sin6_family= AF_INET6;
157  dest_address.sin6_port  = htons(addr->get_port());
158  dest_address.sin6_addr  = ip6addr;
159  dest_address.sin6_flowinfo = 0;
160  dest_address.sin6_scope_id = 0;
161
162  // fill msghdr
163  header.msg_iov = &iov;
164  header.msg_iovlen = 1;
165  header.msg_name = &dest_address;
166  header.msg_namelen=sizeof(dest_address);
167  header.msg_control=NULL;
168  header.msg_controllen=0;
169
170
171  // pktinfo
172  in6_pktinfo pktinfo;
173
174  //addr->set_if_index(1);
175
176
177  // we have to add up to 2 ancillary data objects (for interface and hop limit)
178
179  uint32 buflength = 0;
180  if (addr->get_if_index()) {
181    buflength = CMSG_SPACE(sizeof(pktinfo));
182    //cout << "PKTINFO data object, total buffer size: " << buflength << "byte" << endl;
183  }
184
185  int hlim = addr->get_ip_ttl();
186
187  if (hlim) {
188    buflength = buflength + CMSG_SPACE(sizeof(int));
189    //cout << "HOPLIMIT data object, total buffer size: " << buflength << "byte" << endl;
190  }
191  // create the buffer
192  if ((addr->get_if_index()) || hlim) {
193    header.msg_control = malloc(buflength);
194    if (header.msg_control == 0)
195      ERRCLog(tpparam.name, thisproc << " malloc failed for ancillary data of size " << buflength);
196  }
197
198  // are we to set the outgoing interface?
199  if (addr->get_if_index()) {
200
201    DLog(tpparam.name, thisproc << " UDP send via Interface " << addr->get_if_index() << " requested.");
202
203    // first cmsghdr at beginning of buffer
204    ancillary = (cmsghdr*) header.msg_control;
205
206    ancillary->cmsg_level=IPPROTO_IPV6;
207    ancillary->cmsg_type=IPV6_PKTINFO;
208    ancillary->cmsg_len=CMSG_LEN(sizeof(pktinfo));
209
210    //cout << "Set up properties of ancillary data object 1" << endl;
211
212    pktinfo.ipi6_addr = in6addr_any;
213    pktinfo.ipi6_ifindex = addr->get_if_index();
214
215    memcpy (CMSG_DATA(ancillary), &pktinfo, sizeof(pktinfo));
216
217    //cout << "Set up data of ancillary data object 1" << endl;
218
219    // update msghdr controllen
220    header.msg_controllen = CMSG_SPACE(sizeof(pktinfo));
221
222  }
223
224  // should we set an explicit Hop Limit?
225  if (hlim) {
226    DLog(tpparam.name, thisproc << " UDP send with IP TTL of " << hlim << " requested.");
227
228    // second cmsghdr after first one
229    cmsghdr* ancillary2 = NULL;
230
231    if (ancillary) {
232      ancillary2 = (cmsghdr*) (ancillary + CMSG_SPACE(sizeof(pktinfo)));
233    } else {
234      ancillary2 = (cmsghdr*) header.msg_control;
235    }
236
237    ancillary2->cmsg_level=IPPROTO_IPV6;
238    ancillary2->cmsg_type=IPV6_HOPLIMIT;
239    ancillary2->cmsg_len = CMSG_LEN(sizeof(int));
240
241    memcpy(CMSG_DATA(ancillary2), &hlim, sizeof(int));
242
243    // update msghdr controllen
244    header.msg_controllen = header.msg_controllen + ancillary2->cmsg_len;
245
246  }
247
248#ifndef _NO_LOGGING
249  uint32 msgsize = netmsg->get_size();  // only used for logging below
250#endif
251
252  // check whether socket is already up and initialized by listener thread
253  // otherwise we may have a race condition, i.e., trying to send before socket is created
254  // FIXME: it may be the case that the socket is already created, but not bound
255  //        I'm not sure what happens, when we try to send...
256  while (master_listener_socket == -1)
257  {
258    const unsigned int sleeptime= 1;
259    DLog(tpparam.name, "socket not yet ready for sending - sending deferred (" << sleeptime << " s)");
260    sleep(sleeptime);
261    DLog(tpparam.name, "retrying to send");
262  }
263  // reset IP RAO option
264  ret = setsockopt(master_listener_socket, SOL_IP, IP_OPTIONS, 0, 0);
265  if ( ret != 0 )
266    ERRLog(tpparam.name, "unsetting IP options for IPv4 failed");
267
268  // send UDP packet
269  DLog(tpparam.name, "SEND to " << *addr);
270  ret= sendmsg(master_listener_socket,&header,MSG_DONTWAIT);
271
272  if (ret<0)
273    ERRCLog(tpparam.name, "Socket Send failed! - error (" << errno << "):" << strerror(errno));
274  if (debug_pdu)
275    {
276      ostringstream hexdump;
277      netmsg->hexdump (hexdump);
278      Log (DEBUG_LOG, LOG_NORMAL, tpparam.name,
279           "PDU debugging enabled - Sent:" << hexdump.str ());
280    }
281
282  if (ret < 0)
283    {
284      result = UDP_SEND_FAILURE;
285      //    break;
286    } // end if (ret < 0)
287
288
289      // *** note: netmsg is deleted here ***
290  delete netmsg;
291
292
293  // Throwing an exception within a critical section does not
294  // unlock the mutex.
295
296  if (result != UDP_SUCCESS)
297    {
298      ERRLog(tpparam.name, thisproc << "UDP error, returns " << ret << ", error : " << strerror (errno));
299      delete addr;
300
301      throw TPErrorSendFailed();
302
303    }
304  else
305    Log (EVENT_LOG, LOG_NORMAL, tpparam.name,
306         thisproc << ">>----Sent---->> message (" << msgsize <<
307         " bytes) using socket " << master_listener_socket << " to " << *addr);
308
309  // *** delete address ***
310  delete addr;
311} // end TPoverUDP::udpsend
312
313
314
315/**
316 * IPv4 catcher thread starter:
317 * just a static starter method to allow starting the
318 * actual master_listener_thread() method.
319 *
320 * @param argp - pointer to the current TPoverUDP object instance
321 */
322void *
323TPoverUDP::listener_thread_starter (void *argp)
324{
325  // invoke listener thread method
326  if (argp != 0)
327    {
328        (static_cast < TPoverUDP * >(argp))->listener_thread ();
329    }
330  return 0;
331}
332
333
334
335
336
337/**
338 * UDP master receiver thread: waits for incoming connections at the well-known udp port
339 *
340 */
341void TPoverUDP::listener_thread ()
342{
343  // create a new address-structure for the listening masterthread
344  struct sockaddr_in6 own_address;
345  own_address.sin6_family = AF_INET6;
346  own_address.sin6_flowinfo= 0;
347  own_address.sin6_port = htons(tpparam.port); // use port number in param structure
348  // accept incoming connections on all interfaces
349  own_address.sin6_addr = in6addr_any;
350  own_address.sin6_scope_id= 0;
351
352  // create a listening socket
353  master_listener_socket= socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP);
354  if (master_listener_socket == -1)
355    {
356      ERRCLog(tpparam.name, "Could not create a new socket, error: " << strerror(errno));
357      return;
358    }
359
360  int socketreuseflag= 1;
361  int status= setsockopt(master_listener_socket,
362                         SOL_SOCKET,
363                         SO_REUSEADDR,
364                         (const char *) &socketreuseflag,
365                         sizeof(socketreuseflag));
366  if (status)
367  {
368    ERRCLog(tpparam.name, "Could not set socket option SO_REUSEADDR:" << strerror(errno));
369  }
370
371  // TODO: insert multicast socket options/calls here
372
373  // bind the newly created socket to a specific address
374  int bind_status = bind(master_listener_socket,
375                         reinterpret_cast<struct sockaddr *>(&own_address),
376                         sizeof(own_address));
377  if (bind_status)
378    {
379      ERRCLog(tpparam.name, "Binding to "
380          << inet_ntop(AF_INET6, &own_address.sin6_addr, in6_addrstr_loc, INET6_ADDRSTRLEN)
381          << " port " << tpparam.port << " failed, error: " << strerror(errno));
382      return;
383    }
384
385
386  // create a pollfd struct for use in the mainloop
387  struct pollfd poll_fd;
388  poll_fd.fd = master_listener_socket;
389  poll_fd.events = POLLIN | POLLPRI;
390  poll_fd.revents = 0;
391  /*
392    #define POLLIN      0x001   // There is data to read.
393    #define POLLPRI     0x002   // There is urgent data to read.
394    #define POLLOUT     0x004   // Writing now will not block.
395  */
396
397  bool terminate = false;
398  // check for thread terminate condition using get_state()
399  state_t currstate= get_state();
400  int poll_status= 0;
401  const unsigned int number_poll_sockets= 1;
402  struct sockaddr_in6 peer_address;
403  socklen_t peer_address_len;
404  // int conn_socket;
405
406  // check whether this thread is signaled for termination
407  while(! (terminate= (currstate==STATE_ABORT || currstate==STATE_STOP) ) )
408    {
409
410
411      // wait on number_poll_sockets (main drm socket)
412      // for the events specified above for sleep_time (in ms) tpparam.sleep_time
413      poll_status= poll(&poll_fd, number_poll_sockets, 250);
414      if (poll_fd.revents & POLLERR) // Error condition
415        {
416          if (errno != EINTR)
417            {
418              ERRCLog(tpparam.name, "Poll caused error " << strerror(errno) << " - indicated by revents");
419            }
420          else
421            {
422              EVLog(tpparam.name, "poll(): " << strerror(errno));
423            }
424
425        }
426      if (poll_fd.revents & POLLHUP) // Hung up
427        {
428          ERRCLog(tpparam.name, "Poll hung up");
429          return;
430        }
431      if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
432        {
433          ERRCLog(tpparam.name, "Poll Invalid request: fd not open");
434          return;
435        }
436
437      switch (poll_status)
438        {
439        case -1:
440          if (errno != EINTR)
441            {
442              ERRCLog(tpparam.name, "Poll status indicates error: " << strerror(errno));
443            }
444          else
445            {
446              EVLog(tpparam.name, "Poll status: " << strerror(errno));
447            }
448
449          break;
450
451        case 0:
452#ifdef DEBUG_HARD
453          Log(DEBUG_LOG,LOG_UNIMP, tpparam.name,
454              "Listen Thread - Poll timed out after " << tpparam.sleep_time << " ms.");
455#endif
456          currstate= get_state();
457          continue;
458          break;
459
460        default:
461#ifdef DEBUG_HARD
462          Log(DEBUG_LOG,LOG_UNIMP, tpparam.name, "Poll: " << poll_status << " event(s) ocurred, of type " << poll_fd.revents);
463#endif
464          break;
465        } // end switch
466
467
468
469      //if there is data to read, do it
470
471      if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
472
473
474        // in peer_address and the size of its address in addrlen
475        peer_address_len= sizeof(peer_address);
476
477        //Build us a NetMsg
478        NetMsg *netmsg=0;
479        netmsg = new NetMsg (NetMsg::max_size);
480
481
482
483        /// receive data from socket buffer (recv will not block)
484        int ret = recvfrom (master_listener_socket,
485                            netmsg->get_buffer (), NetMsg::max_size, 0, reinterpret_cast<struct sockaddr *>(&peer_address),
486                            &peer_address_len);
487
488        if (ret)
489        {
490          DLog(tpparam.name, "Yankeedoo, we received " << ret << " bytes of DATA!!");
491
492          // truncate netmsg buffer
493          netmsg->truncate(ret);
494        }
495
496        /**************************************************************
497         *  The following restrictions should apply:                  *
498         *                                                            *
499         *  This is UDP, messages are contained in ONE datagram       *
500         *  datagrams CANNOT fragment, as otherwise TCP is used       *
501         *  so we now build a TPMsg, send it to signaling and         *
502         *  all should be well. At least until now.                   *
503         **************************************************************/
504
505        // Build peer_adr and own_addr
506        appladdress* peer_addr = new appladdress;
507        peer_addr->set_ip(peer_address.sin6_addr);
508        peer_addr->set_port(peer_address.sin6_port);
509        appladdress* own_addr = new appladdress();
510
511        // Log the sender peer and write to peer_addr
512        char source_addr[INET6_ADDRSTRLEN+1];
513        inet_ntop(AF_INET6, &peer_address.sin6_addr, source_addr, INET6_ADDRSTRLEN);
514
515
516        peer_addr->set_port(htons(peer_address.sin6_port));
517        peer_addr->set_ip(peer_address.sin6_addr);
518        peer_addr->set_protocol(get_underlying_protocol());
519
520        DLog(tpparam.name, "Peer: [" << *peer_addr << "]");
521
522        // create TPMsg and send it to the signaling thread
523        //fprintf (stderr, "Before TPMsg creation\n");
524        TPMsg *tpmsg=
525          new (nothrow) TPMsg (netmsg, peer_addr, own_addr);
526
527        Log (DEBUG_LOG, LOG_NORMAL, tpparam.name,
528             "recvthread - receipt of GIST PDU now complete, sending msg#" << tpmsg->get_id() << " to signaling module");
529
530
531        if (tpmsg == NULL || !tpmsg->send(tpparam.source, tpparam.dest))
532          {
533            ERRLog(tpparam.name, "rcvthread" << "Cannot allocate/send TPMsg");
534            if (tpmsg)
535              delete tpmsg;
536            if (netmsg)
537              delete netmsg;
538
539          }
540
541      }
542
543      // get new thread state
544      currstate= get_state();
545
546    } // end while(!terminate)
547
548  return;
549
550}
551
552
553TPoverUDP::~TPoverUDP ()
554{
555  init = false;
556
557  Log (DEBUG_LOG, LOG_NORMAL, tpparam.name, "Destructor called");
558
559}
560
561/** TPoverUDP Thread main loop.
562 * This loop checks for internal messages of either
563 * a send() call to start a new receiver thread, or,
564 * of a receiver_thread() that signals its own termination
565 * for proper cleanup of control structures.
566 *
567 * @param nr number of current thread instance
568 */
569void
570TPoverUDP::main_loop (uint32 nr)
571{
572
573  int pthread_status = 0;
574
575
576  // start UDP listener thread
577  pthread_t listener_thread_ID;
578  pthread_status = pthread_create (&listener_thread_ID, NULL,   //NULL: default attributes
579                                   listener_thread_starter, this);
580  if (pthread_status)
581    {
582      ERRCLog(tpparam.name,
583           "UDP listening thread could not be created: " <<
584           strerror (pthread_status));
585    }
586  else
587
588    Log(INFO_LOG,LOG_NORMAL, tpparam.name, color[green] << "Listening at port #" << tpparam.port << color[off]);
589
590
591
592  // define max latency for thread reaction on termination/stop signal
593  state_t currstate = get_state ();
594
595  // check whether this thread is signaled for termination
596  while (currstate != STATE_ABORT && currstate != STATE_STOP)
597    {
598
599      // get thread state
600      currstate = get_state ();
601
602      sleep(4);
603
604    }                           // end while
605
606  if (currstate == STATE_STOP)
607    {
608      // start abort actions
609      Log (INFO_LOG, LOG_NORMAL, tpparam.name,
610           "Asked to abort, stopping all receiver threads");
611    }                           // end if stopped
612
613  // do not accept any more messages
614  // terminate all receiver and sender threads that are still active
615  //terminate_all_threads ();
616}
617
618
619void
620TPoverUDP::terminate(const address& addr)
621{
622        // no connection oriented protocol, nothing to terminate
623}
624
625}                               // end namespace protlib
626
627///@}
Note: See TracBrowser for help on using the repository browser.