00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00034 #ifndef TP_OVER_TCP_H
00035 #define TP_OVER_TCP_H
00036
00037 #include <ext/hash_map>
00038
00039 #include "tp.h"
00040 #include "threads.h"
00041 #include "threadsafe_db.h"
00042 #include "connectionmap.h"
00043 #include "assocdata.h"
00044
00045 namespace protlib
00046 {
00053 struct TPoverTCPParam : public ThreadParam
00054 {
00056 TPoverTCPParam(unsigned short common_header_length,
00057 bool (*const getmsglength) (NetMsg& m, uint32& clen_bytes),
00058 port_t p,
00059 const char* threadname= "TPoverTCP",
00060 uint32 sleep = ThreadParam::default_sleep_time,
00061 bool debug_pdu = false,
00062 message::qaddr_t source = message::qaddr_transport,
00063 message::qaddr_t dest = message::qaddr_signaling,
00064 bool sendaborts = false,
00065 uint8 tos = 0x10) :
00066 ThreadParam(sleep,threadname,1,1),
00067 port(p),
00068 debug_pdu(debug_pdu),
00069 source(source),
00070 dest(dest),
00071 common_header_length(common_header_length),
00072 getmsglength(getmsglength),
00073 terminate(false),
00074 ip_tos(tos)
00075 {};
00076
00077
00079 const port_t port;
00080 bool debug_pdu;
00082 const message::qaddr_t source;
00083 const message::qaddr_t dest;
00085 const unsigned short common_header_length;
00086
00089 bool (*const getmsglength) (NetMsg& m, uint32& clen_words);
00090
00092 const bool terminate;
00093 const uint8 ip_tos;
00094 };
00095
00096
00098
00099 class TPoverTCP : public TP, public Thread
00100 {
00101
00102 public:
00104 virtual void send(NetMsg* msg,const address& addr, bool use_existing_connection);
00105 virtual void terminate(const address& addr);
00106
00107
00108 public:
00110 virtual void main_loop(uint32 nr);
00111
00112
00113 public:
00115 TPoverTCP(const TPoverTCPParam& p) :
00116 TP(tsdb::get_tcp_id(),"tcp",p.name,p.common_header_length,p.getmsglength),
00117 Thread(p), tpparam(p), already_aborted(false), msgqueue(NULL), debug_pdu(p.debug_pdu)
00118 {
00119
00120
00121 init= true;
00122 }
00124 virtual ~TPoverTCP();
00125
00126 typedef
00127 struct receiver_thread_arg
00128 {
00129 const AssocData* peer_assoc;
00130 bool sig_terminate;
00131 bool terminated;
00132 public:
00133 receiver_thread_arg(const AssocData* peer_assoc) :
00134 peer_assoc(peer_assoc), sig_terminate(false), terminated(true) {};
00135 } receiver_thread_arg_t;
00136
00137 class receiver_thread_start_arg_t
00138 {
00139 public:
00140 TPoverTCP* instance;
00141 receiver_thread_arg_t* rtargp;
00142
00143 receiver_thread_start_arg_t(TPoverTCP* instance, receiver_thread_arg_t* rtargp) :
00144 instance(instance), rtargp(rtargp) {};
00145 };
00146
00147 class sender_thread_start_arg_t
00148 {
00149 public:
00150 TPoverTCP* instance;
00151 FastQueue* sender_thread_queue;
00152
00153 sender_thread_start_arg_t(TPoverTCP* instance, FastQueue* sq) :
00154 instance(instance), sender_thread_queue(sq) {};
00155 };
00156
00157 private:
00159 AssocData* get_connection_to(const appladdress& addr);
00160
00162 void sender_thread(void *argp);
00163
00165 void receiver_thread(void *argp);
00166
00168 void tcpsend(NetMsg* msg, appladdress* addr);
00169
00171 static void* sender_thread_starter(void *argp);
00172
00174 static void* receiver_thread_starter(void *argp);
00175
00177 static void* master_listener_thread_starter(void *argp);
00178
00180 void master_listener_thread();
00181
00182
00183 void create_new_sender_thread(FastQueue* senderqueue);
00184
00185
00186 void create_new_receiver_thread(AssocData* peer_assoc);
00187
00189 void stop_receiver_thread(AssocData* peer_assoc);
00190
00192 void cleanup_receiver_thread(AssocData* peer_assoc);
00193
00195 void terminate_sender_thread(const AssocData* assoc);
00196
00198 void terminate_all_threads();
00199
00201 ConnectionMap connmap;
00202
00204 typedef hash_map<pthread_t, receiver_thread_arg_t*> recv_thread_argmap_t;
00205 recv_thread_argmap_t recv_thread_argmap;
00206
00208 typedef hash_map<appladdress, FastQueue*> sender_thread_queuemap_t;
00209 sender_thread_queuemap_t senderthread_queuemap;
00210
00212 const TPoverTCPParam tpparam;
00213
00215 bool already_aborted;
00217 FastQueue* msgqueue;
00218
00219 bool debug_pdu;
00220 };
00221
00226 class TPoverTCPMsg : public message
00227 {
00228 public:
00229
00230 enum msg_t { start,
00231 stop,
00232 send_data
00233 };
00234
00235 private:
00236 const AssocData* peer_assoc;
00237 const TPoverTCPMsg::msg_t type;
00238 NetMsg* netmsg;
00239 appladdress* addr;
00240
00241 public:
00242 TPoverTCPMsg(const AssocData* peer_assoc, message::qaddr_t source= qaddr_unknown, TPoverTCPMsg::msg_t type= stop) :
00243 message(type_transport, source), peer_assoc(peer_assoc), type(type), netmsg(0), addr(0) {}
00244
00245 TPoverTCPMsg(NetMsg* netmsg, appladdress* addr, message::qaddr_t source= qaddr_unknown) :
00246 message(type_transport, source), peer_assoc(0), type(send_data), netmsg(netmsg), addr(addr) {}
00247
00248 const AssocData* get_peer_assoc() const { return peer_assoc; }
00249 TPoverTCPMsg::msg_t get_msgtype() const { return type; }
00250 NetMsg* get_netmsg() const { return netmsg; }
00251 appladdress* get_appladdr() const { return addr; }
00252 };
00253
00254 }
00255
00256 #endif