Ignore:
Timestamp:
Jun 19, 2013, 11:05:49 AM (11 years ago)
Author:
hock@…
Message:

Reintegrate branch: 20130111-hock-message_classes

improvements:

  • new message classes (reboost, zero-copy)
  • "fast path" for direct links (skip overlay layer)
  • link-properties accessible from the application
  • SystemQueue can call boost::bind functions
  • protlib compatibility removed (32bit overhead saved in every message)
  • addressing2
  • AddressDiscovery discoveres only addresses on which we're actually listening
  • ariba serialization usage reduced (sill used in OverlayMsg)
  • Node::connect, easier and cleaner interface to start-up ariba from the application
  • ariba configs via JSON, XML, etc (boost::property_tree)
  • keep-alive overhead greatly reduced
  • (relayed) overlay links can actually be closed now
  • lost messages are detected in most cases
  • notification to the application when link is transformed into direct-link
  • overlay routing: send message to second best hop if it would be dropped otherwise
  • SequenceNumbers (only mechanisms, so for: upward compatibility)
  • various small fixes


regressions:

  • bluetooth is not yet working again
  • bootstrap modules deactivated
  • liblog4xx is not working (use cout-logging)

This patch brings great performance and stability improvements at cost of backward compatibility.
Also bluetooth and the bootstrap modules have not been ported to the new interfaces, yet.

Location:
source/ariba/utility/messages
Files:
6 edited

Legend:

