An Overlay-based
Virtual Network Substrate
SpoVNet

source: source/ariba/utility/transport/tcpip/protlib/tp_over_tcp.h @ 6922

Last change on this file since 6922 was 6922, checked in by mies, 10 years ago

replaced deprecated hash_map, hash_set classes with boost
added unsigned serialization
fixed more warnings
included -Wall to Makefile.am

File size: 8.1 KB
Line 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file tp_over_tcp.h
3/// Transport over TCP
4/// ----------------------------------------------------------
5/// $Id: tp_over_tcp.h 2718 2007-07-24 03:23:14Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/include/tp_over_tcp.h $
7// ===========================================================
8//                     
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//                     
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29/** @ingroup tptcp
30 * @ file
31 * TP over TCP
32 */
33
34#ifndef TP_OVER_TCP_H
35#define TP_OVER_TCP_H
36
37#include <boost/unordered_map.hpp>
38
39#include "tp.h"
40#include "threads.h"
41#include "threadsafe_db.h"
42#include "connectionmap.h"
43#include "assocdata.h"
44
45namespace protlib
46{
47/** this struct conatains parameters that determine
48  * the behavior of listener and receiver threads in TPoverTCP
49  * @param port - port number for master listener thread (server port)
50  * @param sleep - time (in ms) that listener and receiver wait at a poll() call
51  * @param d - destination module, where internal message are sent
52  */
53struct TPoverTCPParam : public ThreadParam
54{
55  /// constructor
56    TPoverTCPParam(unsigned short common_header_length,
57                 bool (*const getmsglength) (NetMsg& m, uint32& clen_bytes),
58                 port_t p,
59                 const char* threadname= "TPoverTCP",
60                 uint32 sleep = ThreadParam::default_sleep_time,
61                 bool debug_pdu = false,
62                 message::qaddr_t source = message::qaddr_transport,
63                 message::qaddr_t dest = message::qaddr_signaling,
64                 bool sendaborts = false,
65                 uint8 tos = 0x10) :
66    ThreadParam(sleep,threadname,1,1),
67       port(p),
68       debug_pdu(debug_pdu),
69       source(source),
70       dest(dest),
71       common_header_length(common_header_length),
72       getmsglength(getmsglength),
73       terminate(false),
74       ip_tos(tos)
75        {};
76
77
78  /// port to bind master listener thread to
79  const port_t port;
80  bool debug_pdu;
81  /// message source
82  const message::qaddr_t source;
83  const message::qaddr_t dest;
84  /// what is the length of the common header
85  const unsigned short common_header_length;
86
87  /// function pointer to a function that figures out the msg length in number of 4 byte words
88  /// it returns false if error occured (e.g., malformed header), result is returned in variable clen_words
89  bool (*const getmsglength) (NetMsg& m, uint32& clen_words);
90
91  /// should master thread terminate?
92  const bool terminate;
93  const uint8 ip_tos;
94}; // end TPoverUDPParam
95
96
97/// TP over TCP
98/** This class implements the TP interface using TCP. */
99class TPoverTCP : public TP, public Thread
100{
101/***** inherited from TP *****/
102public:
103  /// sends a network message, spawns receiver thread if necessary
104  virtual void send(NetMsg* msg,const address& addr, bool use_existing_connection);
105  virtual void terminate(const address& addr);
106 
107  /***** inherited from Thread *****/
108public:
109  /// main loop
110  virtual void main_loop(uint32 nr);
111 
112/***** other members *****/
113public:
114  /// constructor
115  TPoverTCP(const TPoverTCPParam& p) :
116    TP(tsdb::get_tcp_id(),"tcp",p.name,p.common_header_length,p.getmsglength),
117    Thread(p), tpparam(p), already_aborted(false), msgqueue(NULL), debug_pdu(p.debug_pdu)
118  { 
119    // perform some initializing actions
120    // currently not required (SCTP had to init its library)
121    init= true; ///< init done;
122  }
123  /// virtual destructor
124  virtual ~TPoverTCP();
125 
126  typedef
127  struct receiver_thread_arg
128  {
129    const AssocData* peer_assoc;
130    bool sig_terminate;
131    bool terminated;
132  public:
133    receiver_thread_arg(const AssocData* peer_assoc) : 
134      peer_assoc(peer_assoc), sig_terminate(false), terminated(true) {};
135  } receiver_thread_arg_t;
136 
137  class receiver_thread_start_arg_t
138  {
139  public:
140    TPoverTCP* instance;
141    receiver_thread_arg_t* rtargp;
142   
143    receiver_thread_start_arg_t(TPoverTCP* instance, receiver_thread_arg_t* rtargp) :
144      instance(instance), rtargp(rtargp) {};
145  };
146
147  class sender_thread_start_arg_t
148  {
149  public:
150    TPoverTCP* instance;
151    FastQueue* sender_thread_queue;
152   
153    sender_thread_start_arg_t(TPoverTCP* instance, FastQueue* sq) :
154      instance(instance), sender_thread_queue(sq) {};
155  };
156 
157private:
158  /// returns already existing connection or establishes a new one
159  AssocData* get_connection_to(const appladdress& addr);
160
161  /// receiver thread for a specific socket
162  void sender_thread(void *argp);
163 
164  /// receiver thread for a specific socket
165  void receiver_thread(void *argp);
166
167  /// send a message to the network via TCP
168  void tcpsend(NetMsg* msg, appladdress* addr);
169 
170  /// sender thread starter for a specific socket
171  static void* sender_thread_starter(void *argp);
172
173  /// receiver thread starter for a specific socket
174  static void* receiver_thread_starter(void *argp);
175 
176  /// a static starter method to invoke the actual main listener
177  static void* master_listener_thread_starter(void *argp);
178 
179  /// main listener thread procedure
180  void master_listener_thread();
181 
182  // create and start new sender thread
183  void create_new_sender_thread(FastQueue* senderqueue);
184
185  // create and start new receiver thread
186  void create_new_receiver_thread(AssocData* peer_assoc);
187 
188  /// terminates particular thread
189  void stop_receiver_thread(AssocData* peer_assoc);
190
191  /// cleans up thread management structures
192  void cleanup_receiver_thread(AssocData* peer_assoc);
193
194  /// terminates a sender thread
195  void terminate_sender_thread(const AssocData* assoc);
196 
197  /// terminates all active receiver or sender threads
198  void terminate_all_threads();
199 
200  /// ConnectionMap instance for keeping track of all existing connections
201  ConnectionMap connmap;
202 
203  /// store per receiver thread arguments, e.g. for signaling termination
204  typedef boost::unordered_map<pthread_t, receiver_thread_arg_t*> recv_thread_argmap_t;
205  recv_thread_argmap_t  recv_thread_argmap;
206
207  /// store sender thread related information
208  typedef boost::unordered_map<appladdress, FastQueue*> sender_thread_queuemap_t;
209  sender_thread_queuemap_t  senderthread_queuemap;
210 
211  /// parameters for main TPoverTCP thread
212  const TPoverTCPParam tpparam;
213 
214  /// did we already abort at thread shutdown
215  bool already_aborted;
216  /// message queue
217  FastQueue* msgqueue;
218 
219  bool debug_pdu;
220}; // end class TPoverTCP
221
222/** A simple internal message for selfmessages
223 * please note that carried items may get deleted after use of this message
224 * the message destructor does not delete any item automatically
225 */
226class TPoverTCPMsg : public message
227{
228 public:
229  // message type start/stop thread, send data
230  enum msg_t { start, 
231               stop,
232               send_data
233  };
234
235 private:
236  const AssocData* peer_assoc;
237  const TPoverTCPMsg::msg_t type;
238  NetMsg* netmsg;
239  appladdress* addr;
240
241public:
242  TPoverTCPMsg(const AssocData* peer_assoc, message::qaddr_t source= qaddr_unknown, TPoverTCPMsg::msg_t type= stop) : 
243    message(type_transport, source), peer_assoc(peer_assoc), type(type), netmsg(0), addr(0)  {}
244
245  TPoverTCPMsg(NetMsg* netmsg, appladdress* addr, message::qaddr_t source= qaddr_unknown) : 
246    message(type_transport, source), peer_assoc(0), type(send_data), netmsg(netmsg), addr(addr) {}
247
248  const AssocData* get_peer_assoc() const { return peer_assoc; }
249  TPoverTCPMsg::msg_t get_msgtype() const { return type; }
250  NetMsg* get_netmsg() const { return netmsg; }
251  appladdress* get_appladdr() const { return addr; } 
252};
253
254} // end namespace protlib
255
256#endif
Note: See TracBrowser for help on using the repository browser.