Changeset 10653 for source/ariba/communication
- Timestamp:
- Jul 25, 2012, 11:41:36 AM (13 years ago)
- Location:
- source/ariba/communication
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
source/ariba/communication/BaseCommunication.cpp
r9322 r10653 41 41 #include "networkinfo/AddressDiscovery.h" 42 42 #include "ariba/utility/types/PeerID.h" 43 #include <boost/function.hpp> 43 44 44 45 #ifdef UNDERLAY_OMNET … … 283 284 SystemEventType MessageDispatchEvent("MessageDispatchEvent", TransportEvent ); 284 285 285 class DispatchMsg {286 public:287 DispatchMsg() : local(NULL), remote(NULL), message(NULL) {}288 address_v* local;289 address_v* remote;290 Message* message;291 };292 293 286 /// called when a system event is emitted by system queue 294 287 void BaseCommunication::handleSystemEvent(const SystemEvent& event) { … … 297 290 if ( event.getType() == MessageDispatchEvent ){ 298 291 logging_debug( "Forwarding message receiver" ); 299 DispatchMsg* dmsg = event.getData<DispatchMsg>(); 300 Message* msg = dmsg->message; 301 receiveMessage(msg, dmsg->local, dmsg->remote); 302 msg->dropPayload(); 303 delete dmsg->local; 304 delete dmsg->remote; 305 delete msg; 306 delete dmsg; 307 } 308 } 309 310 /// called when a message is received from transport_peer 311 void BaseCommunication::receive_message(transport_protocol* transport, 312 const address_vf local, const address_vf remote, const uint8_t* data, 313 size_t size) { 314 315 // logging_debug( "Dispatching message" ); 316 317 // convert data 318 Data data_( const_cast<uint8_t*>(data), size * 8 ); 319 DispatchMsg* dmsg = new DispatchMsg(); 320 321 Message* msg = new Message(data_); 322 dmsg->local = local->clone(); 323 dmsg->remote = remote->clone(); 324 dmsg->message = msg; 325 326 SystemQueue::instance().scheduleEvent( 327 SystemEvent( this, MessageDispatchEvent, dmsg ) 328 ); 329 } 330 331 /// handles a message from the underlay transport 332 void BaseCommunication::receiveMessage(const Message* message, 333 const address_v* local, const address_v* remote ){ 334 292 boost::function0<void>* handler = event.getData< boost::function0<void> >(); 293 (*handler)(); 294 delete handler; 295 } 296 } 297 298 /** 299 * called within the ASIO thread 300 * when a message is received from underlay transport 301 */ 302 void BaseCommunication::receive_message(transport_connection::sptr connection, 303 reboost::message_t msg) { 304 305 logging_debug( "Dispatching message" ); 306 307 boost::function0<void>* handler = new boost::function0<void>( 308 boost::bind( 309 &BaseCommunication::receiveMessage, 310 this, 311 connection, 312 msg) 313 ); 314 315 SystemQueue::instance().scheduleEvent( 316 SystemEvent(this, MessageDispatchEvent, handler) 317 ); 318 } 319 320 /** 321 * called within the ARIBA thread (System Queue) 322 * when a message is received from underlay transport 323 */ 324 void BaseCommunication::receiveMessage(transport_connection::sptr connection, 325 reboost::message_t message) 326 { 327 328 //// Adapt to old message system //// 329 // Copy data 330 size_t bytes_len = message.size(); 331 uint8_t* bytes = new uint8_t[bytes_len]; 332 message.read(bytes, 0, bytes_len); 333 334 Data data(bytes, bytes_len * 8); 335 336 Message legacy_message; 337 legacy_message.setPayload(data); 338 339 340 335 341 /// decapsulate message 336 AribaBaseMsg* msg = ((Message*)message)->decapsulate<AribaBaseMsg>();342 AribaBaseMsg* msg = legacy_message.decapsulate<AribaBaseMsg>(); 337 343 logging_debug( "Receiving message of type " << msg->getTypeString() ); 338 344 … … 379 385 LinkID localLink = LinkID::create(); 380 386 LinkID remoteLink = msg->getLocalLink(); 381 logging_debug( "local=" << local->to_string() 382 << " remote=" << remote->to_string() 387 logging_debug( 388 "local=" << connection->getLocalEndpoint()->to_string() 389 << " remote=" << connection->getRemoteEndpoint()->to_string() 383 390 ); 384 391 … … 386 393 bool allowlink = true; 387 394 BOOST_FOREACH( CommunicationEvents* i, eventListener ){ 388 allowlink &= i->onLinkRequest( localLink, local, remote ); 395 allowlink &= i->onLinkRequest( localLink, 396 connection->getLocalEndpoint(), 397 connection->getRemoteEndpoint()); 389 398 } 390 399 … … 400 409 ld->localLink = localLink; 401 410 ld->remoteLink = remoteLink; 402 ld->localLocator = local->clone(); 403 ld->remoteLocator = remote->clone(); 411 ld->localLocator = connection->getLocalEndpoint()->clone(); 412 ld->remoteLocator = connection->getRemoteEndpoint()->clone(); 413 ld->connection = connection; 404 414 ld->remoteEndpoint = msg->getLocalDescriptor(); 405 415 add_endpoint(ld->remoteLocator); … … 409 419 ld->remoteLocator, endpoint_set::Layer1_3 | endpoint_set::NoLoopback); 410 420 localDescriptor.getEndpoints().add( 411 local, endpoint_set::Layer1_3 | endpoint_set::NoLoopback); 421 connection->getLocalEndpoint(), 422 endpoint_set::Layer1_3 | endpoint_set::NoLoopback); 412 423 413 424 // link is now up-> add it … … 459 470 } 460 471 472 // store the connection 473 ld.connection = connection; 474 461 475 // set remote locator and link id 462 476 ld.remoteLink = msg->getLocalLink(); 463 ld.remoteLocator = remote->clone();477 ld.remoteLocator = connection->getRemoteEndpoint()->clone(); 464 478 ld.remoteEndpoint.getEndpoints().add( 465 479 msg->getLocalDescriptor().getEndpoints(), … … 536 550 // update the remote locator 537 551 const address_v* oldremote = linkDesc.remoteLocator; 538 linkDesc.remoteLocator = remote->clone();552 linkDesc.remoteLocator = connection->getRemoteEndpoint()->clone(); 539 553 540 554 // inform the listeners (local link has _not_ changed!) … … 747 761 748 762 /// sends a message to all end-points in the end-point descriptor 749 void BaseCommunication::send(Message* message, const EndpointDescriptor& endpoint) { 750 Data data = data_serialize( message, DEFAULT_V ); 751 transport->send( endpoint.getEndpoints(), data.getBuffer(), data.getLength() / 8); 752 data.release(); 763 void BaseCommunication::send(Message* legacy_message, const EndpointDescriptor& endpoint) { 764 Data data = data_serialize(legacy_message, DEFAULT_V); 765 766 //// Adapt to new message system //// 767 // transfer data buffer ownership to the shared_buffer 768 reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8); 769 770 reboost::message_t message; 771 message.push_back(buf); 772 773 transport->send(endpoint.getEndpoints(), message); 753 774 } 754 775 755 776 /// sends a message to the remote locator inside the link descriptor 756 void BaseCommunication::send(Message* message, const LinkDescriptor& desc) {777 void BaseCommunication::send(Message* legacy_message, const LinkDescriptor& desc) { 757 778 if (desc.remoteLocator==NULL) return; 758 Data data = data_serialize( message, DEFAULT_V ); 759 transport->send( desc.remoteLocator, data.getBuffer(), data.getLength() / 8); 760 data.release(); 779 780 Data data = data_serialize(legacy_message, DEFAULT_V); 781 782 //// Adapt to new message system //// 783 // transfer data buffer ownership to the shared_buffer 784 reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8); 785 786 reboost::message_t message; 787 message.push_back(buf); 788 789 desc.connection->send(message); 761 790 } 762 791 -
source/ariba/communication/BaseCommunication.h
r9694 r10653 60 60 #include "ariba/utility/addressing/addressing.hpp" 61 61 #include "ariba/utility/transport/transport.hpp" 62 #include "ariba/utility/transport/transport_connection.hpp" 62 63 63 64 // communication … … 190 191 virtual void handleSystemEvent(const SystemEvent& event); 191 192 192 /// called when a message is received form transport_peer 193 virtual void receive_message(transport_protocol* transport, 194 const address_vf local, const address_vf remote, const uint8_t* data, 195 size_t size); 193 /** 194 * called within the ASIO thread 195 * when a message is received from underlay transport 196 */ 197 virtual void receive_message(transport_connection::sptr connection, 198 reboost::message_t msg); 196 199 197 200 protected: 198 201 199 /// handle received message from a transport module 200 void receiveMessage(const Message* message, 201 const address_v* local, const address_v* remote ); 202 /** 203 * called within the ARIBA thread (System Queue) 204 * when a message is received from underlay transport 205 */ 206 void receiveMessage(transport_connection::sptr connection, 207 reboost::message_t msg); 202 208 203 209 /// called when a network interface change happens … … 250 256 /// flag, whether this link is up 251 257 bool up; 258 259 /// connection if link is up 260 transport_connection::sptr connection; 252 261 }; 253 262 -
source/ariba/communication/networkinfo/AddressDiscovery.cpp
r8620 r10653 129 129 ip_address ip = straddr; 130 130 if (ip.is_loopback()) continue; 131 if (ip.is_link_local()) continue;131 // if (ip.is_link_local()) continue; 132 132 address_vf vf = ip; 133 133 endpoints.add( vf ); -
source/ariba/communication/networkinfo/NetworkInformation.h
r3690 r10653 40 40 #define __NETWORK_INFORMATION_H 41 41 42 #include <unistd.h> 42 43 #include <vector> 43 44 #include <string>
Note:
See TracChangeset
for help on using the changeset viewer.