Unmodified
Added
Removed
  • source/ariba/utility/messages/Message.cpp

    r8620 r12060  
    4141#include "ariba/utility/serialization/DataStream.hpp"
    4242
     43#include "ariba/utility/logging/Logging.h"
     44
    4345NAMESPACE_BEGIN
    4446
     
    8082}
    8183
     84
     85reboost::message_t Message::wrap_up_for_sending()
     86{
     87    assert( ! wrapped_up );
     88    wrapped_up = true;
     89   
     90    //// Adapt to new message system ////
     91    Data data = data_serialize(this, DEFAULT_V);
     92    reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8);
     93
     94    newstyle_payload.push_front(buf);
     95   
     96    return newstyle_payload;
     97}
     98
     99reboost::shared_buffer_t Message::serialize_into_shared_buffer()
     100{
     101    assert ( newstyle_payload.length() == 0 );
     102   
     103    //// Adapt to new message system ////
     104    Data data = data_serialize(this, DEFAULT_V);
     105    reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8);
     106   
     107    return buf;
     108}
     109
     110
     111reboost::shared_buffer_t Message::deserialize_from_shared_buffer(reboost::shared_buffer_t buff)
     112{
     113    // NOTE: legacy payload is not allowed when using shared buffers
     114    this->legacy_payload_disabled = true;
     115   
     116    assert( buff.size() > 0 );
     117   
     118    // const_cast is necessary here, but without legacy payload we should be save here (more or less)
     119    Data dat(const_cast<uint8_t*>(buff.data()), buff.size() * 8);
     120   
     121    size_t len = this->SERIALIZATION_METHOD_NAME(DESERIALIZE, dat) / 8;
     122
     123    // return remaining sub-buffer
     124    return buff(len);
     125}
     126
     127
     128
    82129NAMESPACE_END
    83130
    84131std::ostream& operator<<(std::ostream& stream, const ariba::utility::Message& msg ) {
    85132        using_serialization;
    86         stream << "msg(type=" << typeid(msg).name();
     133        stream << "msg(type=" << typeid(msg).name() << ",";
    87134        stream << "len=" << (data_length(&msg)/8) << ",";
    88135        Data data = data_serialize(&msg);
    89         stream << ",data=" << data;
     136        stream << "data=" << data;
    90137        data.release();
    91138        stream << ")";
    92139        return stream;
    93140}
    94 
  • source/ariba/utility/messages/Message.h

    r6919 r12060  
    6262#include "ariba/utility/serialization.h"
    6363
     64// reboost messages
     65#include "ariba/utility/transport/messages/message.hpp"
     66
     67
    6468std::ostream& operator<<(std::ostream& stream, const ariba::utility::Message& msg );
    6569
     
    8286        friend std::ostream& ::operator<<(std::ostream& stream, const ariba::utility::Message& msg );
    8387
    84         // root binary data
    85         shared_array<uint8_t> root;
    86 
    8788        // payload
     89        bool legacy_payload_disabled;
    8890        bool releasePayload;
    8991        Data payload; //< messages binary data
     92       
     93        // XXX testing...
     94        reboost::message_t newstyle_payload;
     95        bool wrapped_up;
    9096
    9197        // addresses and control info
     
    98104         */
    99105        inline Message() :
    100                 root(), releasePayload(true), payload(), srcAddr(NULL),destAddr(NULL) {
     106            legacy_payload_disabled(false), releasePayload(true), payload(),
     107            newstyle_payload(), wrapped_up(false), srcAddr(NULL),destAddr(NULL) {
    101108        }
    102109
     
    106113         */
    107114        explicit inline Message( const Data& data ) :
    108                 releasePayload(true), srcAddr(NULL),destAddr(NULL) {
     115        legacy_payload_disabled(false), releasePayload(true),
     116        newstyle_payload(), wrapped_up(false), srcAddr(NULL),destAddr(NULL) {  // FIXME newstyle_payload..?
    109117                this->payload = data.clone();
    110118//              this->root = shared_array<uint8_t>((uint8_t*)data.getBuffer());
     
    225233                return decapsulate<T>();
    226234        }
     235       
     236       
     237        // XXX testing
     238        void set_payload_message(reboost::message_t msg)
     239        {
     240            newstyle_payload = msg;
     241        }
     242       
     243        void append_buffer(reboost::shared_buffer_t buff)
     244        {
     245            newstyle_payload.push_back(buff);
     246        }
     247       
     248       
     249        // XXX testing... packs this message into the payload message (do not use twice!!)
     250        virtual reboost::message_t wrap_up_for_sending();
     251       
     252       
     253        /**
     254         * Uses the old serialization system to serialize itself into a (new style) shared buffer.
     255         */
     256        virtual reboost::shared_buffer_t serialize_into_shared_buffer();
     257       
     258        /*
     259         * XXX experimental
     260         *
     261         * Uses the old serialization system to deserialize itself out of a (new style) shared buffer.
     262         * @return remaining sub-buffer (the "payload")
     263         *
     264         * Note: This is some kind of a hack! handle with care.
     265         */
     266        virtual reboost::shared_buffer_t deserialize_from_shared_buffer(reboost::shared_buffer_t buff);
     267       
    227268
    228269protected:
     
    262303         * @return A explicit payload serializer
    263304         */
    264         finline PayloadSerializer Payload( size_t length = ~0 ) {
     305        finline PayloadSerializer Payload( size_t length = ~0 )
     306        {
     307//          assert( ! legacy_payload_disabled );  // FIXME aktuell
     308           
    265309                return PayloadSerializer( this, length );
    266310        }
  • source/ariba/utility/messages/MessageProvider.cpp

    r6919 r12060  
    3737// [License]
    3838
    39 #include "MessageProvider.h"
    40 
    41 NAMESPACE_BEGIN
    42 
    43 MessageProvider::MessageProvider() {
    44 }
    45 
    46 MessageProvider::~MessageProvider() {
    47 }
    48 
    49 bool MessageProvider::sendMessageToReceivers( const Message* message ) {
    50         bool sent =  false;
    51         for (size_t i=0; i<receivers.size(); i++)
    52                 if (receivers[i]->receiveMessage(message, LinkID::UNSPECIFIED, NodeID::UNSPECIFIED)) sent = true;
    53         return sent;
    54 }
    55 
    56 void MessageProvider::addMessageReceiver( MessageReceiver* receiver ) {
    57         receivers.push_back(receiver);
    58 }
    59 
    60 void MessageProvider::removeMessageReceiver( MessageReceiver* receiver ) {
    61         for (size_t i=0; i<receivers.size(); i++)
    62                 if (receivers[i]==receiver) {
    63                         receivers.erase( receivers.begin()+i );
    64                         break;
    65                 }
    66 }
    67 
    68 NAMESPACE_END
     39//#include "MessageProvider.h"
     40//
     41//NAMESPACE_BEGIN
     42//
     43//MessageProvider::MessageProvider() {
     44//}
     45//
     46//MessageProvider::~MessageProvider() {
     47//}
     48//
     49//bool MessageProvider::sendMessageToReceivers( const Message* message ) {
     50//      bool sent =  false;
     51//      for (size_t i=0; i<receivers.size(); i++)
     52//              if (receivers[i]->receiveMessage(message, LinkID::UNSPECIFIED, NodeID::UNSPECIFIED)) sent = true;
     53//      return sent;
     54//}
     55//
     56//void MessageProvider::addMessageReceiver( MessageReceiver* receiver ) {
     57//      receivers.push_back(receiver);
     58//}
     59//
     60//void MessageProvider::removeMessageReceiver( MessageReceiver* receiver ) {
     61//      for (size_t i=0; i<receivers.size(); i++)
     62//              if (receivers[i]==receiver) {
     63//                      receivers.erase( receivers.begin()+i );
     64//                      break;
     65//              }
     66//}
     67//
     68//NAMESPACE_END
  • source/ariba/utility/messages/MessageProvider.h

    r3690 r12060  
     1// XXX DEPRECATED
     2
    13// [License]
    24// The Ariba-Underlay Copyright
     
    3739// [License]
    3840
     41// XXX DEPRECATED
     42
    3943#ifndef MESSAGEPROVIDER_H_
    4044#define MESSAGEPROVIDER_H_
    4145
    42 #include "_namespace.h"
    43 #include "MessageReceiver.h"
    44 #include "ariba/utility/types/LinkID.h"
    45 #include "ariba/utility/types/NodeID.h"
    46 #include <vector>
    47 
    48 using std::vector;
    49 using ariba::utility::LinkID;
    50 using ariba::utility::NodeID;
    51 
    52 NAMESPACE_BEGIN
    53 
    54 
    55 /**
    56  * This class defines an interface for message providers.
    57  * Implementing classes must allow receivers to register themselves.
    58  *
    59  * @author Sebastian Mies
    60  */
    61 class MessageProvider {
    62 private:
    63         vector<MessageReceiver*> receivers;
    64 
    65 protected:
    66         bool sendMessageToReceivers( const Message* message );
    67 
    68 public:
    69         /**
    70          * Constructor.
    71          */
    72         MessageProvider();
    73 
    74         /**
    75          * Destructor.
    76          */
    77         ~MessageProvider();
    78 
    79         /**
    80          * Adds a message receiver.
    81          *
    82          * @param receiver The receiver.
    83          */
    84         void addMessageReceiver( MessageReceiver* receiver );
    85 
    86         /**
    87          * Removes a message receiver.
    88          *
    89          * @param receiver The receiver.
    90          */
    91         void removeMessageReceiver( MessageReceiver* receiver );
    92 };
    93 
    94 NAMESPACE_END
     46//#include "_namespace.h"
     47//#include "MessageReceiver.h"
     48//#include "ariba/utility/types/LinkID.h"
     49//#include "ariba/utility/types/NodeID.h"
     50//#include <vector>
     51//
     52//using std::vector;
     53//using ariba::utility::LinkID;
     54//using ariba::utility::NodeID;
     55//
     56//NAMESPACE_BEGIN
     57//
     58//
     59///**
     60// * This class defines an interface for message providers.
     61// * Implementing classes must allow receivers to register themselves.
     62// *
     63// * @author Sebastian Mies
     64// */
     65//class MessageProvider {
     66//private:
     67//      vector<MessageReceiver*> receivers;
     68//
     69//protected:
     70//      bool sendMessageToReceivers( const Message* message );
     71//
     72//public:
     73//      /**
     74//       * Constructor.
     75//       */
     76//      MessageProvider();
     77//
     78//      /**
     79//       * Destructor.
     80//       */
     81//      ~MessageProvider();
     82//
     83//      /**
     84//       * Adds a message receiver.
     85//       *
     86//       * @param receiver The receiver.
     87//       */
     88//      void addMessageReceiver( MessageReceiver* receiver );
     89//
     90//      /**
     91//       * Removes a message receiver.
     92//       *
     93//       * @param receiver The receiver.
     94//       */
     95//      void removeMessageReceiver( MessageReceiver* receiver );
     96//};
     97//
     98//NAMESPACE_END
    9599
    96100#endif /* MESSAGEPROVIDER_H_ */
  • source/ariba/utility/messages/MessageReceiver.cpp

    r3690 r12060  
    4949}
    5050
    51 bool MessageReceiver::receiveMessage( const Message* message, const LinkID& link, const NodeID& node ) {
    52         //std::cout << "UNIMPLEMENTED MessageReceiver got Message:" << (Message*)message << std::endl;
    53         return false;
    54 }
     51//bool MessageReceiver::receiveMessage( reboost::shared_buffer_t message, const LinkID& link, const NodeID& node ) {
     52//      //std::cout << "UNIMPLEMENTED MessageReceiver got Message:" << (Message*)message << std::endl;
     53//      return false;
     54//}
    5555
    5656NAMESPACE_END
  • source/ariba/utility/messages/MessageReceiver.h

    r3690 r12060  
    4040#define MESSAGERECEIVER_H__
    4141
    42 #include "ariba/utility/messages/Message.h"
     42//#include "ariba/utility/messages/Message.h"
     43// reboost messages
     44#include "ariba/utility/transport/messages/message.hpp"
    4345#include "ariba/utility/types/LinkID.h"
    4446#include "ariba/utility/types/NodeID.h"
     
    7375         * @return True, when the message has been accepted.
    7476         */
    75         virtual bool receiveMessage( const Message* message, const LinkID& link, const NodeID& node );
     77        virtual bool receiveMessage( reboost::shared_buffer_t message,
     78                const LinkID& link,
     79                const NodeID& node,
     80                bool bypass_overlay ) = 0;
    7681};
    7782
Note: See TracChangeset for help on using the changeset viewer.