Ignore:
Timestamp:
Jul 25, 2012, 11:41:36 AM (12 years ago)
Author:
Michael Tänzer
Message:

Merge the ASIO branch back into trunk

File:
1 edited

Legend:

Unmodified
Added
Removed
  • source/ariba/communication/BaseCommunication.cpp

    r9322 r10653  
    4141#include "networkinfo/AddressDiscovery.h"
    4242#include "ariba/utility/types/PeerID.h"
     43#include <boost/function.hpp>
    4344
    4445#ifdef UNDERLAY_OMNET
     
    283284SystemEventType MessageDispatchEvent("MessageDispatchEvent", TransportEvent );
    284285
    285 class DispatchMsg {
    286 public:
    287         DispatchMsg() : local(NULL), remote(NULL), message(NULL) {}
    288         address_v* local;
    289         address_v* remote;
    290         Message* message;
    291 };
    292 
    293286/// called when a system event is emitted by system queue
    294287void BaseCommunication::handleSystemEvent(const SystemEvent& event) {
     
    297290        if ( event.getType() == MessageDispatchEvent ){
    298291                logging_debug( "Forwarding message receiver" );
    299                 DispatchMsg* dmsg = event.getData<DispatchMsg>();
    300                 Message* msg = dmsg->message;
    301                 receiveMessage(msg, dmsg->local, dmsg->remote);
    302                 msg->dropPayload();
    303                 delete dmsg->local;
    304                 delete dmsg->remote;
    305                 delete msg;
    306                 delete dmsg;
    307         }
    308 }
    309 
    310 /// called when a message is received from transport_peer
    311 void BaseCommunication::receive_message(transport_protocol* transport,
    312         const address_vf local, const address_vf remote, const uint8_t* data,
    313         size_t size) {
    314 
    315 //      logging_debug( "Dispatching message" );
    316 
    317         // convert data
    318         Data data_( const_cast<uint8_t*>(data), size * 8 );
    319         DispatchMsg* dmsg = new DispatchMsg();
    320 
    321         Message* msg = new Message(data_);
    322         dmsg->local = local->clone();
    323         dmsg->remote = remote->clone();
    324         dmsg->message = msg;
    325 
    326         SystemQueue::instance().scheduleEvent(
    327                 SystemEvent( this, MessageDispatchEvent, dmsg )
    328         );
    329 }
    330 
    331 /// handles a message from the underlay transport
    332 void BaseCommunication::receiveMessage(const Message* message,
    333         const address_v* local, const address_v* remote ){
    334 
     292                boost::function0<void>* handler = event.getData< boost::function0<void> >();
     293                (*handler)();
     294                delete handler;
     295        }
     296}
     297
     298/**
     299 * called within the ASIO thread
     300 * when a message is received from underlay transport
     301 */
     302void BaseCommunication::receive_message(transport_connection::sptr connection,
     303        reboost::message_t msg) {
     304
     305        logging_debug( "Dispatching message" );
     306       
     307    boost::function0<void>* handler = new boost::function0<void>(
     308            boost::bind(
     309                    &BaseCommunication::receiveMessage,
     310                    this,
     311                    connection,
     312                    msg)
     313    );
     314   
     315    SystemQueue::instance().scheduleEvent(
     316        SystemEvent(this, MessageDispatchEvent, handler)
     317    );
     318}
     319
     320/**
     321 * called within the ARIBA thread (System Queue)
     322 * when a message is received from underlay transport
     323 */
     324void BaseCommunication::receiveMessage(transport_connection::sptr connection,
     325        reboost::message_t message)
     326{
     327   
     328    //// Adapt to old message system ////
     329    // Copy data
     330    size_t bytes_len = message.size();
     331    uint8_t* bytes = new uint8_t[bytes_len];
     332    message.read(bytes, 0, bytes_len);
     333   
     334    Data data(bytes, bytes_len * 8);
     335   
     336    Message legacy_message;
     337    legacy_message.setPayload(data);
     338   
     339   
     340   
    335341        /// decapsulate message
    336         AribaBaseMsg* msg = ((Message*)message)->decapsulate<AribaBaseMsg>();
     342        AribaBaseMsg* msg = legacy_message.decapsulate<AribaBaseMsg>();
    337343        logging_debug( "Receiving message of type " << msg->getTypeString() );
    338344
     
    379385                        LinkID localLink  = LinkID::create();
    380386                        LinkID remoteLink = msg->getLocalLink();
    381                         logging_debug( "local=" << local->to_string()
    382                                 << " remote=" << remote->to_string()
     387                        logging_debug(
     388                                "local=" << connection->getLocalEndpoint()->to_string()
     389                                << " remote=" << connection->getRemoteEndpoint()->to_string()
    383390                        );
    384391
     
    386393                        bool allowlink = true;
    387394                        BOOST_FOREACH( CommunicationEvents* i, eventListener ){
    388                                 allowlink &= i->onLinkRequest( localLink, local, remote );
     395                                allowlink &= i->onLinkRequest( localLink,
     396                                        connection->getLocalEndpoint(),
     397                                        connection->getRemoteEndpoint());
    389398                        }
    390399
     
    400409                        ld->localLink = localLink;
    401410                        ld->remoteLink = remoteLink;
    402                         ld->localLocator = local->clone();
    403                         ld->remoteLocator = remote->clone();
     411                        ld->localLocator = connection->getLocalEndpoint()->clone();
     412                        ld->remoteLocator = connection->getRemoteEndpoint()->clone();
     413                        ld->connection = connection;
    404414                        ld->remoteEndpoint = msg->getLocalDescriptor();
    405415                        add_endpoint(ld->remoteLocator);
     
    409419                                ld->remoteLocator, endpoint_set::Layer1_3 | endpoint_set::NoLoopback);
    410420                        localDescriptor.getEndpoints().add(
    411                                 local, endpoint_set::Layer1_3 | endpoint_set::NoLoopback);
     421                                connection->getLocalEndpoint(),
     422                                endpoint_set::Layer1_3 | endpoint_set::NoLoopback);
    412423
    413424                        // link is now up-> add it
     
    459470                        }
    460471
     472                        // store the connection
     473                        ld.connection = connection;
     474                       
    461475                        // set remote locator and link id
    462476                        ld.remoteLink = msg->getLocalLink();
    463                         ld.remoteLocator = remote->clone();
     477                        ld.remoteLocator = connection->getRemoteEndpoint()->clone();
    464478                        ld.remoteEndpoint.getEndpoints().add(
    465479                                                        msg->getLocalDescriptor().getEndpoints(),
     
    536550                        // update the remote locator
    537551                        const address_v* oldremote = linkDesc.remoteLocator;
    538                         linkDesc.remoteLocator = remote->clone();
     552                        linkDesc.remoteLocator = connection->getRemoteEndpoint()->clone();
    539553
    540554                        // inform the listeners (local link has _not_ changed!)
     
    747761
    748762/// sends a message to all end-points in the end-point descriptor
    749 void BaseCommunication::send(Message* message, const EndpointDescriptor& endpoint) {
    750         Data data = data_serialize( message, DEFAULT_V );
    751         transport->send( endpoint.getEndpoints(), data.getBuffer(), data.getLength() / 8);
    752         data.release();
     763void BaseCommunication::send(Message* legacy_message, const EndpointDescriptor& endpoint) {
     764        Data data = data_serialize(legacy_message, DEFAULT_V);
     765       
     766        //// Adapt to new message system ////
     767        // transfer data buffer ownership to the shared_buffer
     768    reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8);
     769       
     770        reboost::message_t message;
     771        message.push_back(buf);
     772       
     773        transport->send(endpoint.getEndpoints(), message);
    753774}
    754775
    755776/// sends a message to the remote locator inside the link descriptor
    756 void BaseCommunication::send(Message* message, const LinkDescriptor& desc) {
     777void BaseCommunication::send(Message* legacy_message, const LinkDescriptor& desc) {
    757778        if (desc.remoteLocator==NULL) return;
    758         Data data = data_serialize( message, DEFAULT_V );
    759         transport->send( desc.remoteLocator, data.getBuffer(), data.getLength() / 8);
    760         data.release();
     779       
     780        Data data = data_serialize(legacy_message, DEFAULT_V);
     781   
     782    //// Adapt to new message system ////
     783    // transfer data buffer ownership to the shared_buffer
     784    reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8);
     785   
     786    reboost::message_t message;
     787    message.push_back(buf);
     788   
     789        desc.connection->send(message);
    761790}
    762791
Note: See TracChangeset for help on using the changeset viewer.