Ignore:
Timestamp:
Jun 19, 2013, 11:05:49 AM (12 years ago)
Author:
hock@…
Message:

Reintegrate branch: 20130111-hock-message_classes

improvements:

  • new message classes (reboost, zero-copy)
  • "fast path" for direct links (skip overlay layer)
  • link-properties accessible from the application
  • SystemQueue can call boost::bind functions
  • protlib compatibility removed (32bit overhead saved in every message)
  • addressing2
  • AddressDiscovery discoveres only addresses on which we're actually listening
  • ariba serialization usage reduced (sill used in OverlayMsg)
  • Node::connect, easier and cleaner interface to start-up ariba from the application
  • ariba configs via JSON, XML, etc (boost::property_tree)
  • keep-alive overhead greatly reduced
  • (relayed) overlay links can actually be closed now
  • lost messages are detected in most cases
  • notification to the application when link is transformed into direct-link
  • overlay routing: send message to second best hop if it would be dropped otherwise
  • SequenceNumbers (only mechanisms, so for: upward compatibility)
  • various small fixes


regressions:

  • bluetooth is not yet working again
  • bootstrap modules deactivated
  • liblog4xx is not working (use cout-logging)

This patch brings great performance and stability improvements at cost of backward compatibility.
Also bluetooth and the bootstrap modules have not been ported to the new interfaces, yet.

Location:
source/ariba/communication
Files:
10 edited

Legend:

