Changeset 2473
- Timestamp:
- Feb 23, 2009, 2:21:49 PM (16 years ago)
- Files:
-
- 15 edited
Legend:
- Unmodified
- Added
- Removed
-
sample/pingpong/PingPong.cpp
r2432 r2473 1 2 1 #include "PingPong.h" 3 2 #include "ariba/utility/configuration/Configuration.h" … … 11 10 12 11 // logging 13 use_logging_cpp( PingPong );12 use_logging_cpp( PingPong ); 14 13 15 14 // the service id of the ping pong service 16 ServiceID PingPong::PINGPONG_ID = ServiceID( 111);15 ServiceID PingPong::PINGPONG_ID = ServiceID( 111 ); 17 16 18 17 // construction … … 27 26 void PingPong::startup() { 28 27 28 logging_info( "starting up PingPong service ... " ); 29 29 30 // create ariba module 30 logging_ info("Creating ariba underlay module ... ");31 logging_debug( "creating ariba underlay module ... " ); 31 32 ariba = new AribaModule(); 32 33 33 logging_info("Starting up PingPong service ... "); 34 35 // --- get config --- 34 // get the configuration object 36 35 Configuration& config = Configuration::instance(); 37 36 … … 66 65 node->start(); 67 66 68 if (!isInitiator) { 69 node->join(spovnetName); 70 } else { 71 node->initiate(spovnetName); 72 } 67 // initiate or join the spovnet 68 if (!isInitiator) node->join(spovnetName); 69 else node->initiate(spovnetName); 73 70 74 71 // bind communication and node listener 75 72 node->bind(this); 76 73 node->bind(this, PingPong::PINGPONG_ID); 77 74 78 75 // ping pong started up... 79 logging_info( "PingPong started up");76 logging_info( "pingpong started up "); 80 77 } 81 78 82 79 // implementation of the startup interface 83 80 void PingPong::shutdown() { 84 logging_info("PingPong service starting shutdown sequence ..."); 81 82 logging_info( "pingpong service starting shutdown sequence ..." ); 85 83 86 84 // stop timer 87 85 Timer::stop(); 88 89 // leave spovnet90 node->leave();91 86 92 87 // unbind listeners … … 94 89 node->unbind( this, PingPong::PINGPONG_ID ); 95 90 91 // leave spovnet 92 node->leave(); 93 94 // stop the ariba module 96 95 ariba->stop(); 96 97 // delete node and ariba module 97 98 delete node; 98 99 delete ariba; 99 100 100 logging_info("PingPong service shut down!"); 101 // now we are completely shut down 102 logging_info( "pingpong service shut down" ); 101 103 } 102 104 103 105 // node listener interface 104 106 void PingPong::onJoinCompleted( const SpoVNetID& vid ) { 105 logging_info("PingPong node join completed, spovnetid=" << vid.toString() ); 107 logging_info( "pingpong node join completed, spovnetid=" << vid.toString() ); 108 109 // start the timer to ping every second 110 Timer::setInterval( 1000 ); 111 Timer::start(); 106 112 } 107 113 108 114 void PingPong::onJoinFailed( const SpoVNetID& vid ) { 109 logging_ info("PingPong node join failed, spovnetid=" << vid.toString() );115 logging_error(" pingpong node join failed, spovnetid=" << vid.toString() ); 110 116 } 111 117 … … 115 121 } 116 122 117 void PingPong::onMessage(Message* msg, const NodeID& remote, const LinkID& lnk = 118 LinkID::UNSPECIFIED) { 119 PingPongMessage* incoming = msg->decapsulate<PingPongMessage> (); 123 void PingPong::onMessage(Message* msg, const NodeID& remote, 124 const LinkID& lnk = LinkID::UNSPECIFIED) { 120 125 121 logging_info("received ping message on link " << lnk.toString() 122 << " from node with id " << (int) incoming->getid()); 126 PingPongMessage* pingmsg = msg->decapsulate<PingPongMessage> (); 127 128 logging_info( "received ping message on link " << lnk.toString() 129 << " from node " << remote.toString() 130 << ": " << pingmsg->toString() ); 123 131 } 124 132 125 133 // timer event 126 134 void PingPong::eventFunction() { 135 136 // we ping all nodes that are known in the overlay structure 137 // this can be all nodes (OneHop) overlay or just some neighbors 138 // in case of a Chord or Kademlia structure 139 140 logging_info( "pinging overlay neighbors with ping id " << ++pingId ); 141 142 PingPongMessage pingmsg( pingId ); 143 node->sendBroadcastMessage( pingmsg, PingPong::PINGPONG_ID ); 144 127 145 } 128 146 -
sample/pingpong/PingPong.h
r2413 r2473 4 4 #include "ariba/ariba.h" 5 5 #include "PingPongMessage.h" 6 #include "ariba/utility/system/StartupInterface.h" 7 #include "ariba/utility/system/Timer.h" 6 8 7 9 using namespace ariba; 8 9 #include "ariba/utility/system/StartupInterface.h"10 #include "ariba/utility/system/Timer.h"11 10 using ariba::utility::StartupInterface; 12 11 using ariba::utility::Timer; … … 55 54 Node* node; 56 55 57 // flag, wheter this node i s the initiator of thisspovnet56 // flag, wheter this node initiates or just joins the spovnet 58 57 bool isInitiator; 59 58 … … 65 64 }; 66 65 67 //ARIBA_SIMULATION_SERVICE(PingPong); 66 // needed for simulation support 67 ARIBA_SIMULATION_SERVICE(PingPong); 68 68 69 69 }}} // namespace ariba, application, pingpong -
source/ariba/DataMessage.h
r2435 r2473 54 54 } 55 55 56 inline DataMessage( const Message& message ) { 57 this->data = (void*)const_cast<Message*>(&message); 58 this->size = ~0; 59 } 60 56 61 inline Message* getMessage() const { 57 62 return (Message*)data; -
source/ariba/Node.cpp
r2455 r2473 37 37 // [License] 38 38 39 40 39 #include "Node.h" 41 40 … … 61 60 62 61 } 62 63 63 ServiceInterfaceWrapper(CommunicationListener* listener) : 64 64 nodeListener(NULL), commListener(listener) { 65 65 } 66 67 ~ServiceInterfaceWrapper() { 68 } 66 69 67 70 protected: … … 69 72 70 73 } 74 71 75 void onOverlayDestroy(const SpoVNetID& id) { 72 76 … … 120 124 const NodeID& node) { 121 125 if (commListener != NULL) commListener->onMessage( 122 const_cast<Message*> 126 const_cast<Message*>(message), node, link); 123 127 } 124 128 }; 125 129 126 const ServiceID Node::anonymousService = 0xFF00;130 ServiceID Node::anonymousService = ServiceID(0xFF00); 127 131 128 132 Node::Node(AribaModule& ariba_mod, const Name& node_name) : … … 142 146 //TODO: Implement error handling: no bootstrap node available 143 147 void Node::initiate(const Name& vnetname, const SpoVNetProperties& parm) { 144 utility::OverlayParameterSet 145 ovrpset =146 (utility::OverlayParameterSet::_OverlayStructure) parm.getBaseOverlayType(); 148 utility::OverlayParameterSet ovrpset = 149 (utility::OverlayParameterSet::_OverlayStructure) parm.getBaseOverlayType(); 150 147 151 spovnetId = vnetname.toSpoVNetId(); 148 152 nodeId = generateNodeId(name); 153 149 154 this->context = ariba_mod.underlay_abs->createSpoVNet(spovnetId, nodeId, 150 ariba_mod.ip_addr, ariba_mod.tcp_port);155 ariba_mod.ip_addr, ariba_mod.tcp_port); 151 156 ariba_mod.addBootstrapNode(vnetname, 152 157 new EndpointDescriptor(this->context->getBaseCommunication().getEndpointDescriptor())); 153 158 } 154 159 155 160 void Node::leave() { 156 // not implemeted yet. 157 } 158 159 void Node::bind(NodeListener* listener) { 160 this->context->getOverlay().bind(new ServiceInterfaceWrapper(listener), 161 Node::anonymousService); 162 } 163 164 void Node::unbind(NodeListener* listener) { 165 // TODO: allow unbinding 161 ariba_mod.underlay_abs->leaveSpoVNet( context ); 162 context = NULL; 166 163 } 167 164 … … 171 168 172 169 const SpoVNetID& Node::getSpoVNetId() const { 173 return SpoVNetID::UNSPECIFIED;170 return spovnetId; 174 171 } 175 172 176 173 const NodeID& Node::getNodeId(const LinkID& lid) const { 177 return NodeID::UNSPECIFIED;174 return nodeId; 178 175 } 179 176 … … 201 198 } 202 199 200 void Node::sendBroadcastMessage(const DataMessage& msg, const ServiceID& sid) { 201 return context->getOverlay().broadcastMessage((Message*)msg, sid); 202 } 203 204 void Node::bind(NodeListener* listener) { 205 context->getOverlay().bind(new ServiceInterfaceWrapper(listener), 206 Node::anonymousService); 207 } 208 209 void Node::unbind(NodeListener* listener) { 210 delete context->getOverlay().unbind(Node::anonymousService); 211 } 212 203 213 void Node::bind(CommunicationListener* listener, const ServiceID& sid) { 204 this->context->getOverlay().bind(new ServiceInterfaceWrapper(listener), sid);214 context->getOverlay().bind(new ServiceInterfaceWrapper(listener), sid); 205 215 } 206 216 207 217 void Node::unbind(CommunicationListener* listener, const ServiceID& sid) { 208 // TODO218 delete context->getOverlay().unbind(sid); 209 219 } 210 220 -
source/ariba/Node.h
r2460 r2473 37 37 // [License] 38 38 39 40 39 #ifndef NODE_H_ 41 40 #define NODE_H_ … … 236 235 seqnum_t sendMessage(const DataMessage& msg, const LinkID& lnk); 237 236 237 /** 238 * Sends a message to all known hosts in the overlay structure 239 * the nodes that are reached here depend on the overlay structure. 240 * 241 * @param msg The message to be send 242 * @param sid The id of the service that should receive the message 243 */ 244 void sendBroadcastMessage(const DataMessage& msg, const ServiceID& sid); 245 238 246 // --- communication listeners --- 239 247 … … 316 324 // delegates 317 325 interface::AribaContext* context; 318 static constServiceID anonymousService;326 static ServiceID anonymousService; 319 327 }; 320 328 -
source/ariba/interface/ServiceInterface.cpp
r2472 r2473 38 38 39 39 #include "ServiceInterface.h" 40 #include "ariba/interface/AribaContext.h"41 40 42 41 namespace ariba { 43 42 namespace interface { 44 43 45 ServiceInterface::ServiceInterface() : overlay( NULL ){44 ServiceInterface::ServiceInterface() { 46 45 } 47 46 48 47 ServiceInterface::~ServiceInterface(){ 49 if( overlay != NULL )50 overlay->unbind( this, serviceid );51 }52 53 bool ServiceInterface::initialize( AribaContext* _ctx, const ServiceID& _serviceid ){54 return initialize( &_ctx->getOverlay(), _serviceid );55 }56 57 bool ServiceInterface::initialize( BaseOverlay* _overlay, const ServiceID& _serviceid ){58 if( _overlay == NULL ) return false;59 60 overlay = _overlay;61 serviceid = _serviceid;62 63 return overlay->bind( this, serviceid);64 48 } 65 49 -
source/ariba/interface/ServiceInterface.h
r2472 r2473 67 67 namespace interface { 68 68 69 class AribaContext;70 71 69 class ServiceInterface : public OverlayEvents, MessageReceiver { 72 70 friend class ariba::overlay::BaseOverlay; … … 74 72 ServiceInterface(); 75 73 virtual ~ServiceInterface(); 76 77 bool initialize( AribaContext* _ctx, const ServiceID& _serviceid );78 bool initialize( BaseOverlay* _overlay, const ServiceID& _serviceid );79 74 80 75 protected: … … 96 91 97 92 virtual bool receiveMessage( const Message* message, const LinkID& link, const NodeID& node ); 98 99 private:100 BaseOverlay* overlay;101 ServiceID serviceid;102 93 }; 103 94 -
source/ariba/interface/UnderlayAbstraction.cpp
r2472 r2473 86 86 } 87 87 88 void UnderlayAbstraction::destroySpoVNet(AribaContext* ctx) {89 ctx->getOverlay().leaveSpoVNet();90 delete &ctx->getOverlay();91 delete &ctx->getBaseCommunication();92 delete ctx;93 }94 95 88 AribaContext* UnderlayAbstraction::joinSpoVNet(const SpoVNetID& spovnetid, const EndpointDescriptor& bootstrapnode, const NodeID& nodeid, const NetworkLocator* locallocator, const uint16_t localport) { 96 89 … … 107 100 108 101 void UnderlayAbstraction::leaveSpoVNet(AribaContext* ctx) { 109 destroySpoVNet( ctx ); 102 ctx->getOverlay().leaveSpoVNet(); 103 delete &ctx->getOverlay(); 104 delete &ctx->getBaseCommunication(); 105 delete ctx; 110 106 } 111 107 -
source/ariba/interface/UnderlayAbstraction.h
r2472 r2473 79 79 ); 80 80 81 void destroySpoVNet( AribaContext* ctx );82 83 81 AribaContext* joinSpoVNet( 84 82 const SpoVNetID& spovnetid, -
source/ariba/overlay/BaseOverlay.cpp
r2472 r2473 273 273 bool BaseOverlay::bind(ServiceInterface* service, const ServiceID& sid) { 274 274 275 logging_debug( "binding service on serviceid " << sid.toString() );275 logging_debug( "binding service " << service << " on serviceid " << sid.toString() ); 276 276 277 277 if( listenerMux.contains( sid ) ){ … … 284 284 } 285 285 286 bool BaseOverlay::unbind(ServiceInterface* service,const ServiceID& sid){287 288 logging_debug( "unbinding service onserviceid " << sid.toString() );286 ServiceInterface* BaseOverlay::unbind(const ServiceID& sid){ 287 288 logging_debug( "unbinding service from serviceid " << sid.toString() ); 289 289 290 290 if( !listenerMux.contains( sid ) ){ 291 291 logging_warn( "cannot unbind service. no service registered on service id " << sid.toString() ); 292 return false; 293 } 294 295 listenerMux.unregisterItem( service ); 296 return true; 292 return NULL; 293 } 294 295 ServiceInterface* iface = listenerMux.get( sid ); 296 listenerMux.unregisterItem( sid ); 297 298 return NULL; //iface; 297 299 } 298 300 -
source/ariba/overlay/BaseOverlay.h
r2472 r2473 185 185 * Unregister a receiver. 186 186 * 187 * @param receiver An implementation of the receiver interface188 */ 189 bool unbind( ServiceInterface* service,const ServiceID& sid );187 * @param sid The service id to unregister 188 */ 189 ServiceInterface* unbind( const ServiceID& sid ); 190 190 191 191 /** … … 271 271 * to deliver upcoming messages to the correct service. 272 272 */ 273 Demultiplexer<ServiceInterface*, constServiceID> listenerMux;273 Demultiplexer<ServiceInterface*, ServiceID> listenerMux; 274 274 275 275 /** -
source/ariba/overlay/modules/OverlayInterface.cpp
r2472 r2473 53 53 eventsReceiver( _eventsReceiver ) { 54 54 55 ServiceInterface::initialize( &_baseoverlay, OVERLAY_SERVICE_ID );55 _baseoverlay.bind( this, OVERLAY_SERVICE_ID ); 56 56 } 57 57 58 58 OverlayInterface::~OverlayInterface(){ 59 baseoverlay.unbind( OVERLAY_SERVICE_ID ); 59 60 } 60 61 -
source/ariba/utility/misc/Demultiplexer.hpp
r2472 r2473 41 41 42 42 #include <list> 43 #include <iostream> 43 44 #include <map> 44 45 #include <boost/thread/mutex.hpp> … … 46 47 #include "ariba/utility/messages/Message.h" 47 48 49 using std::cout; 48 50 using std::list; 49 51 using std::map; … … 73 75 boost::mutex mapMutex; 74 76 77 void debugprint() { 78 cout << "-------------start--------" << std::endl; 79 { 80 LISTENER_SERVICE_MAP_CITERATOR i = mapListenerService.begin(); 81 LISTENER_SERVICE_MAP_CITERATOR iend = mapListenerService.end(); 82 83 for( ; i != iend; i++ ) 84 cout << "xxx" << i->first.toString() << " -> " << i->second << std::endl; 85 } 86 cout << "-----------------------" << std::endl; 87 { 88 SERVICE_LISTENER_MAP_CITERATOR i = mapServiceListener.begin(); 89 SERVICE_LISTENER_MAP_CITERATOR iend = mapServiceListener.end(); 90 91 for( ; i != iend; i++ ) 92 cout << "xxx" << i->first << " -> " << i->second.toString() << std::endl; 93 } 94 cout << "-------------end---------" << std::endl; 95 } 96 75 97 public: 76 98 … … 83 105 void registerItem( S id, T listener ) { 84 106 boost::mutex::scoped_lock lock( mapMutex ); 85 { 86 mapServiceListener.insert( SERVICE_LISTENER_PAIR( id, listener ) ); 87 mapListenerService.insert( LISTENER_SERVICE_PAIR( listener, id ) ); 88 } 107 108 mapServiceListener.insert( SERVICE_LISTENER_PAIR( id, listener ) ); 109 mapListenerService.insert( LISTENER_SERVICE_PAIR( listener, id ) ); 89 110 } 90 111 91 void unregisterItem( S id ) {112 void unregisterItem( S id ) { 92 113 T listener = get( id ); 93 94 boost::mutex::scoped_lock lock( mapMutex ); 114 95 115 { 116 boost::mutex::scoped_lock lock( mapMutex ); 96 117 mapServiceListener.erase( id ); 97 118 mapListenerService.erase( listener ); … … 100 121 101 122 void unregisterItem( T listener ) { 102 S id = get (listener);123 S id = get( listener ); 103 124 unregisterItem( id ); 104 125 } … … 106 127 S get( T listener ) { 107 128 boost::mutex::scoped_lock lock( mapMutex ); 108 { 109 LISTENER_SERVICE_MAP_CITERATOR it = mapListenerService.find( listener ); 110 return it->second; 111 } 129 130 LISTENER_SERVICE_MAP_CITERATOR it = mapListenerService.find( listener ); 131 return it->second; 112 132 } 113 133 114 134 T get( S id ) { 115 135 boost::mutex::scoped_lock lock( mapMutex ); 116 {117 SERVICE_LISTENER_MAP_CITERATOR it = mapServiceListener.find( id );118 136 119 if( it == mapServiceListener.end() ) return NULL;120 else return it->second;121 }137 SERVICE_LISTENER_MAP_CITERATOR it = mapServiceListener.find( id ); 138 if( it == mapServiceListener.end() ) return NULL; 139 else return it->second; 122 140 } 123 141 124 142 bool contains( T listener ) { 125 143 boost::mutex::scoped_lock lock( mapMutex ); 126 { 127 LISTENER_SERVICE_MAP_CITERATOR it = mapListenerService.find( listener ); 128 return ( it != mapListenerService.end() ); 129 } 144 145 LISTENER_SERVICE_MAP_CITERATOR it = mapListenerService.find( listener ); 146 return ( it != mapListenerService.end() ); 130 147 } 131 148 132 149 bool contains( S id ) { 133 150 boost::mutex::scoped_lock lock( mapMutex ); 134 { 135 SERVICE_LISTENER_MAP_CITERATOR it = mapServiceListener.find( id ); 136 return ( it != mapServiceListener.end() ); 137 } 151 152 SERVICE_LISTENER_MAP_CITERATOR it = mapServiceListener.find( id ); 153 return ( it != mapServiceListener.end() ); 138 154 } 139 155 … … 141 157 typedef list<T> TwoList; 142 158 143 OneList getOneList() const { 159 OneList getOneList() { 160 boost::mutex::scoped_lock lock( mapMutex ); 144 161 OneList ret; 145 162 … … 151 168 } 152 169 153 TwoList getTwoList() const { 170 TwoList getTwoList() { 171 boost::mutex::scoped_lock lock( mapMutex ); 154 172 TwoList ret; 155 173 -
source/ariba/utility/types/ServiceID.cpp
r2472 r2473 24 24 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE ARIBA PROJECT OR 25 25 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 26 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,26 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 27 27 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 28 28 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF … … 50 50 } 51 51 52 ServiceID::ServiceID(u nsigned int _id) : id( _id ){52 ServiceID::ServiceID(uint32_t _id) : id( _id ){ 53 53 } 54 54 … … 57 57 58 58 ServiceID::~ServiceID() { 59 } 60 61 ServiceID& ServiceID::operator=(const ServiceID &rh) { 62 id = rh.id; 63 return *this; 59 64 } 60 65 -
source/ariba/utility/types/ServiceID.h
r2472 r2473 72 72 bool operator<(const ServiceID& rh) const; 73 73 bool operator!=(const ServiceID& rh) const; 74 ServiceID& operator=(const ServiceID &rh); 74 75 75 76 inline bool isUnspecified() const {
Note:
See TracChangeset
for help on using the changeset viewer.