source: source/ariba/utility/transport/tcpip/protlib/tp_over_tcp.h@ 8606

Last change on this file since 8606 was 8606, checked in by Christoph Mayer, 14 years ago

-memleaks

File size: 8.1 KB
Line 
1/// ----------------------------------------*- mode: C++; -*--
2/// @file tp_over_tcp.h
3/// Transport over TCP
4/// ----------------------------------------------------------
5/// $Id: tp_over_tcp.h 2718 2007-07-24 03:23:14Z bless $
6/// $HeadURL: https://svn.ipv6.tm.uka.de/nsis/protlib/trunk/include/tp_over_tcp.h $
7// ===========================================================
8//
9// Copyright (C) 2005-2007, all rights reserved by
10// - Institute of Telematics, Universitaet Karlsruhe (TH)
11//
12// More information and contact:
13// https://projekte.tm.uka.de/trac/NSIS
14//
15// This program is free software; you can redistribute it and/or modify
16// it under the terms of the GNU General Public License as published by
17// the Free Software Foundation; version 2 of the License
18//
19// This program is distributed in the hope that it will be useful,
20// but WITHOUT ANY WARRANTY; without even the implied warranty of
21// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22// GNU General Public License for more details.
23//
24// You should have received a copy of the GNU General Public License along
25// with this program; if not, write to the Free Software Foundation, Inc.,
26// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27//
28// ===========================================================
29/** @ingroup tptcp
30 * @ file
31 * TP over TCP
32 */
33
34#ifndef TP_OVER_TCP_H
35#define TP_OVER_TCP_H
36
37#include <boost/unordered_map.hpp>
38
39#include "tp.h"
40#include "threads.h"
41#include "threadsafe_db.h"
42#include "connectionmap.h"
43#include "assocdata.h"
44
45namespace protlib
46{
47/** this struct conatains parameters that determine
48 * the behavior of listener and receiver threads in TPoverTCP
49 * @param port - port number for master listener thread (server port)
50 * @param sleep - time (in ms) that listener and receiver wait at a poll() call
51 * @param d - destination module, where internal message are sent
52 */
53struct TPoverTCPParam : public ThreadParam
54{
55 /// constructor
56 TPoverTCPParam(unsigned short common_header_length,
57 bool (*const getmsglength) (NetMsg& m, uint32& clen_bytes),
58 port_t p,
59 const char* threadname= "TPoverTCP",
60 uint32 sleep = ThreadParam::default_sleep_time,
61 bool debug_pdu = false,
62 message::qaddr_t source = message::qaddr_transport,
63 message::qaddr_t dest = message::qaddr_signaling,
64 bool sendaborts = false,
65 uint8 tos = 0x10) :
66 ThreadParam(sleep,threadname,1,1),
67 port(p),
68 debug_pdu(debug_pdu),
69 source(source),
70 dest(dest),
71 common_header_length(common_header_length),
72 getmsglength(getmsglength),
73 terminate(false),
74 ip_tos(tos)
75 {};
76
77
78 /// port to bind master listener thread to
79 const port_t port;
80 bool debug_pdu;
81 /// message source
82 const message::qaddr_t source;
83 const message::qaddr_t dest;
84 /// what is the length of the common header
85 const unsigned short common_header_length;
86
87 /// function pointer to a function that figures out the msg length in number of 4 byte words
88 /// it returns false if error occured (e.g., malformed header), result is returned in variable clen_words
89 bool (*const getmsglength) (NetMsg& m, uint32& clen_words);
90
91 /// should master thread terminate?
92 const bool terminate;
93 const uint8 ip_tos;
94}; // end TPoverUDPParam
95
96
97/// TP over TCP
98/** This class implements the TP interface using TCP. */
99class TPoverTCP : public TP, public Thread
100{
101/***** inherited from TP *****/
102public:
103 /// sends a network message, spawns receiver thread if necessary
104 virtual void send(NetMsg* msg,const address& addr, bool use_existing_connection);
105 virtual void terminate(const address& addr);
106
107 /***** inherited from Thread *****/
108public:
109 /// main loop
110 virtual void main_loop(uint32 nr);
111
112/***** other members *****/
113public:
114 /// constructor
115 TPoverTCP(const TPoverTCPParam& p) :
116 TP(tsdb::get_tcp_id(),"tcp",p.name,p.common_header_length,p.getmsglength),
117 Thread(p), tpparam(p), already_aborted(false), msgqueue(NULL), debug_pdu(p.debug_pdu)
118 {
119 // perform some initializing actions
120 // currently not required (SCTP had to init its library)
121 init= true; ///< init done;
122 }
123 /// virtual destructor
124 virtual ~TPoverTCP();
125
126 typedef
127 struct receiver_thread_arg
128 {
129 const AssocData* peer_assoc;
130 bool sig_terminate;
131 bool terminated;
132 public:
133 receiver_thread_arg(const AssocData* peer_assoc) :
134 peer_assoc(peer_assoc), sig_terminate(false), terminated(true) {};
135 } receiver_thread_arg_t;
136
137 class receiver_thread_start_arg_t
138 {
139 public:
140 TPoverTCP* instance;
141 receiver_thread_arg_t* rtargp;
142
143 receiver_thread_start_arg_t(TPoverTCP* instance, receiver_thread_arg_t* rtargp) :
144 instance(instance), rtargp(rtargp) {};
145 };
146
147 class sender_thread_start_arg_t
148 {
149 public:
150 TPoverTCP* instance;
151 FastQueue* sender_thread_queue;
152
153 sender_thread_start_arg_t(TPoverTCP* instance, FastQueue* sq) :
154 instance(instance), sender_thread_queue(sq) {};
155 };
156
157private:
158 /// returns already existing connection or establishes a new one
159 AssocData* get_connection_to(const appladdress& addr);
160
161 /// receiver thread for a specific socket
162 void sender_thread(void *argp);
163
164 /// receiver thread for a specific socket
165 void receiver_thread(void *argp);
166
167 /// send a message to the network via TCP
168 void tcpsend(NetMsg* msg, appladdress* addr);
169
170 /// sender thread starter for a specific socket
171 static void* sender_thread_starter(void *argp);
172
173 /// receiver thread starter for a specific socket
174 static void* receiver_thread_starter(void *argp);
175
176 /// a static starter method to invoke the actual main listener
177 static void* master_listener_thread_starter(void *argp);
178
179 /// main listener thread procedure
180 void master_listener_thread();
181
182 // create and start new sender thread
183 void create_new_sender_thread(FastQueue* senderqueue);
184
185 // create and start new receiver thread
186 void create_new_receiver_thread(AssocData* peer_assoc);
187
188 /// terminates particular thread
189 void stop_receiver_thread(AssocData* peer_assoc);
190
191 /// cleans up thread management structures
192 void cleanup_receiver_thread(AssocData* peer_assoc);
193
194 /// terminates a sender thread
195 void terminate_sender_thread(const AssocData* assoc);
196
197 /// terminates all active receiver or sender threads
198 void terminate_all_threads();
199
200 /// ConnectionMap instance for keeping track of all existing connections
201 ConnectionMap connmap;
202
203 /// store per receiver thread arguments, e.g. for signaling termination
204 typedef boost::unordered_map<pthread_t, receiver_thread_arg_t*> recv_thread_argmap_t;
205 recv_thread_argmap_t recv_thread_argmap;
206
207 /// store sender thread related information
208 typedef boost::unordered_map<appladdress, FastQueue*> sender_thread_queuemap_t;
209 sender_thread_queuemap_t senderthread_queuemap;
210
211 /// parameters for main TPoverTCP thread
212 const TPoverTCPParam tpparam;
213
214 /// did we already abort at thread shutdown
215 bool already_aborted;
216 /// message queue
217 FastQueue* msgqueue;
218
219 bool debug_pdu;
220 bool v4_mode;
221}; // end class TPoverTCP
222
223/** A simple internal message for selfmessages
224 * please note that carried items may get deleted after use of this message
225 * the message destructor does not delete any item automatically
226 */
227class TPoverTCPMsg : public message
228{
229 public:
230 // message type start/stop thread, send data
231 enum msg_t { start,
232 stop,
233 send_data
234 };
235
236 private:
237 const AssocData* peer_assoc;
238 const TPoverTCPMsg::msg_t type;
239 NetMsg* netmsg;
240 appladdress* addr;
241
242public:
243 TPoverTCPMsg(const AssocData* peer_assoc, message::qaddr_t source= qaddr_unknown, TPoverTCPMsg::msg_t type= stop) :
244 message(type_transport, source), peer_assoc(peer_assoc), type(type), netmsg(0), addr(0) {}
245
246 TPoverTCPMsg(NetMsg* netmsg, appladdress* addr, message::qaddr_t source= qaddr_unknown) :
247 message(type_transport, source), peer_assoc(0), type(send_data), netmsg(netmsg), addr(addr) {}
248
249 virtual ~TPoverTCPMsg() {}
250
251 const AssocData* get_peer_assoc() const { return peer_assoc; }
252 TPoverTCPMsg::msg_t get_msgtype() const { return type; }
253 NetMsg* get_netmsg() const { return netmsg; }
254 appladdress* get_appladdr() const { return addr; }
255};
256
257} // end namespace protlib
258
259#endif
Note: See TracBrowser for help on using the repository browser.