Unmodified
Added
Removed
  • source/ariba/communication/BaseCommunication.cpp

    r10767 r12060  
    5757namespace communication {
    5858
     59using namespace ariba::addressing2;
     60
    5961using ariba::utility::PeerID;
    6062using ariba::utility::SystemQueue;
    6163
    6264use_logging_cpp(BaseCommunication);
    63 
    64 /// adds an endpoint to the list
    65 void BaseCommunication::add_endpoint( const address_v* endpoint ) {
    66         if (endpoint==NULL) return;
    67         BOOST_FOREACH( endpoint_reference& ref, remote_endpoints ) {
    68                 if (ref.endpoint->type_id() == endpoint->type_id() && *ref.endpoint == *endpoint) {
    69                         ref.count++;
    70                         return;
    71                 }
    72         }
    73         endpoint_reference ref;
    74         ref.endpoint = endpoint->clone();
    75         ref.count = 1;
    76         remote_endpoints.push_back(ref);
    77 }
    78 
    79 /// removes an endpoint from the list
    80 void BaseCommunication::remove_endpoint( const address_v* endpoint ) {
    81         if (endpoint==NULL) return;
    82         for (vector<endpoint_reference>::iterator i = remote_endpoints.begin();
    83                 i != remote_endpoints.end(); i++) {
    84                 if ((*i->endpoint).type_id() == endpoint->type_id() && (*i->endpoint) == *endpoint) {
    85                         i->count--;
    86                         if (i->count==0) {
    87                                 logging_info("No more links to " << i->endpoint->to_string() << ": terminating transports!");
    88                                 transport->terminate(i->endpoint);
    89                                 delete i->endpoint;
    90                                 remote_endpoints.erase(i);
    91                         }
    92                         return;
    93                 }
    94         }
    95 }
    9665
    9766
     
    10069          transport( NULL ),
    10170          messageReceiver( NULL ),
    102           started( false )
     71          started( false ),
     72          listenOn_endpoints(new addressing2::endpoint_set())
    10373{
    10474}
     
    10979
    11080
    111 void BaseCommunication::start() {
     81void BaseCommunication::start(EndpointSetPtr listen_on) {
     82    assert ( ! started );
     83   
     84    listenOn_endpoints = listen_on;
     85    logging_info("Setting local end-points: " << listenOn_endpoints->to_string());
     86   
    11287        logging_info( "Starting up ..." );
    11388        currentSeqnum = 0;
    11489
    115         // set local peer id
    116         localDescriptor.getPeerId() = PeerID::random();
    117         logging_info( "Using PeerID: " << localDescriptor.getPeerId() );
    118 
    11990        // creating transports
     91        //  ---> transport_peer holds the set of the active endpoints we're listening on
    12092        logging_info( "Creating transports ..." );
    121 
    122 #ifdef UNDERLAY_OMNET
    123         AribaOmnetModule* module = StartupWrapper::getCurrentModule();
    124         module->setServerPort( listenport );
    125 
    126         transport = module;
    127         network = new OmnetNetworkProtocol( module );
    128 #else
    129         transport = new transport_peer( localDescriptor.getEndpoints() );
    130 #endif
     93        transport = new transport_peer();
     94        active_listenOn_endpoints = transport->add_listenOn_endpoints(listenOn_endpoints);
     95        logging_info( "XXX. Active endpoints = " << active_listenOn_endpoints->to_string() ); // XXX
    13196
    13297        logging_info( "Searching for local locators ..." );
    133         /**
    134          * DONT DO THAT: if(localDescriptor.getEndpoints().to_string().length() == 0)
    135          * since addresses are used to initialize transport addresses
    136          */
    137         AddressDiscovery::discover_endpoints( localDescriptor.getEndpoints() );
    138         logging_info( "Done. Local endpoints = " << localDescriptor.toString() );
    139 
     98        local_endpoints = AddressDiscovery::discover_endpoints(active_listenOn_endpoints);
     99        if ( local_endpoints->count() > 0 )
     100        {
     101            logging_info( "Done. Discovered local endpoints: " << local_endpoints->to_string() );
     102        }
     103        else
     104        {
     105            logging_warn("WARING!! No local endpoints found, NO COMMUNICATION POSSIBLE!!");
     106           
     107            // TODO notify application, so that it may react properly. throw exception..?
     108            assert( false );
     109        }
     110
     111       
     112    // create local EndpointDescriptor
     113        //  ---> localDescriptor hold the set endpoints that can be used to reach us
     114    localDescriptor.getPeerId() = PeerID::random();
     115    localDescriptor.replace_endpoint_set(local_endpoints);
     116    logging_info( "Using PeerID: " << localDescriptor.getPeerId() );
     117
     118    // start transport_peer
    140119        transport->register_listener( this );
    141120        transport->start();
    142121
    143 #ifndef UNDERLAY_OMNET
    144122        // bind to the network change detection
    145123        networkMonitor.registerNotification( this );
    146 #endif
    147124
    148125        // base comm startup done
     
    163140bool BaseCommunication::isStarted(){
    164141        return started;
    165 }
    166 
    167 /// Sets the endpoints
    168 void BaseCommunication::setEndpoints( string& _endpoints ) {
    169         localDescriptor.getEndpoints().assign(_endpoints);
    170         logging_info("Setting local end-points: "
    171                 << localDescriptor.getEndpoints().to_string());
    172142}
    173143
     
    193163        addLink( ld );
    194164
    195         // send a message to request new link to remote
    196         logging_debug( "Send messages with request to open link to " << descriptor.toString() );
    197         AribaBaseMsg baseMsg( AribaBaseMsg::typeLinkRequest, linkid );
    198         baseMsg.getLocalDescriptor() = localDescriptor;
    199         baseMsg.getRemoteDescriptor().getPeerId() = descriptor.getPeerId();
    200 
    201         // serialize and send message
    202         send( &baseMsg, descriptor );
     165
     166    /* send a message to request new link to remote */
     167    logging_debug( "Send messages with request to open link to " << descriptor.toString() );
     168
     169    /*
     170         * Create Link-Request Message:
     171         * NOTE: - Their PeerID (in parent message)
     172         * - Our LinkID
     173         * - Our PeerID
     174         * - Our EndpointDescriptor
     175         */
     176        reboost::message_t linkmsg;
     177        linkmsg.push_back(linkid.serialize());
     178        linkmsg.push_back(localDescriptor.getPeerId().serialize());
     179        linkmsg.push_back(localDescriptor.endpoints->serialize());
     180       
     181//      // XXX AKTUELL BUG FINDING...
     182//      reboost::shared_buffer_t xxx = localDescriptor.endpoints->serialize();
     183//      EndpointSetPtr xxx_set = endpoint_set::create_EndpointSet();
     184//      xxx_set->deserialize(xxx);
     185//      cout << "/// MARIO VORHER: " << localDescriptor.endpoints->to_string() << endl;
     186//      cout << "/// MARIO NACHHER: " << xxx_set->to_string() << endl;
     187       
     188        // send message
     189        // TODO move enum to BaseComm
     190        send_to_peer(AribaBaseMsg::typeLinkRequest, descriptor.getPeerId(), linkmsg,
     191                descriptor, system_priority::OVERLAY);
    203192
    204193        return linkid;
     
    217206
    218207        // tell the registered listeners
    219         BOOST_FOREACH( CommunicationEvents* i, eventListener ) {
     208        foreach( CommunicationEvents* i, eventListener ) {
    220209                i->onLinkDown( link, ld.localLocator, ld.remoteLocator );
    221210        }
    222211
    223         // create message to drop the link
     212
     213        // * send message to drop the link *
    224214        logging_debug( "Sending out link close request. for us, the link is closed now" );
    225         AribaBaseMsg msg( AribaBaseMsg::typeLinkClose, ld.localLink, ld.remoteLink );
    226 
    227         // send message to drop the link
    228         send( &msg, ld );
     215        reboost::message_t empty_message;
     216        send_over_link( AribaBaseMsg::typeLinkClose, empty_message, ld, system_priority::OVERLAY );
    229217
    230218        // remove from map
     
    232220}
    233221
    234 seqnum_t BaseCommunication::sendMessage( const LinkID lid, const Message* message) {
    235 
    236         logging_debug( "Sending out message to link " << lid.toString() );
    237 
     222
     223seqnum_t BaseCommunication::sendMessage( const LinkID& lid,
     224        reboost::message_t message,
     225        uint8_t priority,
     226        bool bypass_overlay) throw(communication_message_not_sent)
     227{
     228    // message type: direct data or (normal) data
     229    AribaBaseMsg::type_ type;
     230    if ( bypass_overlay )
     231    {
     232        type = AribaBaseMsg::typeDirectData;
     233        logging_debug( "Sending out direct-message to link " << lid.toString() );
     234    }
     235    else
     236    {
     237        type = AribaBaseMsg::typeData;
     238        logging_debug( "Sending out message to link " << lid.toString() );
     239    }
     240
     241   
    238242        // query local link info
    239243        LinkDescriptor& ld = queryLocalLink(lid);
    240         if( ld.isUnspecified() ){
    241                 logging_error( "Don't know the link with id " << lid.toString() );
    242                 return -1;
     244        if( ld.isUnspecified() )
     245        {
     246            throw communication_message_not_sent("Don't know the link with id "
     247                    + lid.toString());
    243248        }
    244249
    245250        // link not up-> error
    246         if( !ld.up ) {
    247                 logging_error("Can not send on link " << lid.toString() << ": link not up");
    248                 return -1;
    249         }
    250 
    251         // create message
    252         AribaBaseMsg msg( AribaBaseMsg::typeData, ld.localLink, ld.remoteLink );
    253 
    254         // encapsulate the payload message
    255         msg.encapsulate( const_cast<Message*>(message) );
    256 
    257         // send message
    258         send( &msg, ld );
    259 
    260         // return sequence number
     251        if( !ld.up )
     252        {
     253            throw communication_message_not_sent("Can not send on link "
     254                    + lid.toString() + ": link not up");
     255        }
     256
     257
     258        // * send message *
     259        bool okay = send_over_link( type, message, ld, priority );
     260
     261        if ( ! okay )
     262        {
     263            throw communication_message_not_sent("send_over_link failed!");
     264        }
     265       
    261266        return ++currentSeqnum;
    262267}
     
    268273                LinkDescriptor& linkDesc = queryLocalLink(link);
    269274                if (linkDesc.isUnspecified()) return EndpointDescriptor::UNSPECIFIED();
    270                 return linkDesc.remoteEndpoint;
     275                return linkDesc.remoteDescriptor;
    271276        }
    272277}
     
    283288}
    284289
    285 SystemEventType TransportEvent("Transport");
    286 SystemEventType MessageDispatchEvent("MessageDispatchEvent", TransportEvent );
    287 
    288 /// called when a system event is emitted by system queue
    289 void BaseCommunication::handleSystemEvent(const SystemEvent& event) {
    290 
    291         // dispatch received messages
    292         if ( event.getType() == MessageDispatchEvent ){
    293                 logging_debug( "Forwarding message receiver" );
    294                 boost::function0<void>* handler = event.getData< boost::function0<void> >();
    295                 (*handler)();
    296                 delete handler;
    297         }
    298 }
    299 
    300 /**
    301  * called within the ASIO thread
    302  * when a message is received from underlay transport
    303  */
     290
     291
     292/*------------------------------
     293 | ASIO thread --> SystemQueue |
     294 ------------------------------*/
     295
     296/// ASIO thread
    304297void BaseCommunication::receive_message(transport_connection::sptr connection,
    305         reboost::message_t msg) {
     298        reboost::shared_buffer_t msg) {
    306299
    307300        logging_debug( "Dispatching message" );
    308301       
    309     boost::function0<void>* handler = new boost::function0<void>(
     302    SystemQueue::instance().scheduleCall(
    310303            boost::bind(
    311304                    &BaseCommunication::receiveMessage,
     
    313306                    connection,
    314307                    msg)
    315     );
    316    
    317     SystemQueue::instance().scheduleEvent(
    318         SystemEvent(this, MessageDispatchEvent, handler)
    319     );
    320 }
    321 
    322 /**
    323  * called within the ARIBA thread (System Queue)
    324  * when a message is received from underlay transport
    325  */
     308        );
     309}
     310
     311/// ASIO thread
     312void BaseCommunication::connection_terminated(transport_connection::sptr connection)
     313{
     314    SystemQueue::instance().scheduleCall(
     315            boost::bind(
     316                    &BaseCommunication::connectionTerminated,
     317                    this,
     318                    connection)
     319        );
     320}
     321
     322/*--------------------------------
     323 | [ASIO thread --> SystemQueue] |
     324 -------------------------------*/
     325
     326/// ARIBA thread (System Queue)
     327void BaseCommunication::connectionTerminated(transport_connection::sptr connection)
     328{
     329    vector<LinkID*> links = connection->get_communication_links();
     330   
     331    logging_debug("[BaseCommunication] Connection terminated: "
     332            << connection->getLocalEndpoint()->to_string()
     333            << " <--> " << connection->getRemoteEndpoint()->to_string()
     334            << " (" << links.size() << " links)");
     335   
     336    // remove all links that used the terminated connection
     337    for ( vector<LinkID*>::iterator it = links.begin(); it != links.end(); ++it )
     338    {
     339        LinkID& link_id = **it;
     340       
     341        logging_debug("  ---> Removing link: " << link_id.toString());
     342       
     343        // searching for link, not found-> warn
     344        LinkDescriptor& linkDesc = queryLocalLink( link_id );
     345        if (linkDesc.isUnspecified()) {
     346            logging_warn("Failed to find local link " << link_id.toString());
     347            continue;
     348        }
     349
     350        // inform listeners
     351        foreach( CommunicationEvents* i, eventListener ){
     352            i->onLinkFail( linkDesc.localLink,
     353                    linkDesc.localLocator, linkDesc.remoteLocator );
     354        }
     355
     356        // remove the link descriptor
     357        removeLink( link_id );
     358    }
     359}
     360
     361/// ARIBA thread (System Queue)
    326362void BaseCommunication::receiveMessage(transport_connection::sptr connection,
    327         reboost::message_t message)
     363        reboost::shared_buffer_t message)
    328364{
    329    
    330     //// Adapt to old message system ////
    331     // Copy data
    332     size_t bytes_len = message.size();
    333     uint8_t* bytes = new uint8_t[bytes_len];
    334     message.read(bytes, 0, bytes_len);
    335    
    336     Data data(bytes, bytes_len * 8);
    337    
    338     Message legacy_message;
    339     legacy_message.setPayload(data);
    340    
    341    
    342    
    343         /// decapsulate message
    344         AribaBaseMsg* msg = legacy_message.decapsulate<AribaBaseMsg>();
    345         logging_debug( "Receiving message of type " << msg->getTypeString() );
    346 
     365    // XXX
     366    logging_debug("/// [receiveMessage] buffersize: " << message.size());
     367       
     368    // get type
     369    uint8_t type = message.data()[0];
     370    reboost::shared_buffer_t sub_buff = message(1);
     371   
     372    // get link id
     373    LinkID link_id;
     374    if ( type != AribaBaseMsg::typeLinkRequest)
     375    {
     376        sub_buff = link_id.deserialize(sub_buff);
     377    }
     378   
    347379        // handle message
    348         switch (msg->getType()) {
    349 
     380        switch ( type )
     381        {
    350382                // ---------------------------------------------------------------------
    351383                // data message
    352384                // ---------------------------------------------------------------------
    353                 case AribaBaseMsg::typeData: {
    354                         logging_debug( "Received data message, forwarding to overlay" );
    355                         if( messageReceiver != NULL ) {
     385                case AribaBaseMsg::typeData:
     386                {
     387                        logging_debug( "Received data message, forwarding to overlay." );
     388                        if( messageReceiver != NULL )
     389                        {
    356390                                messageReceiver->receiveMessage(
    357                                         msg, msg->getRemoteLink(), NodeID::UNSPECIFIED
     391                                        sub_buff, link_id, NodeID::UNSPECIFIED, false
    358392                                );
    359393                        }
     394                       
    360395                        break;
    361396                }
    362397
     398        // ---------------------------------------------------------------------
     399        // direct data message (bypass overlay-layer)
     400        // ---------------------------------------------------------------------
     401        case AribaBaseMsg::typeDirectData:
     402        {
     403            logging_debug( "Received direct data message, forwarding to application." );
     404           
     405            if( messageReceiver != NULL )
     406            {
     407                messageReceiver->receiveMessage(
     408                    sub_buff, link_id, NodeID::UNSPECIFIED, true
     409                );
     410            }
     411           
     412            break;
     413        }
     414
     415
     416       
    363417                // ---------------------------------------------------------------------
    364418                // handle link request from remote
    365419                // ---------------------------------------------------------------------
    366                 case AribaBaseMsg::typeLinkRequest: {
    367                         logging_debug( "Received link open request" );
    368 
    369                         /// not the correct peer id-> skip request
    370                         if (!msg->getRemoteDescriptor().getPeerId().isUnspecified()
    371                                 && msg->getRemoteDescriptor().getPeerId() != localDescriptor.getPeerId()) {
    372                                 logging_info("Received link request for "
    373                                         << msg->getRemoteDescriptor().getPeerId().toString()
    374                                         << "but i'm "
    375                                         << localDescriptor.getPeerId()
    376                                         << ": Ignoring!");
    377                                 break;
    378                         }
    379 
     420                case AribaBaseMsg::typeLinkRequest:
     421                {
     422                        logging_debug( "Received link open request on "
     423                                << connection->getLocalEndpoint()->to_string() );
     424                       
     425                        /*
     426                         * Deserialize Peer Message
     427                         * - Our PeerID
     428                         */
     429                        PeerID our_peer_id;
     430                        sub_buff = our_peer_id.deserialize(sub_buff);
     431
     432            /// not the correct peer id-> skip request
     433            if ( our_peer_id != localDescriptor.getPeerId() &&
     434                    ! our_peer_id.isUnspecified() /* overlay bootstrap */ )
     435            {
     436                logging_info("Received link request for "
     437                    << our_peer_id.toString()
     438                    << "but i'm "
     439                    << localDescriptor.getPeerId()
     440                    << ": Ignoring!");
     441               
     442                // TODO terminate connection?
     443               
     444                break;
     445            }
     446
     447           
     448                    /*
     449                     * Deserialize Link-Request Message:
     450                     * - Their LinkID
     451                     * - Their PeerID
     452                     * - Their EndpointDescriptor
     453                     */
     454            LinkID their_link_id;
     455            PeerID their_peer_id;
     456            EndpointSetPtr their_endpoints = endpoint_set::create_EndpointSet();
     457            sub_buff = their_link_id.deserialize(sub_buff);
     458            sub_buff = their_peer_id.deserialize(sub_buff);
     459            sub_buff = their_endpoints->deserialize(sub_buff);
     460            /* [ Deserialize Link-Request Message ] */
     461
     462           
    380463                        /// only answer the first request
    381                         if (!queryRemoteLink(msg->getLocalLink()).isUnspecified()) {
     464                        if (!queryRemoteLink(their_link_id).isUnspecified())
     465                        {
     466                           
     467                            // TODO aktuell: When will these connections be closed?
     468                            // ---> Close it now (if it services no links) ?
     469                            //   (see also ! allowlink below)
     470                           
     471                            // XXX AKTUELL TESTING !! This will cause race conditions. So this is test-code only!
     472                            if ( connection->get_communication_links().size() == 0 )
     473                            {
     474                                connection->terminate();
     475                            }
     476                           
    382477                                logging_debug("Link request already received. Ignore!");
    383478                                break;
     
    386481                        /// create link ids
    387482                        LinkID localLink  = LinkID::create();
    388                         LinkID remoteLink = msg->getLocalLink();
     483                        LinkID remoteLink = their_link_id;  // XXX intermediate variable is unnecessary
    389484                        logging_debug(
    390485                                "local=" << connection->getLocalEndpoint()->to_string()
     
    394489                        // check if link creation is allowed by ALL listeners
    395490                        bool allowlink = true;
    396                         BOOST_FOREACH( CommunicationEvents* i, eventListener ){
     491                        foreach( CommunicationEvents* i, eventListener ){
    397492                                allowlink &= i->onLinkRequest( localLink,
    398493                                        connection->getLocalEndpoint(),
     
    403498                        if( !allowlink ){
    404499                                logging_warn( "Overlay denied creation of link" );
    405                                 delete msg;
    406500                                return;
    407501                        }
     
    411505                        ld->localLink = localLink;
    412506                        ld->remoteLink = remoteLink;
    413                         ld->localLocator = connection->getLocalEndpoint()->clone();
    414                         ld->remoteLocator = connection->getRemoteEndpoint()->clone();
    415                         ld->connection = connection;
    416                         ld->remoteEndpoint = msg->getLocalDescriptor();
    417                         add_endpoint(ld->remoteLocator);
    418 
    419                         // add layer 1-3 addresses
    420                         ld->remoteEndpoint.getEndpoints().add(
    421                                 ld->remoteLocator, endpoint_set::Layer1_3 | endpoint_set::NoLoopback);
    422                         localDescriptor.getEndpoints().add(
    423                                 connection->getLocalEndpoint(),
    424                                 endpoint_set::Layer1_3 | endpoint_set::NoLoopback);
     507                        ld->localLocator = connection->getLocalEndpoint();
     508                        ld->remoteLocator = connection->getRemoteEndpoint();
     509                        ld->remoteDescriptor = EndpointDescriptor(their_peer_id, their_endpoints);
     510                        ld->set_connection(connection);
     511
     512           
     513                        // update endpoints (should only have any effect in case of NAT)
     514                        ld->remoteDescriptor.endpoints->add_endpoint(connection->getRemoteEndpoint());
     515//                      localDescriptor.endpoints->add_endpoint(connection->getLocalEndpoint());  // XXX 0.0.0.0:0
    425516
    426517                        // link is now up-> add it
     
    428519                        addLink(ld);
    429520
    430                         // link is up!
    431                         logging_debug( "Link (initiated from remote) is up with "
    432                                 << "local(id=" << ld->localLink.toString() << ","
    433                                 << "locator=" << ld->localLocator->to_string() << ") "
    434                                 << "remote(id=" << ld->remoteLink.toString() << ", "
    435                                 << "locator=" << ld->remoteLocator->to_string() << ")"
    436                         );
    437 
    438                         // sending link request reply
    439                         logging_debug( "Sending link request reply with ids "
    440                                 << "local=" << localLink.toString() << ", "
    441                                 << "remote=" << remoteLink.toString() );
    442                         AribaBaseMsg reply( AribaBaseMsg::typeLinkReply, localLink, remoteLink );
    443                         reply.getLocalDescriptor() = localDescriptor;
    444                         reply.getRemoteDescriptor() = ld->remoteEndpoint;
    445 
    446                         send( &reply, *ld );
     521
     522                       
     523            /* sending link reply */
     524            logging_debug( "Sending link reply with ids "
     525                << "local=" << localLink.toString() << ", "
     526                << "remote=" << remoteLink.toString() );
     527
     528                    /*
     529                     * Create Link-Reply Message:
     530                     * - Our LinkID
     531                     * - Our Endpoint_Set (as update)
     532                     * - Their EndpointDescriptor (maybe they learn something about NAT)
     533                     */
     534                    reboost::message_t linkmsg;
     535                    linkmsg.push_back(localLink.serialize());
     536                    linkmsg.push_back(localDescriptor.endpoints->serialize());
     537                    linkmsg.push_back(ld->remoteDescriptor.endpoints->serialize());
     538                   
     539                    // XXX
     540                    cout << "/// MARIO: " << ld->get_connection()->getRemoteEndpoint()->to_string() << endl;
     541                   
     542                    // send message
     543                        bool sent = send_over_link( AribaBaseMsg::typeLinkReply, linkmsg, *ld, system_priority::OVERLAY );
     544                       
     545                        if ( ! sent )
     546                        {
     547                            logging_error("ERROR: Could not send LinkReply to: " << ld->remoteLocator->to_string());
     548                           
     549                            // TODO remove link, close link, ..?
     550                           
     551                            break;
     552                        }
     553
     554
     555            // link is up!
     556            logging_debug( "Link (initiated from remote) is up with "
     557                << "local(id=" << ld->localLink.toString() << ","
     558                << "locator=" << ld->localLocator->to_string() << ") "
     559                << "remote(id=" << ld->remoteLink.toString() << ", "
     560                << "locator=" << ld->remoteLocator->to_string() << ")"
     561            );
    447562
    448563                        // inform listeners about new open link
    449                         BOOST_FOREACH( CommunicationEvents* i, eventListener ) {
     564                        foreach( CommunicationEvents* i, eventListener ) {
    450565                                i->onLinkUp( localLink, ld->localLocator, ld->remoteLocator);
    451566                        }
     
    458573                // handle link request reply
    459574                // ---------------------------------------------------------------------
    460                 case AribaBaseMsg::typeLinkReply: {
     575                case AribaBaseMsg::typeLinkReply:
     576                {
    461577                        logging_debug( "Received link open reply for a link we initiated" );
    462578
     579            /*
     580             * Deserialize Link-Reply Message:
     581             * - Their LinkID
     582             * - Their Endpoint_Set (as update)
     583             * - Our EndpointDescriptor (maybe we can learn something about NAT)
     584             */
     585                        LinkID their_link_id;
     586                        EndpointSetPtr their_endpoints = endpoint_set::create_EndpointSet();
     587                        EndpointSetPtr our_endpoints = endpoint_set::create_EndpointSet();
     588                        sub_buff = their_link_id.deserialize(sub_buff);
     589                        sub_buff = their_endpoints->deserialize(sub_buff);
     590                        sub_buff = our_endpoints->deserialize(sub_buff);
     591
     592           
    463593                        // this is a reply to a link open request, so we have already
    464594                        // a link mapping and can now set the remote link to valid
    465                         LinkDescriptor& ld = queryLocalLink( msg->getRemoteLink() );
     595                        LinkDescriptor& ld = queryLocalLink( link_id );
    466596
    467597                        // no link found-> warn!
    468598                        if (ld.isUnspecified()) {
    469                                 logging_warn("Failed to find local link " << msg->getRemoteLink().toString());
    470                                 delete msg;
     599                                logging_warn("Failed to find local link " << link_id.toString());
    471600                                return;
    472601                        }
     602                       
     603                        if ( ld.up )
     604                        {
     605                logging_warn("Got link replay for already open link. Ignore. LinkID: " << link_id.toString());
     606               
     607                // TODO send LinkClose ?
     608                return;                     
     609                        }
    473610
    474611                        // store the connection
    475                         ld.connection = connection;
     612                        ld.set_connection(connection);
    476613                       
    477614                        // set remote locator and link id
    478                         ld.remoteLink = msg->getLocalLink();
    479                         ld.remoteLocator = connection->getRemoteEndpoint()->clone();
    480                         ld.remoteEndpoint.getEndpoints().add(
    481                                                         msg->getLocalDescriptor().getEndpoints(),
    482                                                         endpoint_set::Layer1_4
    483                                                 );
    484 
    485                         localDescriptor.getEndpoints().add(
    486                                 msg->getRemoteDescriptor().getEndpoints(),
    487                                 endpoint_set::Layer1_3
    488                         );
     615                        ld.remoteLink = their_link_id;
     616                        ld.remoteLocator = connection->getRemoteEndpoint();
     617                       
     618                       
     619                        /* Update endpoints */
     620                        // NOTE: we might loose some information here, but it's our only chance to get rid of outdated information.
     621                        ld.remoteDescriptor.replace_endpoint_set(their_endpoints);
     622                       
     623                        // add actual remote endpoint to this set (should only have any effect in case of NAT)
     624                        ld.remoteDescriptor.endpoints->add_endpoint(connection->getRemoteEndpoint());
     625                       
     626                        // TODO In case of NAT, we could learn something about our external IP.
     627                        //   ---> But we must trust the remote peer about this information!! 
     628//                      localDescriptor.endpoints->add_endpoints(our_endpoints);
     629                       
     630                       
     631                       
     632                       
    489633                        ld.up = true;
    490                         add_endpoint(ld.remoteLocator);
    491634
    492635                        logging_debug( "Link is now up with local id "
     
    496639
    497640                        // inform lisneters about link up event
    498                         BOOST_FOREACH( CommunicationEvents* i, eventListener ){
     641                        foreach( CommunicationEvents* i, eventListener ){
    499642                                i->onLinkUp( ld.localLink, ld.localLocator, ld.remoteLocator );
    500643                        }
     
    509652                case AribaBaseMsg::typeLinkClose: {
    510653                        // get remote link
    511                         const LinkID& localLink = msg->getRemoteLink();
    512                         logging_debug( "Received link close request for link " << localLink.toString() );
     654//                      const LinkID& localLink = msg.getRemoteLink();
     655                        logging_debug( "Received link close request for link " << link_id.toString() );
    513656
    514657                        // searching for link, not found-> warn
    515                         LinkDescriptor& linkDesc = queryLocalLink( localLink );
     658                        LinkDescriptor& linkDesc = queryLocalLink( link_id );
    516659                        if (linkDesc.isUnspecified()) {
    517                                 logging_warn("Failed to find local link " << localLink.toString());
    518                                 delete msg;
     660                                logging_warn("Failed to find local link " << link_id.toString());
    519661                                return;
    520662                        }
    521663
    522664                        // inform listeners
    523                         BOOST_FOREACH( CommunicationEvents* i, eventListener ){
     665                        foreach( CommunicationEvents* i, eventListener ){
    524666                                i->onLinkDown( linkDesc.localLink,
    525667                                                linkDesc.localLocator, linkDesc.remoteLocator );
     
    527669
    528670                        // remove the link descriptor
    529                         removeLink( localLink );
     671                        removeLink( link_id );
    530672
    531673                        // done
     
    534676
    535677                // ---------------------------------------------------------------------
    536                 // handle link locator changes
     678                // handle link locator changes  -- TODO is this ever called..?
    537679                // ---------------------------------------------------------------------
    538                 case AribaBaseMsg::typeLinkUpdate: {
    539                         const LinkID& localLink = msg->getRemoteLink();
    540                         logging_debug( "Received link update for link "
    541                                 << localLink.toString() );
    542 
    543                         // find the link description
    544                         LinkDescriptor& linkDesc = queryLocalLink( localLink );
    545                         if (linkDesc.isUnspecified()) {
    546                                 logging_warn("Failed to update local link "
    547                                         << localLink.toString());
    548                                 delete msg;
    549                                 return;
    550                         }
    551 
    552                         // update the remote locator
    553                         const address_v* oldremote = linkDesc.remoteLocator;
    554                         linkDesc.remoteLocator = connection->getRemoteEndpoint()->clone();
    555 
    556                         // inform the listeners (local link has _not_ changed!)
    557                         BOOST_FOREACH( CommunicationEvents* i, eventListener ){
    558                                 i->onLinkChanged(
    559                                         linkDesc.localLink,     // linkid
    560                                         linkDesc.localLocator,  // old local
    561                                         linkDesc.localLocator,  // new local
    562                                         oldremote,              // old remote
    563                                         linkDesc.remoteLocator  // new remote
    564                                 );
    565                         }
    566 
    567                         // done
    568                         break;
    569                 }
    570         }
    571 
    572         delete msg;
     680//              case AribaBaseMsg::typeLinkUpdate: {
     681//                      const LinkID& localLink = msg.getRemoteLink();
     682//                      logging_debug( "Received link update for link "
     683//                              << localLink.toString() );
     684//
     685//                      // find the link description
     686//                      LinkDescriptor& linkDesc = queryLocalLink( localLink );
     687//                      if (linkDesc.isUnspecified()) {
     688//                              logging_warn("Failed to update local link "
     689//                                      << localLink.toString());
     690//                              return;
     691//                      }
     692//
     693//                      // update the remote locator
     694//                      addressing2::EndpointPtr oldremote = linkDesc.remoteLocator;
     695//                      linkDesc.remoteLocator = connection->getRemoteEndpoint();
     696//                     
     697//                      // TODO update linkDesc.connection ?
     698//
     699//                      // inform the listeners (local link has _not_ changed!)
     700//                      foreach( CommunicationEvents* i, eventListener ){
     701//                              i->onLinkChanged(
     702//                                      linkDesc.localLink,     // linkid
     703//                                      linkDesc.localLocator,  // old local
     704//                                      linkDesc.localLocator,  // new local
     705//                                      oldremote,                      // old remote
     706//                                      linkDesc.remoteLocator  // new remote
     707//                              );
     708//                      }
     709//
     710//                      // done
     711//                      break;
     712//              }
     713               
     714               
     715        default: {
     716            logging_warn( "Received unknown message type!" );
     717            break;
     718        }
     719
     720        }
    573721}
    574722
     
    582730        for(LinkSet::iterator i=linkSet.begin(); i != linkSet.end(); i++){
    583731                if( (*i)->localLink != localLink) continue;
    584                 remove_endpoint((*i)->remoteLocator);
     732//              remove_endpoint((*i)->remoteLocator);  // XXX
    585733                delete *i;
    586734                linkSet.erase( i );
     
    605753}
    606754
    607 LinkIDs BaseCommunication::getLocalLinks( const address_v* addr ) const {
    608         LinkIDs ids;
    609         for (size_t i=0; i<linkSet.size(); i++){
    610                 if( addr == NULL ){
    611                         ids.push_back( linkSet[i]->localLink );
    612                 } else {
    613                         if ( *linkSet[i]->remoteLocator == *addr )
    614                                 ids.push_back( linkSet[i]->localLink );
    615                 }
    616         }
    617         return ids;
    618 }
     755//LinkIDs BaseCommunication::getLocalLinks( const address_v* addr ) const {
     756//      LinkIDs ids;
     757//      for (size_t i=0; i<linkSet.size(); i++){
     758//              if( addr == NULL ){
     759//                      ids.push_back( linkSet[i]->localLink );
     760//              } else {
     761//                      if ( *linkSet[i]->remoteLocator == *addr )
     762//                              ids.push_back( linkSet[i]->localLink );
     763//              }
     764//      }
     765//      return ids;
     766//}
    619767
    620768void BaseCommunication::onNetworkChange(const NetworkChangeInterface::NetworkChangeInfo& info){
    621 
    622 #ifdef UNDERLAY_OMNET
    623 
    624         // we have no mobility support for simulations
    625         return
    626 
    627 #endif // UNDERLAY_OMNET
    628769
    629770/*- disabled!
     
    762903}
    763904
    764 /// sends a message to all end-points in the end-point descriptor
    765 void BaseCommunication::send(Message* legacy_message, const EndpointDescriptor& endpoint) {
    766         Data data = data_serialize(legacy_message, DEFAULT_V);
    767        
    768         //// Adapt to new message system ////
    769         // transfer data buffer ownership to the shared_buffer
    770     reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8);
    771        
    772         reboost::message_t message;
    773         message.push_back(buf);
    774        
    775         transport->send(endpoint.getEndpoints(), message);
    776 }
    777 
    778 /// sends a message to the remote locator inside the link descriptor
    779 void BaseCommunication::send(Message* legacy_message, const LinkDescriptor& desc) {
    780         if (desc.remoteLocator==NULL) return;
    781        
    782         Data data = data_serialize(legacy_message, DEFAULT_V);
    783    
    784     //// Adapt to new message system ////
    785     // transfer data buffer ownership to the shared_buffer
    786     reboost::shared_buffer_t buf(data.getBuffer(), data.getLength() / 8);
    787    
    788     reboost::message_t message;
    789     message.push_back(buf);
    790    
    791         desc.connection->send(message);
    792 }
     905
     906addressing2::EndpointPtr BaseCommunication::get_local_endpoint_of_link(
     907        const LinkID& linkid)
     908{
     909    LinkDescriptor& ld = queryLocalLink(linkid);
     910   
     911    return ld.get_connection()->getLocalEndpoint();
     912}
     913
     914addressing2::EndpointPtr BaseCommunication::get_remote_endpoint_of_link(
     915        const LinkID& linkid)
     916{
     917    LinkDescriptor& ld = queryLocalLink(linkid);
     918   
     919    return ld.get_connection()->getRemoteEndpoint();
     920}
     921
     922
     923
     924bool BaseCommunication::send_over_link(
     925        const uint8_t type,
     926        reboost::message_t message,
     927        const LinkDescriptor& desc,
     928        const uint8_t priority)
     929{
     930    /*
     931     * Create Link Message:
     932     * - Type
     933     * - Their LinkID
     934     */
     935    // link id
     936    message.push_front(desc.remoteLink.serialize());
     937    // type
     938    memcpy(message.push_front(sizeof(uint8_t)).mutable_data(), &type, sizeof(uint8_t));
     939    /* [ Create Link Message ] */
     940   
     941   
     942    /* send message */
     943    transport_connection::sptr conn = desc.get_connection();
     944    if ( ! conn )
     945    {
     946        cout << "/// MARIO: No connection!!" << endl;  // XXX debug
     947        return false;
     948    }
     949   
     950    // * send over connection *
     951    return conn->send(message, priority);
     952}
     953
     954void BaseCommunication::send_to_peer(
     955        const uint8_t type,
     956        const PeerID& peer_id,
     957        reboost::message_t message,
     958        const EndpointDescriptor& endpoint,
     959        const uint8_t priority )
     960{
     961    /*
     962     * Create Peer Message:
     963     * - Type
     964     * - Their PeerID
     965     */
     966    // peer id
     967    message.push_front(peer_id.serialize());
     968    // type
     969    memcpy(message.push_front(sizeof(uint8_t)).mutable_data(), &type, sizeof(uint8_t));
     970   
     971   
     972    /* send message */
     973    transport->send(endpoint.getEndpoints(), message, priority);
     974}
     975
     976
     977
    793978
    794979}} // namespace ariba, communication
  • source/ariba/communication/BaseCommunication.h

    r10653 r12060  
    5050#include <boost/foreach.hpp>
    5151
     52#ifdef ECLIPSE_PARSER
     53    #define foreach(a, b) for(a : b)
     54#else
     55    #define foreach(a, b) BOOST_FOREACH(a, b)
     56#endif
     57
    5258// utilities
    5359#include "ariba/utility/types.h"
    54 #include "ariba/utility/messages.h"
     60#include "ariba/utility/messages/MessageReceiver.h"
    5561#include "ariba/utility/logging/Logging.h"
    5662#include "ariba/utility/misc/Demultiplexer.hpp"
     
    5864
    5965// new transport and addressing
    60 #include "ariba/utility/addressing/addressing.hpp"
    61 #include "ariba/utility/transport/transport.hpp"
    62 #include "ariba/utility/transport/transport_connection.hpp"
     66#include "ariba/utility/transport/transport_peer.hpp"
     67#include "ariba/utility/transport/interfaces/transport_connection.hpp"
     68#include "ariba/utility/transport/interfaces/transport_listener.hpp"
     69#include "ariba/utility/addressing2/endpoint.hpp"
    6370
    6471// communication
     
    7279#include "ariba/communication/networkinfo/NetworkInformation.h"
    7380
    74 // disabled
    75 //#ifndef UNDERLAY_OMNET
    76 //  #include "ariba/communication/modules/transport/tcp/TCPTransport.h"
    77 //  #include "ariba/communication/modules/network/ip/IPv4NetworkProtocol.h"
    78 //  using ariba::communication::IPv4NetworkProtocol;
    79 //  using ariba::communication::TCPTransport;
    80 //#endif
    81 
    8281namespace ariba {
    83   class SideportListener;
     82    class SideportListener;
    8483}
    8584
     
    8786namespace communication {
    8887
     88
     89class communication_message_not_sent: public std::runtime_error
     90{
     91public:
     92    /** Takes a character string describing the error.  */
     93    explicit communication_message_not_sent(const string& __arg)  :
     94        std::runtime_error(__arg)
     95    {
     96    }
     97   
     98    virtual ~communication_message_not_sent() throw() {}
     99};
     100
     101
     102
    89103using namespace std;
    90 using namespace ariba::addressing;
    91104using namespace ariba::transport;
    92105using namespace ariba::utility;
     
    102115 * protocols and addressing schemes.
    103116 *
    104  * @author Sebastian Mies, Christoph Mayer
     117 * @author Sebastian Mies, Christoph Mayer, Mario Hock
    105118 */
    106119class BaseCommunication:
    107120        public NetworkChangeInterface,
    108         public SystemEventListener,
    109121        public transport_listener {
    110122
     
    120132
    121133        /// Startup the base communication, start modules etc.
    122         void start();
     134        void start(addressing2::EndpointSetPtr listen_on);
    123135
    124136        /// Stops the base communication, stop modules etc.
    125137        void stop();
    126 
    127         /// Sets the endpoints
    128         void setEndpoints( string& endpoints );
    129138
    130139        /// Check whether the base communication has been started up
     
    147156         * @return A sequence number for this message
    148157         */
    149         seqnum_t sendMessage(const LinkID lid, const Message* message);
     158        seqnum_t sendMessage(const LinkID& lid,
     159                reboost::message_t message,
     160                uint8_t priority,
     161                bool bypass_overlay = false) throw(communication_message_not_sent);
    150162
    151163        /**
     
    164176         * @return List of LinkID
    165177         */
    166         LinkIDs getLocalLinks(const address_v* addr) const;
     178//      LinkIDs getLocalLinks(const address_v* addr) const;  // XXX aktuell
    167179
    168180        /**
     
    187199
    188200        void unregisterEventListener(CommunicationEvents* _events);
    189 
    190         /// called when a system event is emitted by system queue
    191         virtual void handleSystemEvent(const SystemEvent& event);
    192201
    193202        /**
     
    196205         */
    197206        virtual void receive_message(transport_connection::sptr connection,
    198                 reboost::message_t msg);
    199 
     207                reboost::shared_buffer_t msg);
     208
     209    /**
     210     * called within the ASIO thread
     211     * when a connection is terminated (e.g. TCP close)
     212     */
     213    virtual void connection_terminated(transport_connection::sptr connection);
     214
     215    addressing2::EndpointPtr get_local_endpoint_of_link(const LinkID& linkid);
     216    addressing2::EndpointPtr get_remote_endpoint_of_link(const LinkID& linkid);
     217
     218       
    200219protected:
    201220
     
    205224         */
    206225        void receiveMessage(transport_connection::sptr connection,
    207                 reboost::message_t msg);
     226                reboost::shared_buffer_t message);
     227       
     228    /**
     229     * called within the ARIBA thread (System Queue)
     230     * when a connection is terminated (e.g. TCP close)
     231     */
     232    void connectionTerminated(transport_connection::sptr connection);
     233       
    208234
    209235        /// called when a network interface change happens
     
    221247                /// default constructor
    222248                LinkDescriptor() :
    223                         localLink(LinkID::UNSPECIFIED), localLocator(NULL),
    224                         remoteLink(LinkID::UNSPECIFIED), remoteLocator(NULL),
     249                        localLink(LinkID::UNSPECIFIED),
     250                        remoteLink(LinkID::UNSPECIFIED),
    225251                        up(false) {
    226252                }
    227253
    228                 ~LinkDescriptor() {
    229                         if (localLocator!=NULL)  delete localLocator;
    230                         if (remoteLocator!=NULL) delete remoteLocator;
     254                ~LinkDescriptor()
     255                {
     256                        if ( connection )
     257                        {
     258                            connection->unregister_communication_link(&localLink);
     259                        }
    231260                }
    232261
     
    240269                        return *unspec;
    241270                }
     271               
     272               
     273                transport_connection::sptr get_connection() const
     274                {
     275                    return connection;
     276                }
     277               
     278                void set_connection(const transport_connection::sptr& conn)
     279                {
     280                    // unregister from old connection,
     281                    // if any (but normally there shouldn't..)
     282                    if ( connection )
     283                    {
     284                        connection->unregister_communication_link(&localLink);
     285                    }
     286
     287                    // * set_connection *
     288                    connection = conn;
     289                   
     290                    // register this link with the connection
     291                    conn->register_communication_link(&localLink);
     292                }
    242293
    243294                bool unspecified;
     
    245296                /// link identifiers
    246297                LinkID localLink;
    247                 const address_v* localLocator;
     298                addressing2::EndpointPtr localLocator;
    248299
    249300                /// used underlay addresses for the link
    250301                LinkID remoteLink;
    251                 const address_v* remoteLocator;
     302                addressing2::EndpointPtr remoteLocator;
    252303
    253304                /// the remote end-point descriptor
    254                 EndpointDescriptor remoteEndpoint;
     305                EndpointDescriptor remoteDescriptor;
    255306
    256307                /// flag, whether this link is up
    257308                bool up;
    258309               
     310               
     311        private:
    259312                /// connection if link is up
    260313                transport_connection::sptr connection;
     
    281334        /// The local end-point descriptor
    282335        EndpointDescriptor localDescriptor;
    283 
    284 #ifndef UNDERLAY_OMNET
     336       
     337        /**
     338         * endpoint_set holding the addresses of the "server"-sockets,
     339         * ---> that should be opened
     340         *
     341         * (e.g. 0.0.0.0:41322)
     342         */
     343        addressing2::EndpointSetPtr listenOn_endpoints;
     344       
     345    /**
     346     * endpoint_set holding the addresses of the "server"-sockets,
     347     * ---> that are actually open
     348     *
     349     * (e.g. 0.0.0.0:41322)
     350     *
     351     * XXX should only be in transport_peer
     352     */
     353        addressing2::EndpointSetPtr active_listenOn_endpoints;
     354       
     355    /**
     356     * endpoint_set holding the addresses of the "server"-sockets,
     357     * ---> here the discovered "addressable" addresses are stored
     358     *
     359     * (e.g. 192.168.0.5:41322)
     360     *
     361     * XXX should only be in localDescriptor
     362     */
     363        addressing2::EndpointSetPtr local_endpoints;
     364
    285365        /// network change detector
    286366        NetworkChangeDetection networkMonitor;
    287 #endif
    288367
    289368        /// list of all remote addresses of links to end-points
    290         class endpoint_reference {
    291         public:
    292                 int count; ///< the number of open links to this end-point
    293                 const address_v* endpoint; ///< the end-point itself
    294         };
    295         vector<endpoint_reference> remote_endpoints;
    296 
    297         /// adds an end-point to the list
    298         void add_endpoint( const address_v* endpoint );
    299 
    300         /// removes an end-point from the list
    301         void remove_endpoint( const address_v* endpoint );
     369        // XXX DEPRECATED
     370//      class endpoint_reference {
     371//      public:
     372//              int count; ///< the number of open links to this end-point
     373//              const address_v* endpoint; ///< the end-point itself
     374//      };
     375//      vector<endpoint_reference> remote_endpoints;
     376
     377        // XXX DEPRECATED
     378//      /// adds an end-point to the list
     379//      void add_endpoint( const address_v* endpoint );
     380//
     381//      /// removes an end-point from the list
     382//      void remove_endpoint( const address_v* endpoint );
    302383
    303384        /// event listener
     
    314395        MessageReceiver* messageReceiver;
    315396
    316         /// convenience: send message to peer
    317         void send( Message* message, const EndpointDescriptor& endpoint );
    318         void send( Message* message, const LinkDescriptor& descriptor );
     397       
     398        /*
     399         * Sends a message over an existing link.
     400         *   ---> Adds »Link Message« Header
     401         */
     402        bool send_over_link(
     403                const uint8_t type,
     404                reboost::message_t message,
     405                const LinkDescriptor& desc,
     406                const uint8_t priority);
     407
     408    /*
     409     * Sends a message to a known peer. (To all known endpoints.)
     410     *   ---> Adds »Peer Message« Header
     411     */
     412        void send_to_peer(
     413                const uint8_t type,
     414                const PeerID& peer_id,
     415                reboost::message_t message,
     416                const EndpointDescriptor& endpoint,
     417                const uint8_t priority );
     418       
    319419
    320420        /// state of the base communication
  • source/ariba/communication/CommunicationEvents.cpp

    r5284 r12060  
    4949
    5050bool CommunicationEvents::onLinkRequest(const LinkID& id,
    51         const address_v* local, const address_v* remote) {
     51        const addressing2::EndpointPtr local,
     52        const addressing2::EndpointPtr remote)
     53{
    5254        return true;
    5355}
    5456
    55 void CommunicationEvents::onLinkUp(const LinkID& id, const address_v* local,
    56         const address_v* remote) {
     57void CommunicationEvents::onLinkUp(const LinkID& id,
     58        const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote)
     59{
    5760}
    5861
    59 void CommunicationEvents::onLinkDown(const LinkID& id, const address_v* local,
    60         const address_v* remote) {
     62void CommunicationEvents::onLinkDown(const LinkID& id,
     63        const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote)
     64{
    6165}
    6266
    6367void CommunicationEvents::onLinkChanged(const LinkID& id,
    64         const address_v* oldlocal, const address_v* newlocal,
    65         const address_v* oldremote, const address_v* newremote) {
     68        const addressing2::EndpointPtr oldlocal,  const addressing2::EndpointPtr newlocal,
     69        const addressing2::EndpointPtr oldremote, const addressing2::EndpointPtr newremote)
     70{
    6671}
    6772
    68 void CommunicationEvents::onLinkFail(const LinkID& id, const address_v* local,
    69         const address_v* remote) {
     73void CommunicationEvents::onLinkFail(const LinkID& id,
     74        const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote)
     75{
    7076}
    7177
    7278void CommunicationEvents::onLinkQoSChanged(const LinkID& id,
    73         const address_v* local, const address_v* remote, const QoSParameterSet& qos) {
     79        const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote,
     80        const QoSParameterSet& qos)
     81{
    7482}
    7583
  • source/ariba/communication/CommunicationEvents.h

    r5284 r12060  
    4242#include "ariba/utility/types/LinkID.h"
    4343#include "ariba/utility/types/QoSParameterSet.h"
    44 #include "ariba/utility/addressing/addressing.hpp"
     44#include "ariba/utility/addressing2/endpoint.hpp"
    4545
    4646namespace ariba {
     
    4949using ariba::utility::LinkID;
    5050using ariba::utility::QoSParameterSet;
    51 using namespace ariba::addressing;
    5251
    5352class CommunicationEvents {
     
    6867         * @return True, if the link should be established
    6968         */
    70         virtual bool onLinkRequest(const LinkID& id, const address_v* local,
    71                 const address_v* remote);
     69        virtual bool onLinkRequest(const LinkID& id,
     70                const addressing2::EndpointPtr local,
     71                const addressing2::EndpointPtr remote);
    7272
    7373        /**
     
    7777         * @param id The link id of the established link
    7878         */
    79         virtual void onLinkUp(const LinkID& id, const address_v* local,
    80                 const address_v* remote);
     79        virtual void onLinkUp(const LinkID& id,
     80                const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote);
    8181
    8282        /**
     
    8585         * @param id The link identifier of the dropped link
    8686         */
    87         virtual void onLinkDown(const LinkID& id, const address_v* local,
    88                 const address_v* remote);
     87        virtual void onLinkDown(const LinkID& id,
     88                const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote);
    8989
    9090        /**
     
    9797         */
    9898        virtual void onLinkChanged(const LinkID& id,
    99                 const address_v* oldlocal,  const address_v* newlocal,
    100                 const address_v* oldremote, const address_v* newremote
    101         );
     99                const addressing2::EndpointPtr oldlocal,  const addressing2::EndpointPtr newlocal,
     100                const addressing2::EndpointPtr oldremote, const addressing2::EndpointPtr newremote);
    102101
    103         virtual void onLinkFail(const LinkID& id, const address_v* local,
    104                 const address_v* remote);
     102        virtual void onLinkFail(const LinkID& id,
     103                const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote);
    105104
    106         virtual void onLinkQoSChanged(const LinkID& id, const address_v* local,
    107                 const address_v* remote, const QoSParameterSet& qos);
     105        virtual void onLinkQoSChanged(const LinkID& id,
     106                const addressing2::EndpointPtr local, const addressing2::EndpointPtr remote,
     107                const QoSParameterSet& qos);
    108108};
    109109
  • source/ariba/communication/EndpointDescriptor.cpp

    r5624 r12060  
    4242namespace communication {
    4343
    44 vsznDefault(EndpointDescriptor);
     44//vsznDefault(EndpointDescriptor);
    4545
    46 EndpointDescriptor::EndpointDescriptor(){
     46EndpointDescriptor::EndpointDescriptor()  :
     47        endpoints(new addressing2::endpoint_set())
     48{
    4749}
    4850
     
    5557}
    5658
    57 EndpointDescriptor::EndpointDescriptor(const endpoint_set& endpoints ) :
    58         endpoints(endpoints){
     59EndpointDescriptor::EndpointDescriptor(
     60        const PeerID& peer_id,
     61        addressing2::EndpointSetPtr endpoints ) :
     62    peerId(peer_id),
     63        endpoints(endpoints)
     64{
    5965}
    6066
    61 EndpointDescriptor::EndpointDescriptor(const string& str) : endpoints(str){
     67EndpointDescriptor::EndpointDescriptor(const string& str)  :
     68        endpoints(new addressing2::endpoint_set())
     69{
     70    cout << "ERROR: Construction of EndpointDescriptor from String is not functional!!" << endl;
     71    assert( false );
    6272}
    6373
     74
     75reboost::message_t EndpointDescriptor::serialize() const
     76{
     77    reboost::message_t msg;
     78    msg.push_back(peerId.serialize());
     79    msg.push_back(endpoints->serialize());
     80   
     81    return msg;
     82}
     83
     84reboost::shared_buffer_t EndpointDescriptor::deserialize(reboost::shared_buffer_t buff)
     85{
     86    buff = peerId.deserialize(buff);
     87    buff = endpoints->deserialize(buff);
     88   
     89    return buff;
     90}
     91
     92
    6493}} // namespace ariba, communication
  • source/ariba/communication/EndpointDescriptor.h

    r7744 r12060  
    4242#include <string>
    4343#include <set>
    44 #include "ariba/utility/serialization.h"
     44//#include "ariba/utility/serialization.h"
    4545#include "ariba/utility/types/PeerID.h"
    46 #include "ariba/utility/addressing/endpoint_set.hpp"
     46
     47#include "ariba/utility/addressing2/endpoint_set.hpp"
     48
     49// reboost messages
     50#include "ariba/utility/transport/messages/message.hpp"
     51
    4752
    4853namespace ariba {
     
    5156using_serialization;
    5257using namespace std;
    53 using namespace ariba::addressing;
    5458using ariba::utility::PeerID;
    5559
    56 class EndpointDescriptor: public VSerializeable { VSERIALIZEABLE
    57         friend class BaseCommunication;
     60
     61/**
     62 * This class is used a transitions helper between the old addressing and
     63 * serialization to the new addressing2 and the new message classes
     64 *
     65 * Maybe it will be replaced, or at least modified in the future.
     66 */
     67//class EndpointDescriptor: public VSerializeable { VSERIALIZEABLE
     68//    friend class BaseCommunication;
     69class EndpointDescriptor
     70{
     71    friend class BaseCommunication;
    5872
    5973public:
     
    6882
    6983        /// construct end-points from an endpoint set
    70         EndpointDescriptor(const endpoint_set& endpoints );
     84        EndpointDescriptor(const PeerID& peer_id, addressing2::EndpointSetPtr endpoints );
    7185
     86        // FIXME NOT WORKING !!
    7287        /// construct end-points from a string
    7388        EndpointDescriptor(const string& str);
     
    7590        /// convert end-points to string
    7691        string toString() const {
    77                 return endpoints.to_string();
     92                return endpoints->to_string();
    7893        }
    7994
     
    90105        }
    91106
    92         /// create endpoint
    93         static EndpointDescriptor* fromString(string str) {
    94                 return new EndpointDescriptor(str);
    95         }
     107//      /// create endpoint
     108//      static EndpointDescriptor* fromString(string str) {
     109//              return new EndpointDescriptor(str);
     110//      }
    96111
    97112        bool operator==(const EndpointDescriptor& rh) const {
     
    113128
    114129        /// returns the end-points of this descriptor
    115         endpoint_set& getEndpoints() {
     130        addressing2::const_EndpointSetPtr getEndpoints() const {
    116131                return endpoints;
    117132        }
    118 
    119         /// returns the end-points of this descriptor
    120         const endpoint_set& getEndpoints() const {
    121                 return endpoints;
     133       
     134        void replace_endpoint_set(addressing2::EndpointSetPtr new_endpoints)
     135        {
     136            endpoints = new_endpoints;
    122137        }
    123 
     138       
    124139        /// returns a reference to the peer id
    125140        PeerID& getPeerId() {
     
    132147                return peerId;
    133148        }
     149       
     150        /// returns a message with peerId and endpoints in it
     151        reboost::message_t serialize() const;
     152       
     153        /// deserialite peerId and endpoints
     154        reboost::shared_buffer_t deserialize(reboost::shared_buffer_t buff);
     155       
    134156private:
    135         endpoint_set endpoints;
     157        addressing2::EndpointSetPtr endpoints;
    136158        PeerID peerId;
    137159};
     
    139161}} // namespace ariba, communication
    140162
    141 sznBeginDefault( ariba::communication::EndpointDescriptor, X ){
    142 
    143         // serialize peer id
    144         X && &peerId;
    145 
    146         // serialize end-points
    147         uint16_t len = endpoints.to_bytes_size();
    148         X && len;
    149         uint8_t* buffer = X.bytes( len );
    150         if (buffer!=NULL) {
    151                 if (X.isDeserializer()) endpoints.assign(buffer,len);
    152                 else endpoints.to_bytes(buffer);
    153         }
    154 }sznEnd();
     163//sznBeginDefault( ariba::communication::EndpointDescriptor, X ){
     164//
     165//    // TODO
     166//    assert(false);
     167//   
     168//      // serialize peer id
     169//      X && &peerId;
     170//
     171//      // serialize end-points
     172//      uint16_t len = endpoints.to_bytes_size();
     173//      X && len;
     174//      uint8_t* buffer = X.bytes( len );
     175//      if (buffer!=NULL) {
     176//              if (X.isDeserializer()) endpoints.assign(buffer,len);
     177//              else endpoints.to_bytes(buffer);
     178//      }
     179//}sznEnd();
    155180
    156181#endif /*ENDPOINTDESCRIPTOR_H_*/
  • source/ariba/communication/messages/AribaBaseMsg.h

    r9694 r12060  
    4242#include <string>
    4343#include <boost/cstdint.hpp>
    44 #include "ariba/utility/messages.h"
     44//#include "ariba/utility/messages.h"
     45#include "ariba/utility/messages/Message.h"
    4546#include "ariba/utility/serialization.h"
    4647#include "ariba/utility/types/LinkID.h"
     
    6162using_serialization;
    6263
     64// XXX This whole message is DEPRECATED
    6365class AribaBaseMsg : public Message {
    6466        VSERIALIZEABLE;
     
    6971                typeLinkReply = 2,
    7072                typeLinkClose = 3,
    71                 typeLinkUpdate = 4
     73                typeLinkUpdate = 4,
     74                typeDirectData = 5
    7275        };
    7376
     
    115118
    116119sznBeginDefault( ariba::communication::AribaBaseMsg, X ) {
    117         X && type && &localLink && &remoteLink;
     120        X && type && &remoteLink;
    118121        if (type == typeLinkReply || type == typeLinkRequest)
    119                 X && localDescriptor && remoteDescriptor;
    120         X && Payload();
     122                X && &localLink && localDescriptor && remoteDescriptor;
     123//      X && Payload();
    121124} sznEnd();
    122125
  • source/ariba/communication/messages/CMakeLists.txt

    r10700 r12060  
    3737# [License]
    3838
    39 add_headers(AribaBaseMsg.h)
     39#add_headers(AribaBaseMsg.h)
    4040
    41 add_sources(AribaBaseMsg.cpp)
     41#add_sources(AribaBaseMsg.cpp)
  • source/ariba/communication/networkinfo/AddressDiscovery.cpp

    r10700 r12060  
    4949#include <ifaddrs.h>
    5050
     51#include <string>
     52#include <boost/asio/ip/address.hpp>
     53#include <boost/foreach.hpp>
     54
     55#include "ariba/utility/addressing2/tcpip_endpoint.hpp"
     56#include "ariba/utility/addressing/mac_address.hpp"
     57
    5158#ifdef HAVE_LIBBLUETOOTH
    5259  #include <bluetooth/bluetooth.h>
     
    5865namespace communication {
    5966
    60 mac_address AddressDiscovery::getMacFromIF( const char* name ) {
     67
     68using namespace std;
     69using namespace addressing2;
     70using namespace boost::asio::ip;
     71
     72using ariba::addressing::mac_address;
     73
     74mac_address getMacFromIF( const char* name )
     75{
    6176        mac_address addr;
    6277#ifdef HAVE_LIBBLUETOOTH
     
    7388}
    7489
    75 int AddressDiscovery::dev_info(int s, int dev_id, long arg) {
    76 #ifdef HAVE_LIBBLUETOOTH
    77         endpoint_set* set = (endpoint_set*)arg;
    78         struct hci_dev_info di;
    79         memset(&di, 0, sizeof(struct hci_dev_info));
    80         di.dev_id = dev_id;
    81         if (ioctl(s, HCIGETDEVINFO, (void *) &di)) return 0;
    82         mac_address mac;
    83         mac.bluetooth( di.bdaddr );
    84         address_vf vf = mac;
    85         set->add(vf);
     90int dev_info(int s, int dev_id, long arg)
     91{
     92#ifdef HAVE_LIBBLUETOOTH
     93//      endpoint_set* set = (endpoint_set*)arg;
     94//      struct hci_dev_info di;
     95//      memset(&di, 0, sizeof(struct hci_dev_info));
     96//      di.dev_id = dev_id;
     97//      if (ioctl(s, HCIGETDEVINFO, (void *) &di)) return 0;
     98//      mac_address mac;
     99//      mac.bluetooth( di.bdaddr );
     100//      address_vf vf = mac;
     101//      set->add(vf);
    86102#endif
    87103        return 0;
    88104}
    89105
    90 void AddressDiscovery::discover_bluetooth( endpoint_set& endpoints ) {
    91 #ifdef HAVE_LIBBLUETOOTH
    92         hci_for_each_dev(HCI_UP, &AddressDiscovery::dev_info, (long)&endpoints );
    93 #endif
    94 }
    95 
    96 void AddressDiscovery::discover_ip_addresses( endpoint_set& endpoints ) {
    97         struct ifaddrs* ifaceBuffer = NULL;
    98         void*           tmpAddrPtr  = NULL;
    99 
    100         int ret = getifaddrs( &ifaceBuffer );
    101         if( ret != 0 ) return;
    102 
    103         for( struct ifaddrs* i=ifaceBuffer; i != NULL; i=i->ifa_next ) {
    104 
    105                 // ignore devices that are disabled or have no ip
    106                 if(i == NULL) continue;
    107                 struct sockaddr* addr = i->ifa_addr;
    108                 if (addr==NULL) continue;
    109 
    110                 // ignore tun devices
    111                 string device = string(i->ifa_name);
    112                 if(device.find_first_of("tun") == 0) continue;
    113 
    114                 if (addr->sa_family == AF_INET) {
    115                         // look for ipv4
    116                         char straddr[INET_ADDRSTRLEN];
    117                         tmpAddrPtr= &((struct sockaddr_in*)addr)->sin_addr;
    118                         inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) );
    119                         ip_address ip = straddr;
    120                         if (ip.is_loopback()) continue;
    121                         address_vf vf = ip;
    122                         endpoints.add( vf );
    123                 } else
    124                 if (addr->sa_family == AF_INET6) {
    125                         // look for ipv6
    126                         char straddr[INET6_ADDRSTRLEN];
    127                         tmpAddrPtr= &((struct sockaddr_in6*)addr)->sin6_addr;
    128                         inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) );
    129                         ip_address ip = straddr;
    130                         if (ip.is_loopback()) continue;
    131 //                      if (ip.is_link_local()) continue;
    132                         address_vf vf = ip;
    133                         endpoints.add( vf );
    134                 }
    135         }
    136 
    137         freeifaddrs(ifaceBuffer);
    138 }
    139 
    140 void AddressDiscovery::discover_endpoints( endpoint_set& endpoints ) {
    141         discover_ip_addresses( endpoints );
    142         discover_bluetooth( endpoints );
     106void discover_bluetooth(
     107        EndpointSetPtr listenOn_endpoints,
     108        EndpointSetPtr discovered_endpoints )
     109{
     110#ifdef HAVE_LIBBLUETOOTH
     111    // FIXME aktuell bluetooth
     112//      hci_for_each_dev(HCI_UP, &AddressDiscovery::dev_info, (long)&endpoints );
     113#endif
     114}
     115
     116void discover_ip_addresses(
     117        EndpointSetPtr listenOn_endpoints,
     118        EndpointSetPtr discovered_endpoints )
     119{
     120    bool discover_ipv4 = false;
     121    bool discover_ipv6 = false;
     122    vector<uint16_t> ipv4_ports;
     123    vector<uint16_t> ipv6_ports;
     124   
     125    /* analyze listenOn_endpoints */
     126    BOOST_FOREACH( TcpIP_EndpointPtr endp, listenOn_endpoints->get_tcpip_endpoints() )
     127    {
     128        // BRANCH: IPv4 any [0.0.0.0]
     129        if ( endp->to_asio().address() == address_v4::any() )
     130        {
     131            // add port
     132            ipv4_ports.push_back(endp->to_asio().port());
     133           
     134            discover_ipv4 = true;
     135        }
     136
     137        // BRANCH: IPv6 any [::]
     138        else if ( endp->to_asio().address() == address_v6::any() )
     139        {
     140            // add port
     141            ipv6_ports.push_back(endp->to_asio().port());
     142           
     143            discover_ipv6 = true;
     144
     145           
     146            // NOTE: on linux the ipv6-any address [::] catches ipv4 as well
     147            ipv4_ports.push_back(endp->to_asio().port());
     148            discover_ipv4 = true;
     149        }
     150       
     151        // BRANCH: explicit ip address
     152        else
     153        {
     154            // ---> don't discover anything, just add it directly
     155            discovered_endpoints->add_endpoint(endp);
     156        }
     157    }
     158   
     159   
     160    /* discover addresses */
     161    if ( discover_ipv4 || discover_ipv6 )
     162    {
     163        struct ifaddrs* ifaceBuffer = NULL;
     164        void*           tmpAddrPtr  = NULL;
     165   
     166        int ret = getifaddrs( &ifaceBuffer );
     167        if( ret != 0 ) return;
     168   
     169        for( struct ifaddrs* i=ifaceBuffer; i != NULL; i=i->ifa_next )
     170        {
     171            // ignore devices that are disabled or have no ip
     172            if(i == NULL) continue;
     173            struct sockaddr* addr = i->ifa_addr;
     174            if (addr==NULL) continue;
     175   
     176    //          // ignore tun devices  // XXX why?
     177    //          string device = string(i->ifa_name);
     178    //          if(device.find_first_of("tun") == 0) continue;
     179
     180            // IPv4
     181            if ( discover_ipv4 && addr->sa_family == AF_INET )
     182            {
     183                char straddr[INET_ADDRSTRLEN];
     184                tmpAddrPtr= &((struct sockaddr_in*)addr)->sin_addr;
     185                inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) );
     186               
     187                address ip_addr = address::from_string(straddr);
     188   
     189                // skip loopback address
     190                if ( ip_addr.to_v4() == address_v4::loopback() )
     191                    continue;
     192
     193                // add endpoint for this address and every given ipv4 port
     194                BOOST_FOREACH( uint16_t port, ipv4_ports )
     195                {
     196                    tcp::endpoint tcpip_endp(ip_addr, port);
     197                    TcpIP_EndpointPtr endp(new tcpip_endpoint(tcpip_endp));
     198                   
     199                    discovered_endpoints->add_endpoint(endp);
     200                }
     201            }
     202           
     203            // IPv6
     204            else if ( discover_ipv6 && addr->sa_family == AF_INET6 )
     205            {
     206                // look for ipv6
     207                char straddr[INET6_ADDRSTRLEN];
     208                tmpAddrPtr= &((struct sockaddr_in6*)addr)->sin6_addr;
     209                inet_ntop( i->ifa_addr->sa_family, tmpAddrPtr, straddr, sizeof(straddr) );
     210               
     211                address ip_addr = address::from_string(straddr);
     212               
     213                // skip loopback address
     214                if ( ip_addr.to_v6() == address_v6::loopback() )
     215                    continue;
     216
     217                // add endpoint for this address and every given ipv4 port
     218                BOOST_FOREACH( uint16_t port, ipv6_ports )
     219                {
     220                    tcp::endpoint tcpip_endp(ip_addr, port);
     221                    TcpIP_EndpointPtr endp(new tcpip_endpoint(tcpip_endp));
     222                   
     223                    discovered_endpoints->add_endpoint(endp);
     224                }
     225            }
     226        }
     227   
     228        freeifaddrs(ifaceBuffer);
     229    }
     230}
     231
     232
     233
     234EndpointSetPtr AddressDiscovery::discover_endpoints(EndpointSetPtr listenOn_endpoints)
     235{
     236    EndpointSetPtr discovered_endpoints(new addressing2::endpoint_set());
     237   
     238        discover_ip_addresses( listenOn_endpoints, discovered_endpoints );
     239        discover_bluetooth( listenOn_endpoints, discovered_endpoints );
     240       
     241        return discovered_endpoints;
    143242}
    144243
  • source/ariba/communication/networkinfo/AddressDiscovery.h

    r5639 r12060  
    4040#define __ADDRESS_DISCOVERY_H
    4141
    42 #include "ariba/utility/addressing/addressing.hpp"
    43 
    44 using namespace ariba::addressing;
     42#include "ariba/utility/addressing2/endpoint_set.hpp"
    4543
    4644namespace ariba {
    4745namespace communication {
    4846
     47using addressing2::EndpointSetPtr;
     48
    4949class AddressDiscovery {
    5050public:
    51         static void discover_endpoints( endpoint_set& endpoints );
     51        static EndpointSetPtr discover_endpoints(EndpointSetPtr listenOn_endpoints);
    5252
    5353private:
    54         static mac_address getMacFromIF( const char* name );
    55         static int dev_info(int s, int dev_id, long arg);
    56         static void discover_bluetooth( endpoint_set& endpoints );
    57         static void discover_ip_addresses( endpoint_set& endpoints );
     54        // TODO aktuell weg damit..
     55//      static mac_address getMacFromIF( const char* name );
     56//      static int dev_info(int s, int dev_id, long arg);
     57//      static void discover_bluetooth( EndpointSetPtr listenOn_endpoints, EndpointSetPtr discovered_endpoints );
     58//      static void discover_ip_addresses( EndpointSetPtr listenOn_endpoints, EndpointSetPtr discovered_endpoints );
    5859};
    5960
Note: See TracChangeset for help on using the changeset viewer.