Changeset 5151
- Timestamp:
- Jul 21, 2009, 1:54:55 PM (15 years ago)
- Files:
-
- 4 added
- 28 edited
Legend:
- Unmodified
- Added
- Removed
-
/
- Property svn:mergeinfo changed (with no actual effect on merging)
-
sample/pingpong/PingPong.cpp
r4948 r5151 17 17 // construction 18 18 PingPong::PingPong() : pingId( 0 ) { 19 Timer::setInterval( 5000 );19 Timer::setInterval( 1000 ); 20 20 } 21 21 … … 41 41 // get initiator flag 42 42 this->isInitiator = Configuration::instance().read<bool>("node.initiator"); 43 44 this->name = string("<ping>"); 43 45 44 46 // get node name … … 55 57 if (config.exists("ariba.bootstrap.hints")) ariba->setProperty("bootstrap.hints", 56 58 config.read<string>("ariba.bootstrap.hints")); 59 if (config.exists("pingpong.name")) name = 60 config.read<string>("pingpong.name"); 57 61 58 62 // start ariba module … … 71 75 // when initiating, you can define the overlay type, default is Chord [CHORD_OVERLAY] 72 76 SpoVNetProperties params; 73 params.setBaseOverlayType( SpoVNetProperties::ONE_HOP_OVERLAY ); // alternative: OneHop77 //params.setBaseOverlayType( SpoVNetProperties::ONE_HOP_OVERLAY ); // alternative: OneHop 74 78 75 79 // initiate or join the spovnet … … 122 126 // function that is implemented further down in PingPong::onLinkUp 123 127 124 logging_info( "pinging overlay neighbors with ping id " << ++pingId );125 126 PingPongMessage pingmsg( pingId );128 // logging_info( "pinging overlay neighbors with ping id " << ++pingId ); 129 130 PingPongMessage pingmsg( pingId, name ); 127 131 128 132 //----------------------------------------------------------------------- 129 133 // Option 1: get all neighboring nodes and send the message to each 130 134 //----------------------------------------------------------------------- 135 counter++; 136 if (counter<0 || counter>4) { 137 counter = 0; 138 string s; 139 for (int i=0; i<names.size();i++) { 140 if (i!=0) s+= ", "; 141 s = s+names[i]; 142 } 143 logging_info("----> I am " << name << " and I know " << s); 144 names.clear(); 145 } 131 146 vector<NodeID> nodes = node->getNeighborNodes(); 132 147 BOOST_FOREACH( NodeID nid, nodes ){ … … 162 177 void PingPong::onMessage(const DataMessage& msg, const NodeID& remote, const LinkID& lnk) { 163 178 PingPongMessage* pingmsg = msg.getMessage()->convert<PingPongMessage> (); 164 165 logging_info( "received ping message on link " << lnk.toString() 166 << " from node " << remote.toString() 167 << ": " << pingmsg->info() ); 179 bool found=false; 180 for (int i=0;i<names.size(); i++) if (names[i]==pingmsg->getName()) found=true; 181 if (!found) names.push_back(pingmsg->getName()); 182 // logging_info( "received ping message on link " << lnk.toString() 183 // << " from node " << remote.toString() 184 // << ": " << pingmsg->info() ); 168 185 } 169 186 -
sample/pingpong/PingPong.h
r3071 r5151 6 6 #include "ariba/utility/system/StartupInterface.h" 7 7 #include "ariba/utility/system/Timer.h" 8 9 #include <vector> 8 10 9 11 using namespace ariba; … … 15 17 namespace pingpong { 16 18 19 using namespace std; 20 17 21 /** 18 /* The PingPong main class 19 /* This class implements an example service for demonstration purposes 20 /* The pingpong class sends and receives messages between two SpoVNet 21 /* instances 22 **/ 23 class PingPong : 24 public NodeListener, 22 /* The PingPong main class 23 /* This class implements an example service for demonstration purposes 24 /* The pingpong class sends and receives messages between two SpoVNet 25 /* instances 26 **/ 27 class PingPong: public NodeListener, 25 28 public CommunicationListener, 26 29 public StartupInterface, … … 35 38 protected: 36 39 // communication listener interface 37 virtual bool onLinkRequest(const NodeID& remote, const DataMessage& msg = DataMessage::UNSPECIFIED); 38 virtual void onMessage(const DataMessage& msg, const NodeID& remote, const LinkID& lnk= LinkID::UNSPECIFIED); 40 virtual bool onLinkRequest(const NodeID& remote, const DataMessage& msg = 41 DataMessage::UNSPECIFIED); 42 virtual void onMessage(const DataMessage& msg, const NodeID& remote, 43 const LinkID& lnk = LinkID::UNSPECIFIED); 39 44 virtual void onLinkUp(const LinkID& lnk, const NodeID& remote); 40 45 virtual void onLinkDown(const LinkID& lnk, const NodeID& remote); … … 43 48 44 49 // node listener interface 45 virtual void onJoinCompleted( const SpoVNetID& vid);46 virtual void onJoinFailed( const SpoVNetID& vid);47 virtual void onLeaveCompleted( const SpoVNetID& vid);48 virtual void onLeaveFailed( const SpoVNetID& vid);50 virtual void onJoinCompleted(const SpoVNetID& vid); 51 virtual void onJoinFailed(const SpoVNetID& vid); 52 virtual void onLeaveCompleted(const SpoVNetID& vid); 53 virtual void onLeaveFailed(const SpoVNetID& vid); 49 54 50 55 // startup wrapper interface … … 53 58 54 59 // timer events 55 60 virtual void eventFunction(); 56 61 57 62 private: 58 63 // the ariba module and a node 59 64 AribaModule* ariba; 60 Node* node; 65 Node* node; 66 string name; 67 int counter; 68 vector<string> names; 61 69 62 70 // flag, whether this node initiates or just joins the spovnet 63 71 bool isInitiator; 64 72 65 66 73 // the ping pong service id 74 static ServiceID PINGPONG_SERVICEID; 67 75 68 69 76 // the current ping id 77 unsigned long pingId; 70 78 71 79 }; -
sample/pingpong/PingPongMessage.cpp
r4626 r5151 10 10 } 11 11 12 PingPongMessage::PingPongMessage(uint8_t _id ) : id(_id) {12 PingPongMessage::PingPongMessage(uint8_t _id, string name) : id(_id),name(name) { 13 13 } 14 14 -
sample/pingpong/PingPongMessage.h
r4625 r5151 18 18 public: 19 19 PingPongMessage(); 20 PingPongMessage( uint8_t _id );20 PingPongMessage( uint8_t _id, string name = string("<ping>") ); 21 21 virtual ~PingPongMessage(); 22 22 23 23 string info(); 24 24 uint8_t getid(); 25 26 inline string getName() const { 27 return name; 28 } 25 29 private: 26 30 uint8_t id; 31 string name; 27 32 }; 28 33 … … 30 35 31 36 sznBeginDefault( ariba::application::pingpong::PingPongMessage, X ) { 32 X && id ;37 X && id && T(name); 33 38 } sznEnd(); 34 39 -
sample/pingpong/main.cpp
r4925 r5151 7 7 using ariba::application::pingpong::PingPong; 8 8 9 //*************************************************10 /*11 #include "ariba/utility/bootstrap/BootstrapManager.h"12 using ariba::utility::BootstrapManager;13 14 void debug(){15 StartupWrapper::startSystem();16 17 BootstrapManager& manager = BootstrapManager::instance();18 manager.registerModule( BootstrapManager::BootstrapTypePeriodicBroadcast );19 20 manager.publish("testname", "testinfo1", "testinfo2", "testinfo3");21 getchar();22 manager.revoke("testname");23 24 manager.unregisterModule( BootstrapManager::BootstrapTypePeriodicBroadcast );25 StartupWrapper::stopSystem();26 }27 */28 //*************************************************29 30 9 int main( int argc, char** argv ) { 31 32 //debug();33 //return 0;34 10 35 11 // get config file -
source/ariba/Makefile.am
r4851 r5151 259 259 libariba_la_SOURCES += \ 260 260 overlay/BaseOverlay.cpp \ 261 overlay/OverlayBootstrap.cpp 261 overlay/OverlayBootstrap.cpp \ 262 overlay/LinkDescriptor.cpp 262 263 263 264 nobase_libariba_la_HEADERS += \ 264 265 overlay/BaseOverlay.h \ 265 overlay/OverlayBootstrap.h 266 overlay/OverlayBootstrap.h \ 267 overlay/LinkDescriptor.h 266 268 267 269 #------------> overlay :: messages … … 271 273 overlay/messages/JoinRequest.cpp \ 272 274 overlay/messages/LinkRequest.cpp \ 275 overlay/messages/RelayMessage.cpp \ 273 276 overlay/messages/OverlayMsg.cpp 274 277 … … 277 280 overlay/messages/JoinRequest.h \ 278 281 overlay/messages/LinkRequest.h \ 282 overlay/messages/RelayMessage.h \ 279 283 overlay/messages/OverlayMsg.h 280 284 -
source/ariba/SideportListener.cpp
r4751 r5151 40 40 41 41 #include "ariba/overlay/BaseOverlay.h" 42 #include "ariba/overlay/LinkDescriptor.h" 43 44 using ariba::overlay::LinkDescriptor; 42 45 43 46 namespace ariba { … … 71 74 } 72 75 73 //****************************************************** 74 // 75 // JUST EXPERIMENTAL WITH RANDOM RESULTS 76 // 76 bool SideportListener::isRelayedNode(const NodeID& node){ 77 77 78 // 0 = normal node --> directly accessible 79 // 1 = relaying node --> node that is relaying for us (must also be 0) 80 // 2 = relayed node --> node that we cannot access directly 81 static map<NodeID,short> relayingnodes; 82 static map<NodeID,SideportListener::Protocol> protocolnodes; 78 BOOST_FOREACH( LinkDescriptor* link, overlay->links ){ 79 80 if( (!link->localRelay.isUnspecified()) && link->remoteRelay == node && link->up) { 81 return true; 82 } 83 84 } 85 86 return false; 87 } 83 88 84 89 bool SideportListener::isRelayingNode(const NodeID& node){ 85 90 86 map<NodeID,short>::iterator i = relayingnodes.find(node); 87 if(i != relayingnodes.end()) return (i->second == 1); 91 BOOST_FOREACH( LinkDescriptor* link, overlay->links ){ 88 92 89 relayingnodes.insert( std::make_pair( node, rand()%3 ) ); 90 } 93 if( (!link->localRelay.isUnspecified()) && link->localRelay == node && link->up) { 94 return true; 95 } 91 96 92 bool SideportListener::isRelayedNode(const NodeID& node){93 94 map<NodeID,short>::iterator i = relayingnodes.find(node);95 if(i != relayingnodes.end()) return (i->second == 2);96 97 relayingnodes.insert( std::make_pair( node, rand()%3 ) );98 }99 100 SideportListener::Protocol generateRandomProtocol(){101 102 if( (rand() % 3) == 0 ){103 return SideportListener::rfcomm;104 97 } 105 98 106 int ret = SideportListener::undefined; 107 108 if( (rand() % 2) == 0 ) ret |= SideportListener::ipv4; 109 else ret |= SideportListener::ipv6; 110 111 if( (rand() % 2) == 0 ) ret |= SideportListener::udp; 112 else ret |= SideportListener::tcp; 113 114 return (SideportListener::Protocol)ret; 99 return false; 115 100 } 116 101 117 102 SideportListener::Protocol SideportListener::getReachabilityProtocol(const NodeID& node){ 118 119 map<NodeID,Protocol>::iterator i = protocolnodes.find(node); 120 if(i != protocolnodes.end()) return i->second; 121 122 protocolnodes.insert( std::make_pair(node, generateRandomProtocol()) ); 103 return (SideportListener::Protocol)(ipv4 | tcp); 123 104 } 124 125 //126 //127 //******************************************************128 105 129 106 void SideportListener::configure( overlay::BaseOverlay* _overlay ) { -
source/ariba/SideportListener.h
r4751 r5151 42 42 #include <vector> 43 43 #include <map> 44 #include <boost/foreach.hpp> 44 45 #include "Identifiers.h" 45 46 #include "CommunicationListener.h" -
source/ariba/communication/BaseCommunication.cpp
r4984 r5151 97 97 IPv4Locator* ipv4locator = dynamic_cast<IPv4Locator*>(*i); 98 98 99 // TODO: which locators can we find to bind to?99 // TODO: which locators are find to bind to? 100 100 // localhost is not too bad, works when testing locally 101 101 // with several instances. the manual override currently -
source/ariba/communication/modules/transport/protlib/connectionmap.cpp
r5150 r5151 101 101 AssocData* ConnectionMap::lookup(const appladdress& addr) const 102 102 { 103 const_addr2data_it_t hit= addr2data.find(addr); 104 if (hit!=addr2data.end()) return hit->second; 105 else return NULL; 103 // hack: only search for ip addresses! 104 const char* addr_ip = addr.get_ip_str(); 105 for (const_addr2data_it_t::const_iterator i=addr2data.begin(); i!=addr2data.end(); i++) { 106 const appladdress& map_addr = i->first; 107 const char* map_ip = map_addr.get_ip_str(); 108 if (strcmp(map_ip, addr_ip)==0) return i->second; 109 } 110 return NULL; 111 112 // const_addr2data_it_t hit= addr2data.find(addr); 113 // if (hit!=addr2data.end()) return hit->second; 114 // else return NULL; 106 115 } // end lookup 107 116 -
source/ariba/communication/modules/transport/tcp/TCPTransport.cpp
r4618 r5151 242 242 //std::cout << "XXXXXXXXXXXXXsending out data using tcp transport: " << data << std::endl; 243 243 244 const_cast<Message*>(message)->dropPayload();245 244 246 245 // prepare netmsg and send it … … 258 257 " to address " + address->toString() << 259 258 ": " + message->toString() ); 259 const_cast<Message*>(message)->dropPayload(); 260 260 261 261 string s = address->toString(); -
source/ariba/overlay/BaseOverlay.cpp
r4839 r5151 39 39 #include "BaseOverlay.h" 40 40 41 #include "ariba/utility/misc/OvlVis.h" 41 #include <sstream> 42 #include <iostream> 43 #include <string> 44 #include <boost/foreach.hpp> 45 42 46 #include "ariba/NodeListener.h" 43 47 #include "ariba/CommunicationListener.h" 44 48 #include "ariba/SideportListener.h" 45 49 50 #include "ariba/overlay/LinkDescriptor.h" 46 51 #include "ariba/overlay/messages/OverlayMsg.h" 47 52 #include "ariba/overlay/messages/JoinRequest.h" 48 53 #include "ariba/overlay/messages/JoinReply.h" 49 54 #include "ariba/overlay/messages/LinkRequest.h" 55 #include "ariba/overlay/messages/RelayMessage.h" 56 57 #include "ariba/utility/misc/OvlVis.h" 50 58 51 59 namespace ariba { 52 60 namespace overlay { 53 61 62 #define logging_force(x) std::cout << x << std::endl; 63 #define logging_force1(x) std::cout << x << std::endl; 64 65 LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) { 66 BOOST_FOREACH( LinkDescriptor* lp, links ) 67 if ((communication ? lp->communicationId : lp->overlayId) == link) 68 return lp; 69 return NULL; 70 } 71 72 const LinkDescriptor* BaseOverlay::getDescriptor( const LinkID& link, bool communication ) const { 73 BOOST_FOREACH( const LinkDescriptor* lp, links ) 74 if ((communication ? lp->communicationId : lp->overlayId) == link) 75 return lp; 76 return NULL; 77 } 78 79 LinkDescriptor* BaseOverlay::getAutoDescriptor( const NodeID& node, const ServiceID& service ) { 80 BOOST_FOREACH( LinkDescriptor* lp, links ) 81 if (lp->autolink && lp->remoteNode == node && lp->service == service) 82 return lp; 83 return NULL; 84 } 85 86 void BaseOverlay::eraseDescriptor( const LinkID& link, bool communication ) { 87 for ( vector<LinkDescriptor*>::iterator i = links.begin(); i!= links.end(); i++) { 88 LinkDescriptor* ld = *i; 89 if ((communication ? ld->communicationId : ld->overlayId) == link) { 90 delete ld; 91 links.erase(i); 92 break; 93 } 94 } 95 } 96 97 LinkDescriptor* BaseOverlay::addDescriptor( const LinkID& link ) { 98 LinkDescriptor* desc = getDescriptor( link ); 99 if ( desc == NULL ) { 100 desc = new LinkDescriptor(); 101 desc->overlayId = link; 102 links.push_back(desc); 103 } 104 return desc; 105 } 106 107 /// returns a direct link relay descriptor to the given relay node 108 LinkDescriptor* BaseOverlay::getRelayDescriptor( const NodeID& relayNode ) { 109 BOOST_FOREACH( LinkDescriptor* lp, links ) 110 if (lp->remoteNode == relayNode && 111 lp->service == OverlayInterface::OVERLAY_SERVICE_ID && 112 lp->relay == false && 113 lp->up) 114 return lp; 115 return NULL; 116 } 117 118 /// find a proper relay node 119 const NodeID BaseOverlay::findRelayNode( const NodeID& id ) { 120 LinkDescriptor* rld = NULL; 121 NodeID relayNode = NodeID::UNSPECIFIED; 122 123 // get used next hop towards node 124 LinkID rlid = overlayInterface->getNextLinkId(id); 125 while ( relayNode.isUnspecified() && !rlid.isUnspecified() && rld == NULL ) { 126 127 // get descriptor of first hop 128 rld = getDescriptor(rlid); 129 logging_force1( rld ); 130 131 // is first hop a relay path? yes-> try to find real link! 132 if ( rld->relay ) 133 relayNode = getRelayDescriptor(rld->localRelay)->remoteNode; 134 135 // no-> a proper relay node has been found 136 else relayNode = rld->remoteNode; 137 } 138 logging_force1( "Potential relay node " << relayNode.toString() ); 139 // do not return myself or use the node as relay node 140 if (relayNode == nodeId) 141 return NodeID::UNSPECIFIED; 142 else { 143 logging_force1( "Returning relay node " << relayNode.toString() ); 144 return relayNode; 145 } 146 } 147 148 /// forwards a message over relays/overlay/directly using link descriptor 149 seqnum_t BaseOverlay::sendMessage( Message* message, const LinkDescriptor* ld ) { 150 151 // directly send message 152 if ( !ld->communicationId.isUnspecified() && ld->communicationUp ) { 153 logging_debug("sendMessage: Sending message via Base Communication"); 154 return bc->sendMessage( ld->communicationId, message ); 155 } 156 157 // relay message 158 else if ( ld->relay ) { 159 logging_debug("sendMessage: Relaying message to node " 160 << ld->remoteNode.toString() 161 << " using relay " << ld->localRelay 162 ); 163 164 // get local relay link descriptor and mark as used for relaying 165 LinkDescriptor* rld = getRelayDescriptor(ld->localRelay); 166 if (rld==NULL) { 167 logging_error("sendMessage: Relay descriptor for relay " << 168 ld->localRelay.toString() << " unknown."); 169 return -1; 170 } 171 rld->markAsRelay(); 172 173 // create a information relay message to inform the relay about 174 OverlayMsg overlay_msg( OverlayMsg::typeRelay, ld->service, nodeId ); 175 RelayMessage relayMsg( RelayMessage::typeInform, ld->remoteRelay, ld->remoteNode, ld->remoteLinkId ); 176 relayMsg.encapsulate( message ); 177 overlay_msg.encapsulate( &relayMsg ); 178 179 // route message to relay node in order to inform it! 180 logging_debug("sendMessage: Sending message over relayed link with" << ld ); 181 overlayInterface->routeMessage(rld->remoteNode, rld->overlayId, &overlay_msg); 182 return 0; 183 } 184 185 // route message using overlay 186 else { 187 logging_error("Could not send message"); 188 logging_debug( "sendMessage: Routing message to node " << ld->remoteNode.toString() ); 189 overlayInterface->routeMessage( ld->remoteNode, message ); 190 return 0; 191 } 192 193 return -1; 194 } 195 196 /// creates a link descriptor, apply relay semantics if possible 197 LinkDescriptor* BaseOverlay::createLinkDescriptor( 198 const NodeID& remoteNode, const ServiceID& service, const LinkID& link_id ) { 199 200 // find listener 201 if( !communicationListeners.contains( service ) ) { 202 logging_error( "No listener found for service " << service.toString() ); 203 return NULL; 204 } 205 CommunicationListener* listener = communicationListeners.get( service ); 206 assert( listener != NULL ); 207 208 // copy link id 209 LinkID linkid = link_id; 210 211 // create link id if necessary 212 if ( linkid.isUnspecified() ) 213 linkid = LinkID::create(); 214 215 // create relay link descriptor 216 NodeID relayNode = findRelayNode(remoteNode); 217 218 // add descriptor 219 LinkDescriptor* ld = addDescriptor( linkid ); 220 ld->overlayId = linkid; 221 ld->service = service; 222 ld->listener = listener; 223 ld->remoteNode = remoteNode; 224 225 // set relay node if available 226 ld->relay = !relayNode.isUnspecified(); 227 ld->localRelay = relayNode; 228 229 if (!ld->relay) 230 logging_error("No relay found!"); 231 232 // debug output 233 logging_debug( "Created link descriptor: " << ld ); 234 235 return ld; 236 } 237 238 239 // ---------------------------------------------------------------------------- 240 54 241 use_logging_cpp(BaseOverlay); 55 242 56 BaseOverlay::BaseOverlay() 57 : bc(NULL), overlayInterface(NULL), nodeId(NodeID::UNSPECIFIED), 58 spovnetId(SpoVNetID::UNSPECIFIED), initiatorLink(LinkID::UNSPECIFIED), 59 state(BaseOverlayStateInvalid), sideport(&SideportListener::DEFAULT){ 60 } 61 62 BaseOverlay::~BaseOverlay(){ 63 } 64 65 void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ){ 66 243 // ---------------------------------------------------------------------------- 244 245 BaseOverlay::BaseOverlay() : 246 bc(NULL), overlayInterface(NULL), nodeId(NodeID::UNSPECIFIED), 247 spovnetId(SpoVNetID::UNSPECIFIED), initiatorLink(LinkID::UNSPECIFIED), 248 state(BaseOverlayStateInvalid), sideport(&SideportListener::DEFAULT) { 249 } 250 251 BaseOverlay::~BaseOverlay() { 252 } 253 254 // ---------------------------------------------------------------------------- 255 256 void BaseOverlay::start( BaseCommunication& _basecomm, const NodeID& _nodeid ) { 257 logging_info("Starting..."); 258 259 // set parameters 67 260 bc = &_basecomm; 68 261 nodeId = _nodeid; 69 262 70 logging_info("creating base overlay"); 71 263 // register at base communication 72 264 bc->registerMessageReceiver( this ); 73 265 bc->registerEventListener( this ); 74 266 75 ovl.visCreate( ovlId, nodeId, string(""), string("") );76 ovl.visChangeNodeColor(ovlId, nodeId, OvlVis::NODE_COLORS_GREY);77 78 // if (Identifier(Configuration::instance().read<unsigned long>("BASE_nodeid")) ==79 // Identifier(Configuration::instance().read<unsigned long>("SOURCE"))) {80 // ovl.visChangeNodeIcon(ovlId, nodeId, OvlVis::ICON_ID_CAMERA);81 // } else if (Identifier(Configuration::instance().read<unsigned long>("BASE_nodeid")) ==82 // Identifier(Configuration::instance().read<unsigned long>("MR_A"))) {83 // ovl.visChangeNodeIcon(ovlId, nodeId, OvlVis::ICON_ID_CHARACTER_A);84 // } else if (Identifier(Configuration::instance().read<unsigned long>("BASE_nodeid")) ==85 // Identifier(Configuration::instance().read<unsigned long>("MR_W"))) {86 // ovl.visChangeNodeIcon(ovlId, nodeId, OvlVis::ICON_ID_CHARACTER_W);87 // }88 89 267 // timer for auto link management 90 Timer::setInterval( 500 0);268 Timer::setInterval( 500 ); 91 269 Timer::start(); 92 270 } 93 271 94 272 void BaseOverlay::stop() { 95 96 logging_info("deleting base overlay"); 97 273 logging_info("Stopping..."); 274 275 // stop timer 98 276 Timer::stop(); 277 278 // delete oberlay interface 279 if(overlayInterface != NULL) { 280 delete overlayInterface; 281 overlayInterface = NULL; 282 } 283 284 // unregister at base communication 99 285 bc->unregisterMessageReceiver( this ); 100 286 bc->unregisterEventListener( this ); 101 102 if(overlayInterface != NULL){ 103 delete overlayInterface; 104 overlayInterface = NULL; 105 } 106 } 107 108 void BaseOverlay::createSpoVNet(const SpoVNetID& id, const OverlayParameterSet& param, const SecurityParameterSet& sec, const QoSParameterSet& qos){ 287 } 288 289 // ---------------------------------------------------------------------------- 290 291 void BaseOverlay::joinSpoVNet(const SpoVNetID& id, 292 const EndpointDescriptor& bootstrapEp) { 293 294 ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." ); 295 logging_info( "Starting to join spovnet " << id.toString() << 296 " with nodeid " << nodeId.toString()); 297 298 // contact the spovnet initiator and request to join. if the join is granted we will 299 // receive further information on the structure of the overlay that is used in the spovnet 300 // but first, we have to establish a link to the initiator... 301 spovnetId = id; 302 state = BaseOverlayStateJoinInitiated; 303 304 initiatorLink = bc->establishLink( bootstrapEp ); 305 logging_info("join process initiated for " << id.toString() << "..."); 306 } 307 308 void BaseOverlay::leaveSpoVNet() { 309 310 logging_info( "Leaving spovnet " << spovnetId ); 311 bool ret = ( state != this->BaseOverlayStateInvalid ); 312 313 logging_debug( "Dropping all auto-links" ); 314 315 // gather all service links 316 vector<LinkID> servicelinks; 317 BOOST_FOREACH( LinkDescriptor* ld, links ) { 318 if( ld->service != OverlayInterface::OVERLAY_SERVICE_ID ) 319 servicelinks.push_back( ld->overlayId ); 320 } 321 322 // drop all service links 323 BOOST_FOREACH( LinkID lnk, servicelinks ) 324 dropLink( lnk ); 325 326 // let the node leave the spovnet overlay interface 327 logging_debug( "Leaving overlay" ); 328 if( overlayInterface != NULL ) 329 overlayInterface->leaveOverlay(); 330 331 // leave spovnet 332 if( state != BaseOverlayStateInitiator ) { 333 // then, leave the spovnet baseoverlay 334 OverlayMsg overMsg( OverlayMsg::typeBye, nodeId ); 335 bc->sendMessage( initiatorLink, &overMsg ); 336 337 // drop the link and set to correct state 338 bc->dropLink( initiatorLink ); 339 initiatorLink = LinkID::UNSPECIFIED; 340 } 341 342 // change to inalid state 343 state = BaseOverlayStateInvalid; 344 ovl.visShutdown( ovlId, nodeId, string("") ); 345 346 // inform all registered services of the event 347 BOOST_FOREACH( NodeListener* i, nodeListeners ) { 348 if( ret ) i->onLeaveCompleted( spovnetId ); 349 else i->onLeaveFailed( spovnetId ); 350 } 351 } 352 353 void BaseOverlay::createSpoVNet(const SpoVNetID& id, 354 const OverlayParameterSet& param, 355 const SecurityParameterSet& sec, 356 const QoSParameterSet& qos) { 109 357 110 358 // set the state that we are an initiator, this way incoming messages are … … 117 365 118 366 overlayInterface = OverlayFactory::create( *this, param, nodeId, this ); 119 if( overlayInterface == NULL ) {367 if( overlayInterface == NULL ) { 120 368 logging_fatal( "overlay structure not supported" ); 121 369 state = BaseOverlayStateInvalid; … … 129 377 130 378 ovl.visChangeNodeIcon ( ovlId, nodeId, OvlVis::ICON_ID_CAMERA ); 131 ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN); 132 } 133 134 void BaseOverlay::joinSpoVNet(const SpoVNetID& id, const EndpointDescriptor& bootstrapEp){ 135 136 ovl.visShowNodeBubble ( ovlId, nodeId, "joining..." ); 137 logging_info( "starting to join spovnet " << id.toString() << 138 " with nodeid " << nodeId.toString()); 139 140 spovnetId = id; 141 state = BaseOverlayStateJoinInitiated; 142 143 // 144 // start bootstrapping for spovnets and publish our information 145 // 146 147 overlayBootstrap.start( this, spovnetId, nodeId ); 148 overlayBootstrap.publish( bc->getEndpointDescriptor() ); 149 150 // 151 // contact the spovnet initiator and request 152 // to join. if the join is granted we will 153 // receive further information on the structure 154 // of the overlay that is used in the spovnet 155 // 156 // but first, we have to establish a link to the initiator... 157 // 158 159 initiatorLink = bc->establishLink( bootstrapEp ); 160 logging_info("join process initiated for " << id.toString() << "..."); 161 } 162 163 void BaseOverlay::leaveSpoVNet(){ 164 165 logging_info( "leaving spovnet " << spovnetId ); 166 bool ret = ( state != this->BaseOverlayStateInvalid ); 167 168 logging_debug( "dropping all auto-links ..." ); 169 170 // now we start leaving the spovnet: fist delete all links 171 // that we still have in the baseoverlay initiated by 172 // some services, the leave the actual overlay structure, 173 // then leave the spovnet 174 175 // --> drop all service links 176 177 vector<LinkID> servicelinks; 178 BOOST_FOREACH( LinkPair item, linkMapping ){ 179 if( item.second.service != OverlayInterface::OVERLAY_SERVICE_ID ) 180 servicelinks.push_back( item.first ); 181 } 182 BOOST_FOREACH( LinkID lnk, servicelinks ){ 183 // the dropLink function will remove 184 // the item from the linkMapping 185 dropLink( lnk ); 186 } 187 188 // --> leave overlay structure 189 190 logging_debug( "leaving overlay" ); 191 // first, leave the overlay interface 192 if( overlayInterface != NULL ) 193 overlayInterface->leaveOverlay(); 194 195 // --> leave spovnet 196 197 if( state != BaseOverlayStateInitiator ){ 198 199 // then, leave the spovnet baseoverlay 200 OverlayMsg overMsg( OverlayMsg::OverlayMessageTypeBye, nodeId ); 201 bc->sendMessage( initiatorLink, &overMsg ); 202 203 // drop the link and set to correct state 204 bc->dropLink( initiatorLink ); 205 initiatorLink = LinkID::UNSPECIFIED; 206 } 207 208 state = BaseOverlayStateInvalid; 209 ovl.visShutdown( ovlId, nodeId, string("") ); 210 211 // inform all registered services of the event 212 BOOST_FOREACH( NodeListener* i, nodeListeners ){ 213 if( ret ) i->onLeaveCompleted( spovnetId ); 214 else i->onLeaveFailed( spovnetId ); 215 } 379 ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN ); 380 } 381 382 // ---------------------------------------------------------------------------- 383 384 const LinkID BaseOverlay::establishLink( 385 const EndpointDescriptor& ep, const NodeID& nodeid, 386 const ServiceID& service, const LinkID& linkid ) { 387 388 LinkID link_id = linkid; 389 390 // establish link via overlay 391 if (!nodeid.isUnspecified()) 392 link_id = establishLink( nodeid, service, link_id ); 393 394 // establish link directly if only ep is known 395 if (nodeid.isUnspecified()) 396 establishLink( ep, service, link_id ); 397 398 return link_id; 399 } 400 401 /// call base communication's establish link and add link mapping 402 const LinkID BaseOverlay::establishLink( const EndpointDescriptor& ep, 403 const ServiceID& service, const LinkID& linkid ) { 404 405 // create a new link id if necessary 406 LinkID link_id = linkid; 407 if (link_id.isUnspecified()) link_id = LinkID::create(); 408 409 /// find a service listener 410 if( !communicationListeners.contains( service ) ) { 411 logging_error( "No listener registered for service id=" << service.toString() ); 412 return LinkID::UNSPECIFIED; 413 } 414 CommunicationListener* listener = communicationListeners.get( service ); 415 assert( listener != NULL ); 416 417 /// establish link and add mapping 418 logging_info("Establishing direct link " << link_id.toString() 419 << " using " << ep.toString()); 420 421 // create descriptor 422 LinkDescriptor* ld = addDescriptor( link_id ); 423 ld->overlayId = link_id; 424 ld->communicationId = link_id; 425 ld->listener = listener; 426 ld->service = service; 427 bc->establishLink( ep, link_id ); 428 429 return link_id; 216 430 } 217 431 … … 220 434 const ServiceID& service, const LinkID& link_id ) { 221 435 222 if( !communicationListeners.contains( service ) ){ 223 logging_error( "no registered listener on serviceid " << service.toString() ); 224 return LinkID::UNSPECIFIED; 225 } 226 227 // copy link id 228 LinkID linkid = link_id; 229 230 // create link id if necessary 231 if (linkid.isUnspecified()) linkid = LinkID::create(); 436 // do not establish a link to myself! 437 if (node == nodeId) return LinkID::UNSPECIFIED; 438 439 // create a link descriptor 440 LinkDescriptor* ld = createLinkDescriptor( node, service, link_id ); 441 442 // create link request message with own link id 443 uint32_t nonce = (uint32_t)(rand() ^ (rand() << 16) ^ time(NULL)); 444 LinkRequest link_request_msg( 445 nonce, &bc->getEndpointDescriptor(), false, 446 ld->overlayId, ld->localRelay ); 447 OverlayMsg overlay_msg( OverlayMsg::typeLinkRequest, service, nodeId ); 448 overlay_msg.encapsulate( &link_request_msg ); 449 pendingLinks.insert( make_pair(nonce, ld->overlayId) ); 232 450 233 451 // debug message 234 logging_debug( "BaseOverlay called to establish link between node " << 235 node.toString() << " for service " << service.toString() ); 236 237 // create link request message with own link id 238 OverlayMsg overlay_msg( OverlayMsg::OverlayMessageTypeLinkRequest, service, nodeId ); 239 uint32_t nonce = (uint32_t)(rand() ^ (rand() << 16) ^ time(NULL)); 240 LinkRequest link_request_msg( nonce, &bc->getEndpointDescriptor() ); 241 overlay_msg.encapsulate( &link_request_msg ); 242 pendingLinks.insert( make_pair(nonce, linkid) ); 243 244 // debug message 245 logging_debug( "BaseOverlay routes LinkRequest message to node " << node.toString() ); 246 247 // route message to overlay node 248 overlayInterface->routeMessage( node, &overlay_msg ); 249 250 CommunicationListener* receiver = communicationListeners.get( service ); 251 assert( receiver != NULL ); 252 253 LinkItem item (linkid, NodeID::UNSPECIFIED, service, receiver); 254 linkMapping.insert( make_pair(linkid, item) ); 255 256 return linkid; 257 } 258 259 const LinkID BaseOverlay::establishLink( const EndpointDescriptor& ep, 260 const ServiceID& service, const LinkID& linkid ){ 261 262 if( !communicationListeners.contains( service ) ){ 263 logging_error( "no registered listener on serviceid " << service.toString() ); 264 return LinkID::UNSPECIFIED; 265 } 266 267 const LinkID link = bc->establishLink( ep, linkid ); 268 269 CommunicationListener* receiver = communicationListeners.get( service ); 270 assert( receiver != NULL ); 271 272 LinkItem item (link, NodeID::UNSPECIFIED, service, receiver); 273 linkMapping.insert( make_pair(link, item) ); 274 275 return link; 276 } 277 278 void BaseOverlay::dropLink(const LinkID& link){ 279 280 logging_debug( "baseoverlay dropping link " << link.toString() ); 281 LinkMapping::iterator i = linkMapping.find( link ); 452 logging_debug( 453 "Sending link request with" 454 << " link id=" << ld->overlayId 455 << " node id=" << ld->remoteNode.toString() 456 << " service id=" << ld->service.toString() 457 << " local relay id=" << ld->localRelay.toString() 458 << " nonce= " << nonce 459 ); 460 461 // sending message through new link 462 sendMessage( &overlay_msg, ld ); 463 464 return ld->overlayId; 465 } 466 467 /// drops an established link 468 void BaseOverlay::dropLink(const LinkID& link) { 469 logging_debug( "Dropping link (initiated locally):" << link.toString() ); 282 470 283 471 // find the link item to drop 284 if( i == linkMapping.end() ){ 285 logging_warn( "can't drop link, mapping unknown " << link.toString() ); 472 LinkDescriptor* ld = getDescriptor(link); 473 if( ld == NULL ) { 474 logging_warn( "Can't drop link, link is unknown!"); 286 475 return; 287 476 } 288 477 289 LinkItem item = i->second;290 291 478 // delete all queued messages 292 if( item.waitingmsg.size() > 0 ){ 293 294 logging_warn( "dropping link " << link.toString() << 295 " that has " << item.waitingmsg.size() << " waiting messages" ); 296 297 item.deleteWaiting(); 298 } 299 300 // erase the mapping and drop the link 301 linkMapping.erase( i ); 302 bc->dropLink( link ); 303 304 // tell sideports and listeners of the drop 305 item.interface->onLinkDown( link, item.node ); 306 sideport->onLinkDown(link, this->nodeId, item.node, this->spovnetId ); 307 } 308 309 seqnum_t BaseOverlay::sendMessage(const Message* message, const LinkID& link ){ 310 311 logging_debug( "baseoverlay is sending data message on link " << link.toString() ); 312 313 // 479 if( ld->messageQueue.size() > 0 ) { 480 logging_warn( "Dropping link " << ld->overlayId.toString() << " that has " 481 << ld->messageQueue.size() << " waiting messages" ); 482 ld->flushQueue(); 483 } 484 485 // inform sideport and listener 486 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode ); 487 sideport->onLinkDown(ld->overlayId, this->nodeId, ld->remoteNode, this->spovnetId ); 488 489 // do not drop relay links 490 if (!ld->usedAsRelay) { 491 // drop the link in base communication 492 if (ld->communicationUp) bc->dropLink( ld->communicationId ); 493 494 // erase descriptor 495 eraseDescriptor( ld->overlayId ); 496 } else 497 ld->dropWhenRelaysLeft = true; 498 } 499 500 // ---------------------------------------------------------------------------- 501 502 /// internal send message, always use this functions to send messages over links 503 seqnum_t BaseOverlay::sendMessage( const Message* message, const LinkID& link ) { 504 logging_debug( "Sending data message on link " << link.toString() ); 505 314 506 // get the mapping for this link 315 // 316 317 LinkMapping::iterator i = linkMapping.find( link ); 318 if( i == linkMapping.end() ){ 319 logging_error( "could not send message. link not found " << link.toString() ); 507 LinkDescriptor* ld = getDescriptor(link); 508 if( ld == NULL ) { 509 logging_error("Could not send message. " 510 << "Link not found id=" << link.toString()); 320 511 return -1; 321 512 } 322 513 323 i->second.markused(); 324 325 // 326 // check if the link is up yet, if its an autlink queue message 327 // 328 329 if( !i->second.linkup ){ 330 331 if( i->second.autolink ){ 332 logging_info( "auto link " << link.toString() << " is not up yet, queueing message" ); 514 // check if the link is up yet, if its an auto link queue message 515 if( !ld->up ) { 516 ld->markAsUsed(); 517 if( ld->autolink ) { 518 logging_info("Auto-link " << link.toString() << " not up, queue message"); 333 519 Data data = data_serialize( message ); 334 520 const_cast<Message*>(message)->dropPayload(); 335 i->second.waitingmsg.push_back( new Message(data) );521 ld->messageQueue.push_back( new Message(data) ); 336 522 } else { 337 logging_error("link " << link.toString() << " is not up yet, dropping message" ); 338 } 339 523 logging_error("Link " << link.toString() << " not up, drop message"); 524 } 340 525 return -1; 341 526 } 342 527 343 // 344 // send the message through the basecomm 345 // 346 347 OverlayMsg overmsg( OverlayMsg::OverlayMessageTypeData, i->second.service, nodeId ); 528 // compile overlay message (has service and node id) 529 OverlayMsg overmsg( OverlayMsg::typeData, ld->service, nodeId ); 348 530 overmsg.encapsulate( const_cast<Message*>(message) ); 349 531 350 return bc->sendMessage( link, &overmsg ); 351 } 352 353 seqnum_t BaseOverlay::sendMessage(const Message* message, const NodeID& node, const ServiceID& service){ 354 355 LinkID link = LinkID::UNSPECIFIED; 356 357 LinkMapping::iterator i = linkMapping.begin(); 358 LinkMapping::iterator iend = linkMapping.end(); 359 360 // 361 // see if we find a link for this node and service destination 362 // 363 364 for( ; i != iend; i++ ){ 365 if( i->second.node == node && i->second.service == service ){ 366 link = i->second.link; 367 break; 368 } 369 } 370 371 // 532 // send message over relay/direct/overlay 533 return sendMessage( &overmsg, ld ); 534 } 535 536 seqnum_t BaseOverlay::sendMessage(const Message* message, 537 const NodeID& node, const ServiceID& service) { 538 539 // find link for node and service 540 LinkDescriptor* ld = getAutoDescriptor( node, service ); 541 372 542 // if we found no link, create an auto link 373 // 374 375 if( link == LinkID::UNSPECIFIED ){ 376 377 logging_info( "no link could be found to send message to node " << 378 node.toString() << " for service " << service.toString() << 379 ". creating auto link ..."); 380 381 // call basecomm to create a link 382 link = establishLink( node, service ); 543 if( ld == NULL ) { 544 545 // debug output 546 logging_info( "No link to send message to node " 547 << node.toString() << " found for service " 548 << service.toString() << ". Creating auto link ..." 549 ); 383 550 384 551 // this will call onlinkup on us, if everything worked we now have a mapping 385 LinkMapping::iterator i = linkMapping.find( link ); 386 i->second.autolink = true; 387 388 if( i == linkMapping.end() || link == LinkID::UNSPECIFIED ){ 389 logging_error( "failed to establish auto link to node " << node.toString() << 390 " for service " << service.toString() ); 552 LinkID link = LinkID::create(); 553 554 // call base overlay to create a link 555 link = establishLink( node, service, link ); 556 ld = getDescriptor( link ); 557 if( ld == NULL ) { 558 logging_error( "Failed to establish auto-link."); 391 559 return -1; 392 560 } 393 394 logging_debug( "establishing autolink in progress to node " 395 << node.toString() << " with new link-id " << link.toString() ); 396 397 } // if( link != LinkID::UNSPECIFIED ) 398 399 assert( link != LinkID::UNSPECIFIED ); 400 401 // mark the link as used, as we 402 // now send a message through it 403 i->second.markused(); 404 405 // send the message through the new link. the link may not be functional, 406 // but for us there is a link-id so we can send messages through it. if 407 // the link is not yet up and the message needs to be cached, this is the 408 // task of the BaseCommunication, it will cache and send it later. 409 return sendMessage( message, link ); 410 } 411 412 const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(const LinkID& link) const { 413 414 return bc->getEndpointDescriptor( link ); 415 } 416 417 const EndpointDescriptor& BaseOverlay::getEndpointDescriptor(const NodeID& node) const { 418 561 ld->autolink = true; 562 563 logging_debug( "Auto-link establishment in progress to node " 564 << node.toString() << " with link id=" << link.toString() ); 565 } 566 assert(ld != NULL); 567 568 // mark the link as used, as we now send a message through it 569 ld->markAsUsed(); 570 571 // send / queue message 572 return sendMessage( message, ld->overlayId ); 573 } 574 575 // ---------------------------------------------------------------------------- 576 577 const EndpointDescriptor& BaseOverlay::getEndpointDescriptor( 578 const LinkID& link) const { 579 580 // return own end-point descriptor 581 if( link == LinkID::UNSPECIFIED ) 582 return bc->getEndpointDescriptor(); 583 584 // find link descriptor. not found -> return unspecified 585 const LinkDescriptor* ld = getDescriptor(link); 586 if (ld==NULL) return EndpointDescriptor::UNSPECIFIED; 587 588 // return endpoint-descriptor from base communication 589 return bc->getEndpointDescriptor( ld->communicationId ); 590 } 591 592 const EndpointDescriptor& BaseOverlay::getEndpointDescriptor( 593 const NodeID& node) const { 594 595 // return own end-point descriptor 419 596 if( node == nodeId || node == NodeID::UNSPECIFIED ) 420 597 return bc->getEndpointDescriptor(); 421 598 422 if( overlayInterface == NULL ){ 599 // no joined and request remote descriptor? -> fail! 600 if( overlayInterface == NULL ) { 423 601 logging_error( "overlay interface not set, cannot resolve endpoint" ); 424 602 return EndpointDescriptor::UNSPECIFIED; 425 603 } 426 604 427 // TODO: if this is not a onehop overlay the operation will go asynchronously605 // resolve end-point descriptor from the base-overlay routing table 428 606 return overlayInterface->resolveNode( node ); 429 607 } 430 608 431 432 bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid){ 609 // ---------------------------------------------------------------------------- 610 611 bool BaseOverlay::registerSidePort(SideportListener* _sideport) { 612 sideport = _sideport; 613 _sideport->configure( this ); 614 } 615 616 bool BaseOverlay::unregisterSidePort(SideportListener* _sideport) { 617 sideport = &SideportListener::DEFAULT; 618 } 619 620 // ---------------------------------------------------------------------------- 621 622 bool BaseOverlay::bind(CommunicationListener* listener, const ServiceID& sid) { 433 623 logging_debug( "binding communication listener " << listener 434 << " on serviceid " << sid.toString() );435 436 if( communicationListeners.contains( sid ) ) {624 << " on serviceid " << sid.toString() ); 625 626 if( communicationListeners.contains( sid ) ) { 437 627 logging_error( "some listener already registered for service id " 438 << sid.toString() );628 << sid.toString() ); 439 629 return false; 440 630 } … … 444 634 } 445 635 446 bool BaseOverlay::registerSidePort(SideportListener* _sideport){ 447 sideport = _sideport; 448 _sideport->configure( this ); 449 } 450 451 bool BaseOverlay::unregisterSidePort(SideportListener* _sideport){ 452 sideport = &SideportListener::DEFAULT; 453 } 454 455 bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid){ 456 logging_debug( "unbinding listener " << listener 457 << " from serviceid " << sid.toString() ); 458 459 if( !communicationListeners.contains( sid ) ){ 636 637 bool BaseOverlay::unbind(CommunicationListener* listener, const ServiceID& sid) { 638 logging_debug( "unbinding listener " << listener << " from serviceid " << sid.toString() ); 639 640 if( !communicationListeners.contains( sid ) ) { 460 641 logging_warn( "cannot unbind listener. no listener registered on service id " << sid.toString() ); 461 642 return false; 462 643 } 463 644 464 if( communicationListeners.get(sid) != listener ) {645 if( communicationListeners.get(sid) != listener ) { 465 646 logging_warn( "listener bound to service id " << sid.toString() 466 << " is different than listener trying to unbind" );647 << " is different than listener trying to unbind" ); 467 648 return false; 468 649 } … … 472 653 } 473 654 474 bool BaseOverlay::bind(NodeListener* listener){ 475 logging_debug( "binding node listener " << listener ); 476 477 NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener ); 478 if( i != nodeListeners.end() ){ 479 logging_warn( "node listener " << listener << " is already bound, cannot bind" ); 655 // ---------------------------------------------------------------------------- 656 657 bool BaseOverlay::bind(NodeListener* listener) { 658 logging_debug( "Binding node listener " << listener ); 659 660 // already bound? yes-> warning 661 NodeListenerVector::iterator i = 662 find( nodeListeners.begin(), nodeListeners.end(), listener ); 663 if( i != nodeListeners.end() ) { 664 logging_warn("Node listener " << listener << " is already bound!" ); 480 665 return false; 481 666 } 482 667 668 // no-> add 483 669 nodeListeners.push_back( listener ); 484 670 return true; 485 671 } 486 672 487 bool BaseOverlay::unbind(NodeListener* listener){ 488 logging_debug( "unbinding node listener " << listener ); 489 673 bool BaseOverlay::unbind(NodeListener* listener) { 674 logging_debug( "Unbinding node listener " << listener ); 675 676 // already unbound? yes-> warning 490 677 NodeListenerVector::iterator i = find( nodeListeners.begin(), nodeListeners.end(), listener ); 491 if( i == nodeListeners.end() ) {492 logging_warn( " node listener " << listener << " is not bound, cannot unbind" );678 if( i == nodeListeners.end() ) { 679 logging_warn( "Node listener " << listener << " is not bound!" ); 493 680 return false; 494 681 } 495 682 683 // no-> remove 496 684 nodeListeners.erase( i ); 497 685 return true; 498 686 } 499 687 500 void BaseOverlay::onLinkUp(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote){ 501 502 logging_debug( "base overlay received linkup event " + id.toString() ); 503 // TODO: updateOvlVis( getNodeID(id) ); 504 505 // 506 // if we get up a link while we are in the 507 // join phase and this is the link that 508 // we have initiated towards the spovnet owner 509 // continue the join process by sending 510 // a join request message through the link 511 // 512 513 if( state == BaseOverlayStateJoinInitiated && id == initiatorLink){ 514 688 // ---------------------------------------------------------------------------- 689 690 void BaseOverlay::onLinkUp(const LinkID& id, 691 const NetworkLocator* local, const NetworkLocator* remote) { 692 logging_debug( "Link up with base communication link id=" << id ); 693 694 // get descriptor for link 695 LinkDescriptor* ld = getDescriptor(id, true); 696 697 // handle initiator link 698 if(state == BaseOverlayStateJoinInitiated && id == initiatorLink) { 515 699 logging_info( 516 700 "Join has been initiated by me and the link is now up. " << 517 " sending out join request for SpoVNet " << spovnetId.toString()701 "Sending out join request for SpoVNet " << spovnetId.toString() 518 702 ); 519 703 520 OverlayMsg overMsg( OverlayMsg::OverlayMessageTypeJoinRequest, nodeId ); 521 JoinRequest joinmsg( spovnetId, nodeId ); 522 overMsg.encapsulate( &joinmsg ); 523 524 state = BaseOverlayStateJoinInitiated; // state remains in JoinInitiated 525 bc->sendMessage( id, &overMsg ); 526 704 // send join request message 705 OverlayMsg overlayMsg( OverlayMsg::typeJoinRequest, nodeId ); 706 JoinRequest joinRequest( spovnetId, nodeId ); 707 overlayMsg.encapsulate( &joinRequest ); 708 bc->sendMessage( id, &overlayMsg ); 527 709 return; 528 529 } // if( state == BaseOverlayStateJoinInitiated && id == initiatorLink) 530 531 // 532 // otherwise this is a link initiated by a service 533 // then we exchange update messages to exchange the 534 // service id and node id for the link. in this case 535 // we should have a link mapping for this link. if 536 // we have no link mapping this link was initiated by 537 // the remote side. 538 // 539 540 LinkMapping::iterator i = linkMapping.find( id ); 541 542 if( i == linkMapping.end() ){ 543 544 LinkItem item (id, NodeID::UNSPECIFIED, ServiceID::UNSPECIFIED, &CommunicationListener::DEFAULT ); 545 linkMapping.insert( make_pair(id, item) ); 546 710 } 711 712 // no link found? -> link establishment from remote, add one! 713 if (ld == NULL) { 714 ld = addDescriptor( id ); 715 logging_debug( "onLinkUp (remote request) descriptor: " << ld ); 716 717 // update descriptor 718 ld->fromRemote = true; 719 ld->communicationId = id; 720 ld->communicationUp = true; 721 ld->markAsUsed(); 722 723 // in this case, do not inform listener, since service it unknown 724 // -> wait for update message! 725 726 // link mapping found? -> send update message with node-id and service id 547 727 } else { 548 549 logging_debug( "sending out OverlayMessageTypeUpdate" << 550 " for service " << i->second.service.toString() << 551 " with local node id " << nodeId.toString() << 552 " on link " << id.toString() ); 553 554 OverlayMsg overMsg( 555 OverlayMsg::OverlayMessageTypeUpdate, 556 i->second.service, 557 nodeId 558 ); 559 560 bc->sendMessage( id, &overMsg ); 561 i->second.markused(); 562 563 } // if( i == linkMapping.end() ) 564 565 // the link is only valid for the service when we receive 566 // the OverlayMessageTypeUpdate from the remote node and 567 // have the nodeid and serviceid for the link! 568 } 569 570 void BaseOverlay::onLinkDown(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote){ 571 572 logging_debug( "link went down " << id.toString() ); 573 574 // 575 // tell the service that the link went 576 // down and remove the mapping 577 // 578 579 LinkMapping::iterator i = linkMapping.find( id ); 580 if( i == linkMapping.end() ) { 581 // this can also be one of the baseoverlay links that 582 // no mapping is stored for. therefore we issue no warning. 583 // it can also be a link that has been dropped and the 584 // mapping is already deleted in the dropLink function. 585 // also, the service notification is issued then in dropLink 586 return; 587 } 588 589 i->second.interface->onLinkDown( id, i->second.node ); 590 sideport->onLinkDown( id, this->nodeId, i->second.node, this->spovnetId ); 591 592 // delete all queued messages 593 if( i->second.waitingmsg.size() > 0 ){ 594 595 logging_warn( "dropping link " << id.toString() << 596 " that has " << i->second.waitingmsg.size() << " waiting messages" ); 597 598 i->second.deleteWaiting(); 599 } 600 601 linkMapping.erase( i ); 602 } 603 604 void BaseOverlay::onLinkChanged(const LinkID& id, const NetworkLocator* oldlocal, const NetworkLocator* newlocal, const NetworkLocator* oldremote, const NetworkLocator* newremote){ 605 606 logging_debug( "link changed " << id.toString() ); 607 608 // 609 // tell the service that the link changed 610 // 611 612 LinkMapping::iterator i = linkMapping.find( id ); 613 if( i == linkMapping.end() ) return; 614 615 i->second.interface->onLinkChanged( id, i->second.node ); 616 sideport->onLinkChanged( id, this->nodeId, i->second.node, this->spovnetId ); 617 618 // TODO call onLinkQoSChanged? 619 620 i->second.markused(); 621 } 622 623 void BaseOverlay::onLinkFail(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote){ 624 625 logging_debug( "link failed " << id.toString() ); 626 627 // 628 // tell the service that the link failed 629 // 630 631 LinkMapping::iterator i = linkMapping.find( id ); 632 if( i == linkMapping.end() ) return; 633 634 i->second.interface->onLinkFail( id, i->second.node ); 635 sideport->onLinkFail( id, this->nodeId, i->second.node, this->spovnetId ); 636 637 i->second.markused(); 638 } 639 640 void BaseOverlay::onLinkQoSChanged(const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote, const QoSParameterSet& qos) { 641 642 logging_debug( "link qos changed " << id.toString() ); 643 644 // 645 // tell the service that the link qos has changed 646 // 647 648 LinkMapping::iterator i = linkMapping.find( id ); 649 if( i == linkMapping.end() ) return; 650 651 // TODO: convert QoSParameterSet to the LinkProperties properties 652 // TODO: currently not in the interface: i->second.interface->onLinkQoSChanged( id, i->second.node, LinkProperties::DEFAULT ); 653 654 i->second.markused(); 655 } 656 657 bool BaseOverlay::onLinkRequest( const LinkID& id, const NetworkLocator* local, const NetworkLocator* remote ){ 658 659 // also see in the receiveMessage function. there the higher layer service 660 // is asked whether to accept link requests, but there a basic link association is 661 // already built up, so we know the node id 662 logging_debug("received link request from " << remote->toString() << ", accepting"); 728 logging_debug( "onLinkUp descriptor (initiated locally):" << ld ); 729 730 // note: necessary to validate the link on the remote side! 731 logging_debug( "Sending out update" << 732 " for service " << ld->service.toString() << 733 " with local node id " << nodeId.toString() << 734 " on link " << ld->overlayId.toString() ); 735 736 // update descriptor 737 ld->markAsUsed(); 738 ld->communicationUp = true; 739 740 // if link is a relayed link ->convert to direct link 741 if (ld->relay) { 742 ld->up = true; 743 ld->relay = false; 744 ld->localRelay = NodeID::UNSPECIFIED; 745 OverlayMsg overMsg( OverlayMsg::typeDirectLink, ld->service, nodeId ); 746 overMsg.setRelayLink( ld->remoteLinkId ); 747 bc->sendMessage( ld->communicationId, &overMsg ); 748 } 749 750 // compile and send update message 751 OverlayMsg overlayMsg( OverlayMsg::typeUpdate, ld->service, nodeId ); 752 overlayMsg.setAutoLink( ld->autolink ); 753 bc->sendMessage( ld->communicationId, &overlayMsg ); 754 } 755 } 756 757 void BaseOverlay::onLinkDown(const LinkID& id, 758 const NetworkLocator* local, const NetworkLocator* remote) { 759 760 // get descriptor for link 761 LinkDescriptor* ld = getDescriptor(id, true); 762 if ( ld == NULL ) return; // not found? ->ignore! 763 logging_force( "onLinkDown descriptor: " << ld ); 764 765 // inform listeners about link down 766 ld->communicationUp = false; 767 ld->listener->onLinkDown( ld->overlayId, ld->remoteNode ); 768 sideport->onLinkDown( id, this->nodeId, ld->remoteNode, this->spovnetId ); 769 770 // delete all queued messages (auto links) 771 if( ld->messageQueue.size() > 0 ) { 772 logging_warn( "Dropping link " << id.toString() << " that has " 773 << ld->messageQueue.size() << " waiting messages" ); 774 ld->flushQueue(); 775 } 776 777 // erase mapping 778 eraseDescriptor(ld->overlayId); 779 } 780 781 void BaseOverlay::onLinkChanged(const LinkID& id, 782 const NetworkLocator* oldlocal, const NetworkLocator* newlocal, 783 const NetworkLocator* oldremote, const NetworkLocator* newremote) { 784 785 // get descriptor for link 786 LinkDescriptor* ld = getDescriptor(id, true); 787 if ( ld == NULL ) return; // not found? ->ignore! 788 logging_debug( "onLinkChanged descriptor: " << ld ); 789 790 // inform listeners 791 ld->listener->onLinkChanged( ld->overlayId, ld->remoteNode ); 792 sideport->onLinkChanged( id, this->nodeId, ld->remoteNode, this->spovnetId ); 793 794 // autolinks: refresh timestamp 795 ld->markAsUsed(); 796 } 797 798 void BaseOverlay::onLinkFail(const LinkID& id, 799 const NetworkLocator* local, const NetworkLocator* remote) { 800 logging_debug( "Link fail with base communication link id=" << id ); 801 802 // get descriptor for link 803 LinkDescriptor* ld = getDescriptor(id, true); 804 if ( ld == NULL ) return; // not found? ->ignore! 805 logging_debug( "Link failed id=" << ld->overlayId.toString() ); 806 807 // inform listeners 808 ld->listener->onLinkFail( ld->overlayId, ld->remoteNode ); 809 sideport->onLinkFail( id, this->nodeId, ld->remoteNode, this->spovnetId ); 810 811 // autolinks: refresh timestamp 812 ld->markAsUsed(); 813 } 814 815 void BaseOverlay::onLinkQoSChanged(const LinkID& id, const NetworkLocator* local, 816 const NetworkLocator* remote, const QoSParameterSet& qos) { 817 logging_debug( "Link quality changed with base communication link id=" << id ); 818 819 // get descriptor for link 820 LinkDescriptor* ld = getDescriptor(id, true); 821 if ( ld == NULL ) return; // not found? ->ignore! 822 logging_debug( "Link quality changed id=" << ld->overlayId.toString() ); 823 824 // autolinks: refresh timestamp 825 ld->markAsUsed(); 826 } 827 828 bool BaseOverlay::onLinkRequest( const LinkID& id, const NetworkLocator* local, 829 const NetworkLocator* remote ) { 830 logging_debug("Accepting link request from " << remote->toString() ); 663 831 return true; 664 832 } 665 833 666 834 /// handles a message from base communication 667 835 bool BaseOverlay::receiveMessage(const Message* message, 668 const LinkID& link, const NodeID& 669 /*the nodeid is invalid in this case! removed var to prevent errors*/ ){ 836 const LinkID& link, const NodeID& ) { 837 // get descriptor for link 838 LinkDescriptor* ld = getDescriptor( link, true ); 839 840 // link known? 841 if (ld == NULL) { // no-> handle with unspecified params 842 logging_debug("Received message from base communication, link descriptor unknown" ); 843 return handleMessage( message, LinkID::UNSPECIFIED, link, NodeID::UNSPECIFIED ); 844 } else { // yes -> handle with overlay link id 845 logging_debug("Received message from base communication, link id=" << ld->overlayId.toString() ); 846 return handleMessage( message, ld->overlayId, link, NodeID::UNSPECIFIED ); 847 } 848 } 849 850 // ---------------------------------------------------------------------------- 851 852 /// handles a message from an overlay 853 void BaseOverlay::incomingRouteMessage( Message* msg, const LinkID& link, const NodeID& source ) { 854 logging_debug("Received message from overlay -- " 855 << " link id=" << link.toString() 856 << " node id=" << source.toString() ); 857 handleMessage( msg, link, LinkID::UNSPECIFIED, source ); 858 } 859 860 // ---------------------------------------------------------------------------- 861 862 /// handles an incoming message 863 bool BaseOverlay::handleMessage( const Message* message, 864 const LinkID& boLink, const LinkID& bcLink, const NodeID& remoteNode ) { 865 logging_debug( "Handling message: " << message->toString()); 670 866 671 867 // decapsulate overlay message 672 logging_debug( "receiveMessage: " << message->toString());673 OverlayMsg* overlayMsg =const_cast<Message*>(message)->decapsulate<OverlayMsg>();868 OverlayMsg* overlayMsg = 869 const_cast<Message*>(message)->decapsulate<OverlayMsg>(); 674 870 if( overlayMsg == NULL ) return false; 675 871 676 872 // mark the link as in action 677 LinkMapping::iterator item = linkMapping.find( link ); 678 if( item != linkMapping.end() ) item->second.markused(); 679 680 /* ************************************************************************ 681 /* handle user date that we forward to the appropriate service using the 682 * service id in the message. as we don't know the class of message that 683 * the service handles, we forward it as a pure Message 684 */ 685 if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeData) ) { 686 687 logging_debug( "baseoverlay received message of type OverlayMessageTypeData" ); 688 689 const ServiceID& service = overlayMsg->getService(); 690 CommunicationListener* serviceListener = communicationListeners.get( service ); 691 692 logging_debug( "received data for service " << service.toString() ); 693 694 if( serviceListener != NULL ) 695 serviceListener->onMessage( overlayMsg, overlayMsg->getSourceNode(), link ); 696 697 return true; 698 699 } // if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeData) ) 700 701 /* ************************************************************************ 702 /* Handle spovnet instance join requests 703 */ 704 else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinRequest) ){ 705 706 logging_debug( 707 "baseoverlay received message of type OverlayMessageTypeJoinRequest" 708 ); 709 710 JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>(); 711 logging_info( "received join request for spovnet " << 712 joinReq->getSpoVNetID().toString() ); 713 714 /* make sure that the node actually wants to join 715 * the correct spovnet id that we administrate */ 716 if( joinReq->getSpoVNetID() != spovnetId ){ 717 logging_error( "received join request for spovnet we don't handle " << 873 LinkDescriptor* ld = getDescriptor(boLink); 874 if (ld == NULL) ld = getDescriptor(bcLink, true); 875 if (ld != NULL) { 876 ld->markAsUsed(); 877 ld->markAlive(); 878 } 879 880 switch ( overlayMsg->getType() ) { 881 // --------------------------------------------------------------------- 882 // Handle spovnet instance join requests 883 // --------------------------------------------------------------------- 884 case OverlayMsg::typeJoinRequest: { 885 886 // decapsulate message 887 JoinRequest* joinReq = overlayMsg->decapsulate<JoinRequest>(); 888 logging_info( "Received join request for spovnet " << 718 889 joinReq->getSpoVNetID().toString() ); 719 return false; 720 } 721 722 // 723 // only if all services allow the node to join it is allowed 724 // using the isJoinAllowed interface security policies can be 725 // implemented by higher layer services 726 // 727 728 // TODO: here you can implement mechanisms to deny joining of a node 729 bool allow = true; 730 731 logging_info( "sending back join reply for spovnet " << 732 spovnetId.toString() << " to node " << 733 overlayMsg->getSourceNode().toString() << 734 ". result: " << (allow ? "allowed" : "denied") ); 735 736 joiningNodes.push_back( overlayMsg->getSourceNode() ); 737 738 // 739 // send back our spovnetid, default overlay parameters, join allow 740 // result, and ourself as the end-point to bootstrap the overlay against 741 // 742 743 assert( overlayInterface != NULL ); 744 OverlayParameterSet parameters = overlayInterface->getParameters(); 745 746 OverlayMsg retmsg( OverlayMsg::OverlayMessageTypeJoinReply, nodeId ); 747 JoinReply replyMsg( spovnetId, parameters, 890 891 // check spovnet id 892 if( joinReq->getSpoVNetID() != spovnetId ) { 893 logging_error( 894 "Received join request for spovnet we don't handle " << 895 joinReq->getSpoVNetID().toString() ); 896 return false; 897 } 898 899 // TODO: here you can implement mechanisms to deny joining of a node 900 bool allow = true; 901 logging_info( "Sending join reply for spovnet " << 902 spovnetId.toString() << " to node " << 903 overlayMsg->getSourceNode().toString() << 904 ". Result: " << (allow ? "allowed" : "denied") ); 905 joiningNodes.push_back( overlayMsg->getSourceNode() ); 906 907 // return overlay parameters 908 assert( overlayInterface != NULL ); 909 logging_debug( "Using bootstrap end-point " 910 << getEndpointDescriptor().toString() ) 911 OverlayParameterSet parameters = overlayInterface->getParameters(); 912 OverlayMsg retmsg( OverlayMsg::typeJoinReply, nodeId ); 913 JoinReply replyMsg( spovnetId, parameters, 748 914 allow, getEndpointDescriptor() ); 749 750 retmsg.encapsulate(&replyMsg); 751 bc->sendMessage( link, &retmsg ); 752 753 return true; 754 755 } // else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinRequest)) 756 757 /* ************************************************************************ 758 * handle replies to spovnet instance join requests 759 */ 760 else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinReply) && 761 state == BaseOverlayStateJoinInitiated){ 762 763 logging_debug( 764 "baseoverlay received message of type OverlayMessageTypeJoinReply"); 765 766 JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>(); 767 logging_info( "received spovnet join reply" ); 768 769 // ensure that we actually wanted to get into the spovnet whose id is 770 // in the message 771 if( replyMsg->getSpoVNetID() != spovnetId ){ 772 logging_error( "received spovnet join reply for spovnet " << 773 replyMsg->getSpoVNetID().toString() << 774 " but we wanted to join spovnet " << 915 retmsg.encapsulate(&replyMsg); 916 bc->sendMessage( bcLink, &retmsg ); 917 return true; 918 } 919 920 // --------------------------------------------------------------------- 921 // handle replies to spovnet instance join requests 922 // --------------------------------------------------------------------- 923 case OverlayMsg::typeJoinReply: { 924 925 // decapsulate message 926 logging_debug("received join reply message"); 927 JoinReply* replyMsg = overlayMsg->decapsulate<JoinReply>(); 928 assert(state == BaseOverlayStateJoinInitiated); 929 930 // correct spovnet? 931 if( replyMsg->getSpoVNetID() != spovnetId ) { // no-> fail 932 logging_error( "Received SpoVNet join reply for " << 933 replyMsg->getSpoVNetID().toString() << 934 " != " << spovnetId.toString() ); 935 return false; 936 } 937 938 // access granted? no -> fail 939 if( !replyMsg->getJoinAllowed() ) { 940 logging_error( "Our join request has been denied" ); 941 942 // drop initiator link 943 bc->dropLink( initiatorLink ); 944 initiatorLink = LinkID::UNSPECIFIED; 945 state = BaseOverlayStateInvalid; 946 947 // inform all registered services of the event 948 BOOST_FOREACH( NodeListener* i, nodeListeners ) 949 i->onJoinFailed( spovnetId ); 950 return true; 951 } 952 953 // access has been granted -> continue! 954 logging_info("Join request has been accepted for spovnet " << 775 955 spovnetId.toString() ); 776 956 777 // state does not change here, maybe the reply does come in later 778 return false; 779 } 780 781 // if we did not get access to the spovnet notify of the failure and 782 // close the link to the initiator 783 if( ! replyMsg->getJoinAllowed() ){ 784 785 logging_error( "our join request has been denied" ); 786 787 bc->dropLink( initiatorLink ); 788 initiatorLink = LinkID::UNSPECIFIED; 789 state = BaseOverlayStateInvalid; 790 791 // inform all registered services of the event 792 BOOST_FOREACH( NodeListener* i, nodeListeners ){ 957 // create overlay structure from spovnet parameter set 958 overlayInterface = OverlayFactory::create( 959 *this, replyMsg->getParam(), nodeId, this ); 960 961 // overlay structure supported? no-> fail! 962 if( overlayInterface == NULL ) { 963 logging_error( "overlay structure not supported" ); 964 965 bc->dropLink( initiatorLink ); 966 initiatorLink = LinkID::UNSPECIFIED; 967 state = BaseOverlayStateInvalid; 968 969 // inform all registered services of the event 970 BOOST_FOREACH( NodeListener* i, nodeListeners ) 793 971 i->onJoinFailed( spovnetId ); 972 973 return true; 794 974 } 795 975 796 return true; 797 } 798 799 logging_info( "join request has been accepted for spovnet " << 800 spovnetId.toString() ); 801 802 // if we did get access to the spovnet we try to create the overlay 803 // structure as given in the reply message 804 overlayInterface = OverlayFactory::create( *this, 805 replyMsg->getParam(), nodeId, this ); 806 807 if( overlayInterface == NULL ){ 808 logging_error( "overlay structure not supported" ); 809 810 bc->dropLink( initiatorLink ); 811 initiatorLink = LinkID::UNSPECIFIED; 812 state = BaseOverlayStateInvalid; 976 // everything ok-> join the overlay! 977 state = BaseOverlayStateCompleted; 978 overlayInterface->createOverlay(); 979 logging_debug( "Using bootstrap end-point " 980 << replyMsg->getBootstrapEndpoint().toString() ); 981 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() ); 982 983 // update ovlvis 984 ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN); 813 985 814 986 // inform all registered services of the event 815 987 BOOST_FOREACH( NodeListener* i, nodeListeners ) 816 i->onJoin Failed( spovnetId );988 i->onJoinCompleted( spovnetId ); 817 989 818 990 return true; 819 991 } 820 992 821 /* now start the join process for the overlay. the join process for the 822 * spovnet baseoverlay is now complete. we use the endpoint for overlay 823 * structure bootstrapping that the initiator provided in his reply 824 * message */ 825 state = BaseOverlayStateCompleted; 826 ovl.visChangeNodeColor( ovlId, nodeId, OvlVis::NODE_COLORS_GREEN); 827 828 overlayInterface->createOverlay(); 829 overlayInterface->joinOverlay( replyMsg->getBootstrapEndpoint() ); 830 831 // inform all registered services of the event 832 BOOST_FOREACH( NodeListener* i, nodeListeners ){ 833 i->onJoinCompleted( spovnetId ); 834 } 835 836 return true; 837 838 } // else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeJoinReply) && state == BaseOverlayStateJoinInitiated) 839 840 841 /* ************************************************************************ 842 * handle update messages for link establishment 843 */ 844 else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeUpdate) ){ 845 846 logging_debug( 847 "baseoverlay received message of type OverlayMessageTypeUpdate" 848 ); 849 850 const NodeID& sourcenode = overlayMsg->getSourceNode(); 851 const ServiceID& service = overlayMsg->getService(); 852 853 // linkmapping for the link available? no-> ignore 854 LinkMapping::iterator i = linkMapping.find( link ); 855 if( i == linkMapping.end() ) { 856 logging_warn( "received overlay update message for link " << 857 link.toString() << " for which we have no mapping" ); 858 return false; 859 } 860 861 // update our link mapping information for this link 862 bool changed = ( i->second.node != sourcenode ) || ( i->second.service != service ); 863 i->second.node = sourcenode; 864 i->second.service = service; 865 866 // if our link information changed, we send out an update, too 867 if( changed ){ 868 OverlayMsg overMsg( OverlayMsg::OverlayMessageTypeUpdate, i->second.service, nodeId ); 869 bc->sendMessage( link, &overMsg ); 870 } 871 872 // set the correct listener service for the linkitem 873 // now we can tell the registered service of the linkup event 874 if( !communicationListeners.contains( service ) ){ 875 logging_warn( "linkup event for service that has not been registered" ); 876 return false; 877 } 878 879 CommunicationListener* iface = communicationListeners.get( service ); 880 if( iface == NULL || iface == &CommunicationListener::DEFAULT ){ 881 logging_warn( "linkup event for service that has been registered " 882 "with a NULL interface" ); 993 // --------------------------------------------------------------------- 994 // handle data forward messages 995 // --------------------------------------------------------------------- 996 case OverlayMsg::typeData: { 997 998 // get service 999 const ServiceID& service = overlayMsg->getService(); 1000 logging_debug( "received data for service " << service.toString() ); 1001 1002 // find listener 1003 CommunicationListener* listener = 1004 communicationListeners.get( service ); 1005 if( listener == NULL ) return true; 1006 1007 // delegate data message 1008 listener->onMessage( overlayMsg, 1009 overlayMsg->getSourceNode(), ld->overlayId ); 1010 883 1011 return true; 884 1012 } 885 1013 886 i->second.interface = iface; 887 i->second.markused(); 888 889 // ask the service whether it wants to accept this link 890 if( !iface->onLinkRequest(sourcenode) ){ 891 892 logging_debug("link " << link.toString() << 893 " has been denied by service " << service.toString() << ", dropping link"); 894 895 // prevent onLinkDown calls to the service 896 i->second.interface = &CommunicationListener::DEFAULT; 897 // drop the link 898 dropLink( link ); 899 900 return true; 901 } 902 903 // 904 // link has been accepted, link is now up, send messages out first 905 // 906 907 i->second.linkup = true; 908 logging_debug("link " << link.toString() << 909 " has been accepted by service " << service.toString() << " and is now up"); 910 911 if( i->second.waitingmsg.size() > 0 ){ 912 logging_info( "sending out queued messages on link " << link.toString() ); 913 914 BOOST_FOREACH( Message* msg, i->second.waitingmsg ){ 915 sendMessage( msg, link ); 916 delete msg; 1014 // --------------------------------------------------------------------- 1015 // handle update messages for link establishment 1016 // --------------------------------------------------------------------- 1017 case OverlayMsg::typeUpdate: { 1018 logging_debug("Received type update message on link " << ld ); 1019 1020 // get info 1021 const NodeID& sourcenode = overlayMsg->getSourceNode(); 1022 const ServiceID& service = overlayMsg->getService(); 1023 1024 // no link descriptor available -> error! 1025 if( ld == NULL ) { 1026 logging_warn( "received overlay update message for link " << 1027 ld->overlayId.toString() << " for which we have no mapping" ); 1028 return false; 917 1029 } 918 1030 919 i->second.waitingmsg.clear(); 920 } 921 922 // call the notification functions 923 iface->onLinkUp( link, sourcenode ); 924 sideport->onLinkUp( link, nodeId, sourcenode, this->spovnetId ); 925 926 return true; 927 928 } // else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeUpdate) ) 929 930 /* ************************************************************************ 931 * handle bye messages 932 */ 933 else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeBye) ) { 934 935 logging_debug( "BaseOverlay received message of type OverlayMessageTypeBye" ); 936 logging_debug( "Received bye message from " << 937 overlayMsg->getSourceNode().toString() ); 938 939 /* if we are the initiator and receive a bye from a node 940 * the node just left. if we are a node and receive a bye 941 * from the initiator, we have to close, too. 942 */ 943 if( overlayMsg->getSourceNode() == spovnetInitiator ){ 944 945 bc->dropLink( initiatorLink ); 946 initiatorLink = LinkID::UNSPECIFIED; 947 state = BaseOverlayStateInvalid; 948 949 logging_fatal( "initiator ended spovnet" ); 950 951 // inform all registered services of the event 952 BOOST_FOREACH( NodeListener* i, nodeListeners ){ 953 i->onLeaveFailed( spovnetId ); 1031 // update our link mapping information for this link 1032 bool changed = 1033 ( ld->remoteNode != sourcenode ) || ( ld->service != service ); 1034 ld->remoteNode = sourcenode; 1035 ld->service = service; 1036 ld->autolink = overlayMsg->isAutoLink(); 1037 1038 // if our link information changed, we send out an update, too 1039 if( changed ) { 1040 OverlayMsg overMsg( OverlayMsg::typeUpdate, ld->service, nodeId ); 1041 overMsg.setAutoLink(ld->autolink); 1042 bc->sendMessage( ld->communicationId, &overMsg ); 954 1043 } 955 1044 956 } else { 957 // a node that said goodbye and we are the initiator don't have to 958 // do much here, as the node also will go out of the overlay 959 // structure 960 logging_info( "node left " << overlayMsg->getSourceNode() ); 961 } 962 963 return true; 964 965 } // else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeBye)) 966 967 /* ************************************************************************ 968 * handle link request forwarded through the overlay 969 */ 970 else if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeLinkRequest)) { 971 LinkRequest* linkReq = overlayMsg->decapsulate<LinkRequest>(); 972 const ServiceID& service = overlayMsg->getService(); 973 if (linkReq->isReply()) { 974 975 // find link 976 PendingLinkMap::iterator i = pendingLinks.find( linkReq->getNonce() ); 977 if ( i == pendingLinks.end() ) { 978 logging_error( "Nonce not found in link request" ); 1045 // service registered? no-> error! 1046 if( !communicationListeners.contains( service ) ) { 1047 logging_warn( "Link up: event listener has not been registered" ); 1048 return false; 1049 } 1050 1051 // default or no service registered? 1052 CommunicationListener* listener = communicationListeners.get( service ); 1053 if( listener == NULL || listener == &CommunicationListener::DEFAULT ) { 1054 logging_warn("Link up: event listener is default or null!" ); 979 1055 return true; 980 1056 } 981 1057 982 // debug message 983 logging_debug( "LinkRequest reply received. Establishing link " 984 << i->second << " to " << (linkReq->getEndpoint()->toString()) 985 << " for service " << service.toString() 986 << " with nonce " << linkReq->getNonce() 1058 // update descriptor 1059 ld->listener = listener; 1060 ld->markAsUsed(); 1061 ld->markAlive(); 1062 1063 // ask the service whether it wants to accept this link 1064 if( !listener->onLinkRequest(sourcenode) ) { 1065 1066 logging_debug("Link id=" << ld->overlayId.toString() << 1067 " has been denied by service " << service.toString() << ", dropping link"); 1068 1069 // prevent onLinkDown calls to the service 1070 ld->listener = &CommunicationListener::DEFAULT; 1071 1072 // drop the link 1073 dropLink( ld->overlayId ); 1074 return true; 1075 } 1076 1077 // set link up 1078 ld->up = true; 1079 logging_debug( 1080 "Link " << ld->overlayId.toString() 1081 << " has been accepted by service " << service.toString() 1082 << " and is now up" 987 1083 ); 988 1084 989 // establishing link 990 bc->establishLink( *linkReq->getEndpoint(), i->second ); 991 } else { 992 OverlayMsg overlay_msg( OverlayMsg::OverlayMessageTypeLinkRequest, service, nodeId ); 993 LinkRequest link_request_msg( 994 linkReq->getNonce(), &bc->getEndpointDescriptor(), true ); 995 overlay_msg.encapsulate( &link_request_msg ); 996 997 // debug message 998 logging_debug( "Sending LinkRequest reply for link with nonce " << 999 linkReq->getNonce() ); 1000 1001 // route message back over overlay 1002 overlayInterface->routeMessage( 1003 overlayMsg->getSourceNode(), &overlay_msg 1004 ); 1005 } 1006 } // if( overlayMsg->isType(OverlayMsg::OverlayMessageTypeLinkRequest)) 1007 1008 /* ************************************************************************ 1009 * unknown message type ... error! 1010 */ 1011 else { 1012 1013 logging_error( "received message in invalid state! don't know " << 1014 "what to do with this message of type " << 1015 overlayMsg->getType() ); 1016 return false; 1017 1018 } // else 1019 1085 // auto links: link has been accepted -> send queued messages 1086 if( ld->messageQueue.size() > 0 ) { 1087 logging_info( "sending out queued messages on link " << 1088 ld->overlayId.toString() ); 1089 BOOST_FOREACH( Message* msg, ld->messageQueue ) { 1090 sendMessage( msg, ld->overlayId ); 1091 delete msg; 1092 } 1093 ld->messageQueue.clear(); 1094 } 1095 1096 // call the notification functions 1097 listener->onLinkUp( ld->overlayId, sourcenode ); 1098 sideport->onLinkUp( ld->overlayId, nodeId, sourcenode, this->spovnetId ); 1099 1100 return true; 1101 } 1102 1103 // --------------------------------------------------------------------- 1104 // handle bye messages 1105 // --------------------------------------------------------------------- 1106 case OverlayMsg::typeBye: { 1107 logging_debug( "received bye message from " << 1108 overlayMsg->getSourceNode().toString() ); 1109 1110 /* if we are the initiator and receive a bye from a node 1111 * the node just left. if we are a node and receive a bye 1112 * from the initiator, we have to close, too. 1113 */ 1114 if( overlayMsg->getSourceNode() == spovnetInitiator ) { 1115 1116 bc->dropLink( initiatorLink ); 1117 initiatorLink = LinkID::UNSPECIFIED; 1118 state = BaseOverlayStateInvalid; 1119 1120 logging_fatal( "initiator ended spovnet" ); 1121 1122 // inform all registered services of the event 1123 BOOST_FOREACH( NodeListener* i, nodeListeners ) 1124 i->onLeaveFailed( spovnetId ); 1125 1126 } else { 1127 // a node that said goodbye and we are the initiator don't have to 1128 // do much here, as the node also will go out of the overlay 1129 // structure 1130 logging_info( "node left " << overlayMsg->getSourceNode() ); 1131 } 1132 1133 return true; 1134 1135 } 1136 1137 // --------------------------------------------------------------------- 1138 // handle link request forwarded through the overlay 1139 // --------------------------------------------------------------------- 1140 case OverlayMsg::typeLinkRequest: { 1141 1142 // decapsulate message 1143 LinkRequest* linkReq = overlayMsg->decapsulate<LinkRequest>(); 1144 const ServiceID& service = overlayMsg->getService(); 1145 1146 // is request reply? 1147 if ( linkReq->isReply() ) { 1148 1149 // find link 1150 PendingLinkMap::iterator i = pendingLinks.find( linkReq->getNonce() ); 1151 if ( i == pendingLinks.end() ) { 1152 logging_error( "Nonce not found in link request" ); 1153 return true; 1154 } 1155 1156 // debug message 1157 logging_debug( "Link request reply received. Establishing link " 1158 << i->second << " to " << (linkReq->getEndpoint()->toString()) 1159 << " for service " << service.toString() 1160 << " with nonce " << linkReq->getNonce() 1161 << " using relay " << linkReq->getRelay().toString() 1162 << " and remote link id=" << linkReq->getRemoteLinkId() 1163 ); 1164 1165 // get descriptor 1166 LinkDescriptor* ldn = getDescriptor(i->second); 1167 1168 // check if link request reply has a relay node ... 1169 if (!linkReq->getRelay().isUnspecified()) { // yes-> 1170 ldn->up = true; 1171 ldn->relay = true; 1172 if (ldn->localRelay.isUnspecified()) { 1173 logging_error("On LinkRequest reply: local relay is unspecifed on link " << ldn ); 1174 showLinkState(); 1175 } 1176 ldn->remoteRelay = linkReq->getRelay(); 1177 ldn->remoteLinkId = linkReq->getRemoteLinkId(); 1178 ldn->remoteNode = overlayMsg->getSourceNode(); 1179 1180 ldn->markAlive(); 1181 1182 // compile and send update message 1183 OverlayMsg _overlayMsg( OverlayMsg::typeUpdate, ldn->service, nodeId ); 1184 _overlayMsg.setAutoLink(ldn->autolink); 1185 sendMessage( &_overlayMsg, ldn ); 1186 1187 // auto links: link has been accepted -> send queued messages 1188 if( ldn->messageQueue.size() > 0 ) { 1189 logging_info( "Sending out queued messages on link " << 1190 ldn->overlayId.toString() ); 1191 BOOST_FOREACH( Message* msg, ldn->messageQueue ) { 1192 sendMessage( msg, ldn->overlayId ); 1193 delete msg; 1194 } 1195 ldn->messageQueue.clear(); 1196 } 1197 1198 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode ); 1199 1200 // try to establish a direct link 1201 ldn->communicationId = 1202 bc->establishLink( *linkReq->getEndpoint(), i->second ); 1203 } 1204 1205 // no relay node-> use overlay routing 1206 else { 1207 ldn->up = true; 1208 1209 // establish direct link 1210 ldn->communicationId = 1211 bc->establishLink( *linkReq->getEndpoint(), i->second ); 1212 } 1213 } else { 1214 logging_debug( "Link request received from node id=" 1215 << overlayMsg->getSourceNode() ); 1216 1217 // create link descriptor 1218 LinkDescriptor* ldn = 1219 createLinkDescriptor(overlayMsg->getSourceNode(), 1220 overlayMsg->getService(), LinkID::UNSPECIFIED ); 1221 assert(!ldn->overlayId.isUnspecified()); 1222 1223 // create reply message 1224 OverlayMsg overlay_msg( OverlayMsg::typeLinkRequest, service, nodeId ); 1225 LinkRequest link_request_msg( 1226 linkReq->getNonce(), 1227 &bc->getEndpointDescriptor(), 1228 true, ldn->overlayId, ldn->localRelay 1229 ); 1230 overlay_msg.encapsulate( &link_request_msg ); 1231 1232 // debug message 1233 logging_debug( "Sending LinkRequest reply for link with nonce " << 1234 linkReq->getNonce() ); 1235 1236 // if this is a relay link-> update information & inform listeners 1237 if (!linkReq->getRelay().isUnspecified()) { 1238 // set flags 1239 ldn->up = true; 1240 ldn->relay = true; 1241 if (ldn->localRelay.isUnspecified()) { 1242 logging_error("On LinkRequest request: local relay is unspecifed on link " << ldn ); 1243 showLinkState(); 1244 } 1245 ldn->remoteRelay = linkReq->getRelay(); 1246 ldn->remoteNode = overlayMsg->getSourceNode(); 1247 ldn->remoteLinkId = linkReq->getRemoteLinkId(); 1248 ldn->listener->onLinkUp( ldn->overlayId, ldn->remoteNode ); 1249 } 1250 1251 // route message back over overlay 1252 sendMessage( &overlay_msg, ldn ); 1253 } 1254 return true; 1255 } 1256 1257 // --------------------------------------------------------------------- 1258 // handle relay message to forward messages 1259 // --------------------------------------------------------------------- 1260 case OverlayMsg::typeRelay: { 1261 1262 // decapsulate message 1263 RelayMessage* relayMsg = overlayMsg->decapsulate<RelayMessage>(); 1264 1265 // is relay message informative? 1266 switch (relayMsg->getType()) { 1267 1268 // handle relay notification 1269 case RelayMessage::typeInform: { 1270 logging_info("Received relay information message with" 1271 << " relay " << relayMsg->getRelayNode() 1272 << " destination " << relayMsg->getDestNode() ); 1273 1274 // mark incoming link as relay 1275 if (ld!=NULL) ld->markAsRelay(); 1276 1277 // am I the destination of this message? yes-> 1278 if (relayMsg->getDestNode() == nodeId ) { 1279 // deliver relay message locally! 1280 logging_debug("Relay message reached destination. Handling the message."); 1281 handleMessage( relayMsg, relayMsg->getDestLink(), LinkID::UNSPECIFIED, remoteNode ); 1282 return true; 1283 } 1284 1285 // create route message 1286 OverlayMsg _overMsg( *overlayMsg ); 1287 RelayMessage _relayMsg( *relayMsg ); 1288 _relayMsg.setType( RelayMessage::typeRoute ); 1289 _overMsg.encapsulate( &_relayMsg ); 1290 1291 // forward message 1292 if (relayMsg->getRelayNode() == nodeId || relayMsg->getRelayNode().isUnspecified()) { 1293 logging_info("Routing relay message to " << relayMsg->getDestNode().toString() ); 1294 overlayInterface->routeMessage(relayMsg->getDestNode(), &_overMsg ); 1295 } else { 1296 logging_info("Routing relay message to " << relayMsg->getRelayNode().toString() ); 1297 overlayInterface->routeMessage(relayMsg->getRelayNode(), &_overMsg ); 1298 } 1299 return true; 1300 } 1301 1302 // handle relay routing 1303 case RelayMessage::typeRoute: { 1304 logging_info("Received relay route message with" 1305 << " relay " << relayMsg->getRelayNode() 1306 << " destination " << relayMsg->getDestNode() ); 1307 1308 // mark incoming link as relay 1309 if (ld!=NULL) ld->markAsRelay(); 1310 1311 // am I the destination of this message? yes-> 1312 if (relayMsg->getDestNode() == nodeId ) { 1313 // deliver relay message locally! 1314 logging_debug("Relay message reached destination. Handling the message."); 1315 handleMessage( relayMsg, relayMsg->getDestLink(), LinkID::UNSPECIFIED, remoteNode ); 1316 return true; 1317 } 1318 1319 // am I the relay for this message? yes-> 1320 if (relayMsg->getRelayNode() == nodeId ) { 1321 logging_debug("I'm the relay for this message. Sending to destination."); 1322 OverlayMsg _overMsg( *overlayMsg ); 1323 RelayMessage _relayMsg( *relayMsg ); 1324 _overMsg.encapsulate(&_relayMsg); 1325 1326 /// this must be handled by using relay link! 1327 overlayInterface->routeMessage(relayMsg->getDestNode(), &_overMsg ); 1328 return true; 1329 } 1330 1331 // error: I'm not a relay or destination! 1332 logging_error("This node is neither relay nor destination. Dropping Message!"); 1333 return true; 1334 } 1335 default: { 1336 logging_error("RelayMessage Unknown!"); 1337 return true; 1338 } 1339 } 1340 1341 break; 1342 } 1343 1344 // --------------------------------------------------------------------- 1345 // handle keep-alive messages 1346 // --------------------------------------------------------------------- 1347 case OverlayMsg::typeKeepAlive: { 1348 if ( ld != NULL ) { 1349 //logging_force("Keep-Alive for "<< ld->overlayId); 1350 ld->markAlive(); 1351 } 1352 break; 1353 } 1354 1355 // --------------------------------------------------------------------- 1356 // handle direct link replacement messages 1357 // --------------------------------------------------------------------- 1358 case OverlayMsg::typeDirectLink: { 1359 LinkDescriptor* rld = getDescriptor( overlayMsg->getRelayLink() ); 1360 rld->communicationId = ld->communicationId; 1361 rld->relay = false; 1362 rld->localRelay = NodeID::UNSPECIFIED; 1363 rld->remoteRelay = NodeID::UNSPECIFIED; 1364 eraseDescriptor(ld->overlayId); 1365 break; 1366 } 1367 1368 // --------------------------------------------------------------------- 1369 // handle unknown message type 1370 // --------------------------------------------------------------------- 1371 default: { 1372 logging_error( "received message in invalid state! don't know " << 1373 "what to do with this message of type " << 1374 overlayMsg->getType() ); 1375 return false; 1376 } 1377 1378 } /* switch */ 1020 1379 return false; 1021 1380 } 1022 1381 1023 void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service){ 1382 // ---------------------------------------------------------------------------- 1383 1384 void BaseOverlay::broadcastMessage(Message* message, const ServiceID& service) { 1024 1385 1025 1386 logging_debug( "broadcasting message to all known nodes " << … … 1027 1388 1028 1389 OverlayInterface::NodeList nodes = overlayInterface->getKnownNodes(); 1029 1030 1390 OverlayInterface::NodeList::iterator i = nodes.begin(); 1031 OverlayInterface::NodeList::iterator iend = nodes.end(); 1032 1033 for( ; i != iend; i++ ){ 1391 for(; i != nodes.end(); i++ ) { 1034 1392 if( *i == nodeId) continue; // don't send to ourselfs 1035 1393 sendMessage( message, *i, service ); … … 1038 1396 1039 1397 vector<NodeID> BaseOverlay::getOverlayNeighbors() const { 1040 // the known nodes _can_ also include our 1041 // node, so we remove ourselfs 1042 1398 // the known nodes _can_ also include our node, so we remove ourself 1043 1399 vector<NodeID> nodes = overlayInterface->getKnownNodes(); 1044 1400 vector<NodeID>::iterator i = find( nodes.begin(), nodes.end(), this->nodeId ); 1045 1401 if( i != nodes.end() ) nodes.erase( i ); 1046 1047 1402 return nodes; 1048 1403 } 1049 1404 1050 void BaseOverlay::updateOvlVis( const NodeID& n ) {1051 NodeID node = n;1052 /* void visShowNodeBubble (1053 NETWORK_ID network,1054 NodeID& node,1055 string label1056 );1057 */1058 using namespace std;1059 1060 if (node == nodeId || node.isUnspecified()) return;1061 1062 // min/max1063 if ( node < min || min.isUnspecified() ) min = node;1064 if ( node > max || max.isUnspecified() ) max = node;1065 1066 // successor1067 if ( succ.isUnspecified() || (node > nodeId && (succ < nodeId || (node-nodeId) < (succ-nodeId))) ) {1068 if (!succ.isUnspecified() && node != succ)1069 ovl.visDisconnect(ovlId, nodeId, succ, string(""));1070 succ = node;1071 ovl.visConnect(ovlId, nodeId, succ, string(""));1072 }1073 1074 // set successor (circle-wrap)1075 if (succ.isUnspecified() && !min.isUnspecified()) {1076 succ = min;1077 ovl.visConnect(ovlId, nodeId, succ, string(""));1078 }1079 }1080 1081 1405 const NodeID& BaseOverlay::getNodeID(const LinkID& lid) const { 1082 1083 1406 if( lid == LinkID::UNSPECIFIED ) return nodeId; 1084 1085 LinkMapping::const_iterator i = linkMapping.find( lid ); 1086 if( i == linkMapping.end() ) return NodeID::UNSPECIFIED; 1087 else return i->second.node; 1407 const LinkDescriptor* ld = getDescriptor(lid); 1408 if( ld == NULL ) return NodeID::UNSPECIFIED; 1409 else return ld->remoteNode; 1088 1410 } 1089 1411 1090 1412 vector<LinkID> BaseOverlay::getLinkIDs( const NodeID& nid ) const { 1091 1092 1413 vector<LinkID> linkvector; 1093 1094 BOOST_FOREACH( LinkPair item, linkMapping ){ 1095 if( item.second.node == nid || nid == NodeID::UNSPECIFIED ){ 1096 linkvector.push_back( item.second.link ); 1097 } 1098 } 1099 1414 BOOST_FOREACH( LinkDescriptor* ld, links ) { 1415 if( ld->remoteNode == nid || nid == NodeID::UNSPECIFIED ) { 1416 linkvector.push_back( ld->overlayId ); 1417 } 1418 } 1100 1419 return linkvector; 1101 1420 } 1102 1421 1103 void BaseOverlay::incomingRouteMessage(Message* msg){ 1104 // gets handled as normal data message 1105 receiveMessage( msg, LinkID::UNSPECIFIED, NodeID::UNSPECIFIED ); 1106 } 1107 1108 void BaseOverlay::onNodeJoin(const NodeID& node){ 1109 1422 1423 void BaseOverlay::onNodeJoin(const NodeID& node) { 1110 1424 JoiningNodes::iterator i = std::find( joiningNodes.begin(), joiningNodes.end(), node ); 1111 1425 if( i == joiningNodes.end() ) return; 1112 1426 1113 1427 logging_info( "node has successfully joined baseoverlay and overlay structure " 1114 1428 << node.toString() ); 1115 1429 1116 1430 joiningNodes.erase( i ); 1117 1431 } 1118 1432 1119 void BaseOverlay::eventFunction(){ 1120 1121 list<LinkID> oldlinks; 1433 void BaseOverlay::eventFunction() { 1434 1435 // send keep-alive messages over established links 1436 BOOST_FOREACH( LinkDescriptor* ld, links ) { 1437 if (!ld->up) continue; 1438 OverlayMsg overMsg( OverlayMsg::typeKeepAlive, 1439 OverlayInterface::OVERLAY_SERVICE_ID, nodeId ); 1440 sendMessage( &overMsg, ld ); 1441 } 1442 1443 // iterate over all links and check for time boundaries 1444 vector<LinkDescriptor*> oldlinks; 1122 1445 time_t now = time(NULL); 1123 1124 // first gather all the links from linkMapping that need droppin 1125 // don't directly drop, as the dropLink function affects the 1126 // linkMapping structure that we are traversing here. 1127 // drop links after a timeout of 30s 1128 1129 BOOST_FOREACH( LinkPair item, linkMapping ){ 1130 if( item.second.autolink && difftime(now, item.second.lastuse) > 30) 1131 oldlinks.push_back( item.first ); 1132 } 1133 1134 BOOST_FOREACH( const LinkID lnk, oldlinks ) { 1135 logging_debug( "auto-link " << lnk.toString() << " timed out and is getting dropped" ); 1136 dropLink( lnk ); 1137 } 1446 BOOST_FOREACH( LinkDescriptor* ld, links ) { 1447 // remote used as relay flag 1448 if ( ld->usedAsRelay && difftime( now, ld->timeUsedAsRelay ) > 10) 1449 ld->usedAsRelay = false; 1450 1451 // keep alives missed? yes-> 1452 if ( !ld->up && difftime( now, ld->keepAliveTime ) > 2 ) { 1453 1454 // increase counter 1455 ld->keepAliveMissed++; 1456 1457 // missed more than four keep-alive messages (4 sec)? -> drop link 1458 if (ld->keepAliveMissed > 10) { 1459 logging_force( "Link connection request is stale, closing: " << ld ); 1460 oldlinks.push_back( ld ); 1461 } 1462 } 1463 1464 if (!ld->up) continue; 1465 1466 // drop links that are dropped and not used as relay 1467 if (ld->dropWhenRelaysLeft && !ld->usedAsRelay && !ld->autolink) 1468 oldlinks.push_back( ld ); 1469 else 1470 1471 // auto-link time exceeded? 1472 if ( ld->autolink && difftime( now, ld->lastuse ) > 30 ) 1473 oldlinks.push_back( ld ); 1474 1475 else 1476 1477 // keep alives missed? yes-> 1478 if ( !ld->autolink && difftime( now, ld->keepAliveTime ) > 2 ) { 1479 1480 // increase counter 1481 ld->keepAliveMissed++; 1482 1483 // missed more than four keep-alive messages (4 sec)? -> drop link 1484 if (ld->keepAliveMissed >= 8) { 1485 logging_force( "Link is stale, closing: " << ld ); 1486 oldlinks.push_back( ld ); 1487 } 1488 } 1489 } 1490 1491 // show link state 1492 counter++; 1493 if (counter>=4) showLinkState(); 1494 if (counter>=4 || counter<0) counter = 0; 1495 1496 // drop links 1497 BOOST_FOREACH( const LinkDescriptor* ld, oldlinks ) { 1498 if (!ld->communicationId.isUnspecified() && ld->communicationId == initiatorLink) { 1499 logging_force( "Not dropping initiator link: " << ld ); 1500 continue; 1501 } 1502 logging_force( "Link timed out. Dropping " << ld ); 1503 dropLink( ld->overlayId ); 1504 } 1505 } 1506 1507 void BaseOverlay::showLinkState() { 1508 int i=0; 1509 logging_force("--- link state -------------------------------"); 1510 BOOST_FOREACH( LinkDescriptor* ld, links ) { 1511 logging_force("link " << i << ": " << ld); 1512 i++; 1513 } 1514 logging_force("----------------------------------------------"); 1138 1515 } 1139 1516 -
source/ariba/overlay/BaseOverlay.h
r4836 r5151 86 86 // ariba interface 87 87 using ariba::NodeListener; 88 using ariba::SideportListener; 88 89 using ariba::CommunicationListener; 89 90 … … 117 118 namespace overlay { 118 119 119 class BaseOverlay: public MessageReceiver, public CommunicationEvents, 120 public OverlayStructureEvents, protected Timer { 121 122 private: 120 class LinkDescriptor; 121 122 class BaseOverlay: public MessageReceiver, 123 public CommunicationEvents, 124 public OverlayStructureEvents, 125 protected Timer { 126 123 127 friend class OneHop; 124 128 friend class Chord; 129 friend class ariba::SideportListener; 125 130 126 131 use_logging_h( BaseOverlay ); … … 157 162 */ 158 163 const LinkID establishLink(const NodeID& node, const ServiceID& service, 159 164 const LinkID& linkid = LinkID::UNSPECIFIED); 160 165 161 166 /** 162 167 * Starts a link establishment procedure to the specified 163 * 168 * endpoint and to the specified service. Concurrently it tries to 169 * establish a relay link over the overlay using the nodeid 170 */ 171 const LinkID establishLink(const EndpointDescriptor& ep, const NodeID& nodeid, 172 const ServiceID& service, const LinkID& linkid = LinkID::UNSPECIFIED); 173 174 /** 175 * Starts a link establishment procedure to the specified 164 176 * endpoint and to the specified service 165 177 */ 166 178 const LinkID establishLink(const EndpointDescriptor& ep, 167 const ServiceID& service, const LinkID& linkid = 168 LinkID::UNSPECIFIED); 179 const ServiceID& service, const LinkID& linkid = LinkID::UNSPECIFIED); 169 180 170 181 /// drops a link … … 176 187 /// sends a message to a node and a specific service 177 188 seqnum_t sendMessage(const Message* message, const NodeID& node, 178 189 const ServiceID& service); 179 190 180 191 /** … … 251 262 * @param boot A bootstrap node 252 263 */ 253 void joinSpoVNet(const SpoVNetID& id, const EndpointDescriptor& boot 264 void joinSpoVNet(const SpoVNetID& id, const EndpointDescriptor& boot); 254 265 255 266 /** … … 275 286 */ 276 287 virtual void onLinkUp(const LinkID& id, const NetworkLocator* local, 277 288 const NetworkLocator* remote); 278 289 279 290 /** … … 281 292 */ 282 293 virtual void onLinkDown(const LinkID& id, const NetworkLocator* local, 283 294 const NetworkLocator* remote); 284 295 285 296 /** … … 287 298 */ 288 299 virtual void onLinkChanged(const LinkID& id, 289 290 300 const NetworkLocator* oldlocal, const NetworkLocator* newlocal, 301 const NetworkLocator* oldremote, const NetworkLocator* newremote); 291 302 292 303 /** … … 294 305 */ 295 306 virtual void onLinkFail(const LinkID& id, const NetworkLocator* local, 296 307 const NetworkLocator* remote); 297 308 298 309 /** … … 300 311 */ 301 312 virtual void onLinkQoSChanged(const LinkID& id, 302 303 313 const NetworkLocator* local, const NetworkLocator* remote, 314 const QoSParameterSet& qos); 304 315 305 316 /** … … 307 318 */ 308 319 virtual bool onLinkRequest(const LinkID& id, const NetworkLocator* local, 309 const NetworkLocator* remote); 320 const NetworkLocator* remote); 321 322 /** 323 * Processes a received message from BaseCommunication 324 * 325 * In case of a message routed by the overlay the source identifies 326 * the node the message came from! 327 */ 328 virtual bool receiveMessage(const Message* message, const LinkID& link, 329 const NodeID& source = NodeID::UNSPECIFIED); 310 330 311 331 //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 312 332 313 /** 314 * Processes a received message. 315 * 316 * Beware: nodeid is not valid in this case! (since this class implements 317 * nodeid's in the first place *g*) 318 */ 319 virtual bool receiveMessage( 320 const Message* message, const LinkID& link, const NodeID&); 333 /// handles an incoming message with link descriptor 334 bool handleMessage(const Message* message, 335 const LinkID& boLink, const LinkID& bcLink, const NodeID& remoteNode ); 321 336 322 337 //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 323 324 338 /** 325 339 * This method is called, when a routed message arrives from the … … 328 342 * @see OverlayStructureEvents.h 329 343 */ 330 virtual void incomingRouteMessage(Message* msg); 344 virtual void incomingRouteMessage(Message* msg, 345 const LinkID& link = LinkID::UNSPECIFIED, 346 const NodeID& source = NodeID::UNSPECIFIED); 331 347 332 348 /** … … 343 359 344 360 private: 345 /** 346 * The BaseCommunication the BaseOverlay 347 * communicates over 348 */ 349 BaseCommunication* bc; 350 351 /** 352 * The nodeid of this BaseOverlay instance. 353 */ 354 NodeID nodeId; 355 356 /** 357 * The SpoVNetID that we are joined to 358 * or that we have created. 359 */ 360 SpoVNetID spovnetId; 361 362 /** 363 * TODO 364 */ 365 Demultiplexer<CommunicationListener*, ServiceID> communicationListeners; 366 367 /** 368 * TODO 369 */ 370 typedef vector<NodeListener*> NodeListenerVector; 371 372 /** 373 * TODO 374 */ 375 NodeListenerVector nodeListeners; 376 377 /** 378 * TODO 379 */ 380 SideportListener* sideport; 381 382 /** 383 * The abstract overlay interface that implements 384 * the overlay specific functionality. 385 */ 386 OverlayInterface* overlayInterface; 387 388 /** 389 * The special link to the Initiator of the SpoVNet 390 * or LinkID::UNDEFINED if we are the Initiator 391 */ 392 LinkID initiatorLink; 393 394 /** 395 * The state of the BaseOverlay 396 */ 361 362 /// The state of the BaseOverlay 397 363 typedef enum _BaseOverlayState { 398 364 BaseOverlayStateInvalid = 0, … … 402 368 } BaseOverlayState; 403 369 404 /** 405 * TODO 406 */ 407 BaseOverlayState state; 408 409 /** 410 * The initiator node 411 */ 412 NodeID spovnetInitiator; 413 414 /** 415 * OvlVis 416 */ 417 NodeID min, max; 418 NodeID succ, pred; 419 void updateOvlVis(const NodeID& node); 420 421 /** 422 * Link management 423 */ 424 class LinkItem { 425 public: 426 static const LinkItem UNSPECIFIED; 427 428 LinkItem() : 429 link(LinkID::UNSPECIFIED), 430 node(NodeID::UNSPECIFIED), 431 service(ServiceID::UNSPECIFIED), 432 interface(&CommunicationListener::DEFAULT), 433 autolink(false), 434 lastuse(0), 435 linkup(false){ 436 } 437 438 LinkItem(const LinkID& _link, const NodeID& _node, 439 const ServiceID& _service, CommunicationListener* _interface) : 440 link(_link), 441 node(_node), 442 service(_service), 443 interface(_interface), 444 autolink(false), 445 lastuse(time(NULL)), 446 linkup(false){ 447 448 assert( _interface != NULL ); 449 } 450 451 LinkItem(const LinkItem& rh) : 452 link(rh.link), 453 node(rh.node), 454 service(rh.service), 455 interface(rh.interface), 456 autolink(rh.autolink), 457 lastuse(rh.lastuse), 458 linkup(rh.linkup){ 459 460 BOOST_FOREACH( Message* msg, rh.waitingmsg ){ 461 waitingmsg.push_back( msg ); 462 } 463 } 464 465 void deleteWaiting(){ 466 BOOST_FOREACH( Message* msg, waitingmsg ){ 467 delete msg; 468 } 469 waitingmsg.clear(); 470 } 471 472 // general information about the link 473 const LinkID link; 474 NodeID node; 475 ServiceID service; 476 CommunicationListener* interface; 477 bool linkup; 478 479 // information needed for auto links 480 void markused() { 481 lastuse = time(NULL); 482 } 483 484 bool autolink; 485 time_t lastuse; 486 deque<Message*> waitingmsg; 487 }; 488 489 typedef map<const LinkID, LinkItem> LinkMapping; 490 typedef pair<const LinkID, LinkItem> LinkPair; 491 LinkMapping linkMapping; 370 BaseOverlayState state; ///< Current Base-Overlay state 371 BaseCommunication* bc; ///< reference to the base communication 372 NodeID nodeId; ///< the node id of this node 373 SpoVNetID spovnetId; ///< the spovnet id of the currently joined overlay 374 LinkID initiatorLink; ///< the link id of the link to the initiator node 375 NodeID spovnetInitiator;///< The initiator node 376 377 /// the service id communication listeners 378 Demultiplexer<CommunicationListener*, ServiceID> communicationListeners; 379 380 /// the node listeners 381 typedef vector<NodeListener*> NodeListenerVector; 382 NodeListenerVector nodeListeners; 383 384 /// the sideport listener 385 SideportListener* sideport; 386 387 /// the used overlay structure 388 OverlayInterface* overlayInterface; 389 390 /// The link mapping of the node 391 vector<LinkDescriptor*> links; 392 void eraseDescriptor(const LinkID& link, bool communication = false); 393 394 /// returns a link descriptor for the given id 395 LinkDescriptor* getDescriptor(const LinkID& link, 396 bool communication = false); 397 398 /// returns a link descriptor for the given id 399 const LinkDescriptor* getDescriptor(const LinkID& link, 400 bool communication = false) const; 401 402 /// returns a auto-link descriptor for the given node and service id 403 LinkDescriptor* getAutoDescriptor(const NodeID& node, const ServiceID& service); 404 405 /// adds a new link descriptor or uses an existing one 406 LinkDescriptor* addDescriptor(const LinkID& link = LinkID::UNSPECIFIED); 407 408 /// returns a direct link relay descriptor to the given relay node 409 LinkDescriptor* getRelayDescriptor( const NodeID& relayNode ); 410 411 /// find a proper relay node that is directly connected to this node 412 const NodeID findRelayNode( const NodeID& id ); 413 414 /// forwards a message over relays/overlay/directly using link descriptor 415 seqnum_t sendMessage( Message* message, const LinkDescriptor* ld ); 416 417 /// creates a link descriptor, applys relay semantics if possible 418 LinkDescriptor* createLinkDescriptor( 419 const NodeID& remoteNode, const ServiceID& service, const LinkID& link_id ); 492 420 493 421 // map of a link request map a nonce to a LinkID … … 495 423 PendingLinkMap pendingLinks; 496 424 425 void showLinkState(); 426 497 427 /** 498 428 * nodes with pending joines. TODO: should be cleaned every … … 502 432 JoiningNodes joiningNodes; 503 433 434 int counter; 435 504 436 /** 505 437 * Bootstrapper for our spovnet -
source/ariba/overlay/messages/JoinReply.h
r3690 r5151 85 85 86 86 sznBeginDefault( ariba::overlay::JoinReply, X ) { 87 X && &spovnetid && param && bootstrapEp && joinAllowed && cI(0,7); 87 uint8_t ja = joinAllowed; 88 X && &spovnetid && param && &bootstrapEp && ja; 89 if (X.isDeserializer()) joinAllowed = ja; 88 90 } sznEnd(); 89 91 -
source/ariba/overlay/messages/LinkRequest.cpp
r3690 r5151 7 7 vsznDefault(LinkRequest); 8 8 9 LinkRequest::LinkRequest() {10 11 }12 13 LinkRequest::LinkRequest( uint32_t nonce, const EndpointDescriptor* endpoint, bool reply ) :14 flags(reply&1), nonce(nonce), endpoint(endpoint) {15 }16 17 9 LinkRequest::~LinkRequest() { 18 10 } -
source/ariba/overlay/messages/LinkRequest.h
r4625 r5151 27 27 uint32_t nonce; 28 28 const EndpointDescriptor* endpoint; 29 LinkID remoteLinkId; 30 NodeID relay; 29 31 30 32 public: 31 LinkRequest(); 33 LinkRequest() { 34 35 } 32 36 33 37 LinkRequest( uint32_t nonce, const EndpointDescriptor* endpoint, 34 bool reply = false ); 38 bool reply = false, const LinkID& remoteLinkId = LinkID::UNSPECIFIED, 39 const NodeID& relay = NodeID::UNSPECIFIED ) : 40 flags(reply&1), nonce(nonce), endpoint(endpoint), remoteLinkId(remoteLinkId), relay(relay) { 41 } 35 42 36 43 virtual ~LinkRequest(); … … 38 45 const EndpointDescriptor* getEndpoint() const { 39 46 return endpoint; 47 } 48 49 const LinkID& getRemoteLinkId() const { 50 return remoteLinkId; 51 } 52 53 const NodeID& getRelay() const { 54 return relay; 40 55 } 41 56 … … 53 68 sznBeginDefault( ariba::overlay::LinkRequest, X ) { 54 69 if (X.isDeserializer()) endpoint = new EndpointDescriptor(); 55 X && flags && nonce && reinterpret_cast<VSerializeable*>(const_cast<EndpointDescriptor*>(endpoint)); 70 X && flags && nonce; 71 X && const_cast<EndpointDescriptor*>(endpoint); 72 X && &relay && &remoteLinkId; 56 73 } sznEnd(); 57 74 -
source/ariba/overlay/messages/OverlayMsg.cpp
r3690 r5151 44 44 vsznDefault(OverlayMsg); 45 45 46 OverlayMsg::OverlayMsg(OverlayMessageType _type, const ServiceID _service, const NodeID _sourceNode)47 : type( (uint8_t)_type), service( _service ), sourceNode( _sourceNode ) {48 }49 50 OverlayMsg::OverlayMsg(OverlayMessageType _type, const NodeID _sourceNode)51 : type( (uint8_t)_type), service( ServiceID::UNSPECIFIED ), sourceNode( _sourceNode ){52 }53 54 46 OverlayMsg::~OverlayMsg(){ 55 47 } 56 48 57 OverlayMsg::OverlayMessageType OverlayMsg::getType(){58 return (OverlayMessageType)type;59 }60 61 const ServiceID& OverlayMsg::getService(){62 return service;63 }64 65 bool OverlayMsg::isType(OverlayMessageType _type){66 return (OverlayMessageType)type == _type;67 }68 69 const NodeID& OverlayMsg::getSourceNode(){70 return sourceNode;71 }72 73 49 }} // ariba::overlay -
source/ariba/overlay/messages/OverlayMsg.h
r4625 r5151 55 55 namespace overlay { 56 56 57 using_serialization; 57 using_serialization 58 ; 58 59 59 class OverlayMsg : public Message { 60 VSERIALIZEABLE; 60 class OverlayMsg: public Message { 61 VSERIALIZEABLE 62 ; 61 63 public: 62 64 63 typedef enum _OverlayMessageType { 64 OverlayMessageTypeInvalid = 0, // invalid type (no encapsulated messages) 65 OverlayMessageTypeData = 1, // message contains data for higher layers 66 OverlayMessageTypeJoinRequest = 2, // spovnet join request 67 OverlayMessageTypeJoinReply = 3, // spovnet join reply 68 OverlayMessageTypeUpdate = 4, // update message for link association 69 OverlayMessageTypeBye = 5, // spovnet leave (no encapsulated messages) 70 OverlayMessageTypeLinkRequest = 6, // link request (sent over the overlay) 71 } OverlayMessageType; 65 /// (payload-) message types 66 enum type_ { 67 typeInvalid = 0, ///< invalid type (no encapsulated messages) 68 typeData = 1, ///< message contains data for higher layers 69 typeJoinRequest = 2, ///< join request 70 typeJoinReply = 3, ///< join reply 71 typeUpdate = 4, ///< update message for link association 72 typeBye = 5, ///< leave (no encapsulated messages) 73 typeLinkRequest = 6, ///< link request (sent over the overlay) 74 typeRelay = 7, ///< relay message 75 typeKeepAlive = 8, ///< a keep-alive message 76 typeDirectLink = 9, 77 ///< a direct connection has been established 78 }; 72 79 73 OverlayMsg( 74 OverlayMessageType _type = OverlayMessageTypeInvalid, 75 const ServiceID _service = ServiceID::UNSPECIFIED, 76 const NodeID _sourceNode = NodeID::UNSPECIFIED 77 ); 80 /// default constructor 81 OverlayMsg(type_ type = typeInvalid, const ServiceID _service = 82 ServiceID::UNSPECIFIED, const NodeID _sourceNode = 83 NodeID::UNSPECIFIED) : 84 type((uint8_t) type), service(_service), sourceNode(_sourceNode), 85 relayLink(LinkID::UNSPECIFIED), autoLink(false) { 86 } 78 87 79 OverlayMsg( 80 OverlayMessageType _type,81 const NodeID _sourceNode82 );88 OverlayMsg(const OverlayMsg& rhs) : 89 type(rhs.type), service(rhs.service), sourceNode(rhs.sourceNode), 90 relayLink(rhs.relayLink), autoLink(rhs.autoLink) { 91 } 83 92 84 virtual ~OverlayMsg(); 93 /// type and source node constructor 94 OverlayMsg(type_ type, const NodeID _sourceNode) : 95 type((uint8_t) type), service(ServiceID::UNSPECIFIED), sourceNode( 96 _sourceNode), relayLink(LinkID::UNSPECIFIED), autoLink(false) { 97 } 85 98 86 bool isType(OverlayMessageType _type); 87 OverlayMessageType getType(); 88 const ServiceID& getService(); 89 const NodeID& getSourceNode(); 99 /// destructor 100 ~OverlayMsg(); 90 101 102 type_ getType() const { 103 return (type_) type; 104 } 105 106 const ServiceID& getService() const { 107 return service; 108 } 109 110 const NodeID& getSourceNode() const { 111 return sourceNode; 112 } 113 114 const LinkID& getRelayLink() const { 115 return relayLink; 116 } 117 118 void setRelayLink(const LinkID& relayLink) { 119 this->relayLink = relayLink; 120 } 121 122 const bool isAutoLink() const { 123 return autoLink; 124 } 125 126 void setAutoLink(bool autoLink) { 127 this->autoLink = autoLink; 128 } 91 129 private: 92 130 uint8_t type; 93 131 ServiceID service; 94 132 NodeID sourceNode; 133 LinkID relayLink; 134 uint8_t autoLink; 95 135 }; 96 136 97 }} // ariba::overlay 137 } 138 } // ariba::overlay 98 139 99 sznBeginDefault( ariba::overlay::OverlayMsg, X ) { 100 X && type && &service && &sourceNode && Payload(); 101 } sznEnd(); 140 sznBeginDefault( ariba::overlay::OverlayMsg, X ){ 141 X && type && &service && &sourceNode; 142 if (type == typeDirectLink) X && &relayLink; 143 if (type == typeUpdate) X && autoLink; 144 X && Payload(); 145 }sznEnd(); 102 146 103 147 #endif // OVERLAY_MSG_H__ -
source/ariba/overlay/modules/OverlayInterface.h
r3718 r5151 128 128 129 129 /** 130 * Routes a message to a given node by using an existing link. 131 * 132 * TODO: This is a hack. This method allows the BaseOverlay class to 133 * use overlay signaling links to transfer data for relaying 134 * 135 * @param node The destination node. 136 * @param link An established link 137 * @param msg The message to be sent. 138 */ 139 virtual void routeMessage(const NodeID& node, const LinkID& link, Message* msg) = 0; 140 141 /** 130 142 * Returns the nodes known to this overlay. 131 143 * … … 139 151 virtual NodeList getKnownNodes() const = 0; 140 152 153 /** 154 * Returns the link id of the next hop a route message would take. 155 * 156 * @param id The destination node id 157 * @return The link id of the next hop 158 */ 159 virtual const LinkID& getNextLinkId( const NodeID& id ) const = 0; 160 141 161 //--- functions from CommunicationListener that we _can_ use as overlay --- 142 162 143 163 /// @see CommunicationListener 144 164 virtual void onLinkUp(const LinkID& lnk, const NodeID& remote); 165 145 166 /// @see CommunicationListener 146 167 virtual void onLinkDown(const LinkID& lnk, const NodeID& remote); 168 147 169 /// @see CommunicationListener 148 170 virtual void onLinkChanged(const LinkID& lnk, const NodeID& remote); 171 149 172 /// @see CommunicationListener 150 173 virtual void onLinkFail(const LinkID& lnk, const NodeID& remote); 174 151 175 /// @see CommunicationListener 152 176 virtual void onLinkQoSChanged(const LinkID& lnk, const NodeID& remote, 153 177 const LinkProperties& prop); 178 154 179 /// @see CommunicationListener 155 180 virtual bool onLinkRequest(const NodeID& remote, const DataMessage& msg); 181 156 182 /// @see CommunicationListener 157 183 virtual void onMessage(const DataMessage& msg, const NodeID& remote, … … 162 188 163 189 protected: 164 165 190 /// Reference to an active base overlay 166 191 BaseOverlay& baseoverlay; -
source/ariba/overlay/modules/OverlayStructureEvents.cpp
r3690 r5151 47 47 } 48 48 49 void OverlayStructureEvents::incomingRouteMessage(Message* msg ){49 void OverlayStructureEvents::incomingRouteMessage(Message* msg, const LinkID& link, const NodeID& source ){ 50 50 } 51 51 -
source/ariba/overlay/modules/OverlayStructureEvents.h
r3690 r5151 61 61 62 62 protected: 63 virtual void incomingRouteMessage( Message* msg );63 virtual void incomingRouteMessage( Message* msg, const LinkID& link = LinkID::UNSPECIFIED, const NodeID& source = NodeID::UNSPECIFIED ); 64 64 virtual void onNodeJoin( const NodeID& node ); 65 65 }; -
source/ariba/overlay/modules/chord/Chord.cpp
r3718 r5151 71 71 72 72 /// helper: sets up a link using the base overlay 73 LinkID Chord::setup(const EndpointDescriptor& endp ) {73 LinkID Chord::setup(const EndpointDescriptor& endp, const NodeID& node) { 74 74 75 75 logging_debug("request to setup link to " << endp.toString() ); 76 77 for (size_t i=0; i<pending.size(); i++) 78 if (pending[i]==node) return LinkID::UNSPECIFIED; 79 pending.push_back(node); 80 76 81 // establish link via base overlay 77 return baseoverlay.establishLink(endp, OverlayInterface::OVERLAY_SERVICE_ID);82 return baseoverlay.establishLink(endp, node, OverlayInterface::OVERLAY_SERVICE_ID); 78 83 } 79 84 … … 86 91 /// sends a discovery message 87 92 void Chord::send_discovery_to(const NodeID& destination, int ttl) { 88 logging_debug("Initiating discovery of " << destination.toString() );93 // logging_debug("Initiating discovery of " << destination.toString() ); 89 94 Message msg; 90 95 ChordMessage cmsg(ChordMessage::discovery, nodeid, destination); … … 111 116 112 117 // initiator? no->setup first link 113 if (!(boot == EndpointDescriptor::UNSPECIFIED)) bootstrapLink = setup(boot); 118 if (!(boot == EndpointDescriptor::UNSPECIFIED)) 119 bootstrapLink = setup(boot); 114 120 115 121 // timer for stabilization management … … 138 144 139 145 // message for this node? yes-> delegate to base overlay 140 if (item->id == nodeid) baseoverlay.incomingRouteMessage(msg); 146 if (item->id == nodeid || destnode == nodeid) 147 baseoverlay.incomingRouteMessage( msg, LinkID::UNSPECIFIED, nodeid ); 148 141 149 else { // no-> send to next hop 142 150 ChordMessage cmsg(ChordMessage::route, nodeid, destnode); … … 144 152 send(&cmsg, item->info); 145 153 } 154 } 155 156 /// @see OverlayInterface.h 157 void Chord::routeMessage(const NodeID& node, const LinkID& link, Message* msg) { 158 logging_debug("Redirect over Chord to node id=" << node.toString() 159 << " link id=" << link.toString() ); 160 ChordMessage cmsg(ChordMessage::route, nodeid, node); 161 cmsg.encapsulate(msg); 162 send(&cmsg, link); 163 } 164 165 /// @see OverlayInterface.h 166 const LinkID& Chord::getNextLinkId( const NodeID& id ) const { 167 // get next hop 168 const route_item* item = table->get_next_hop(id); 169 170 // returns a unspecified id when this is itself 171 if (item == NULL || item->id == nodeid) 172 return LinkID::UNSPECIFIED; 173 174 /// return routing info 175 return item->info; 146 176 } 147 177 … … 160 190 logging_debug("link_up: link=" << lnk.toString() << " remote=" << 161 191 remote.toString() ); 192 for (vector<NodeID>::iterator i=pending.begin(); i!=pending.end(); i++) 193 if (*i == remote) { 194 pending.erase(i); 195 break; 196 } 162 197 route_item* item = table->insert(remote); 163 198 … … 207 242 break; 208 243 209 244 // route message with payload 210 245 case M::route: { 211 246 // find next hop … … 214 249 // next hop == myself? 215 250 if (m->getDestination() == nodeid) { // yes-> route to base overlay 216 logging_debug(" send message to baseoverlay");217 baseoverlay.incomingRouteMessage( m);251 logging_debug("Send message to baseoverlay"); 252 baseoverlay.incomingRouteMessage( m, item->info, remote ); 218 253 } 219 254 // no-> route to next hop 220 255 else { 221 logging_debug("route chord message to " << item->id.toString() ); 256 logging_debug("Route chord message to " 257 << item->id.toString() << " (destination=" << m->getDestination() << ")"); 222 258 send(m, item->info); 223 259 } … … 236 272 237 273 // check if source node can be added to routing table and setup link 238 if (m->getSource() != nodeid && table->is_insertable(m->getSource())) setup(239 *dmsg->getSourceEndpoint());274 if (m->getSource() != nodeid && table->is_insertable(m->getSource())) 275 setup(*dmsg->getSourceEndpoint(), m->getSource() ); 240 276 241 277 // delegate discovery message … … 302 338 } 303 339 if (item == NULL) break; 304 logging_debug("routing discovery message to succ/pred "340 logging_debug("routing discovery message to succ/pred " 305 341 << item->id.toString() ); 306 342 ChordMessage cmsg(*m); … … 327 363 328 364 void Chord::eventFunction() { 329 if (!LinkID::UNSPECIFIED.isUnspecified())330 logging_error("LinkID::UNSPECIFIED not unspecified!!!!");331 365 stabilize_counter++; 332 366 if (stabilize_counter == 3) { 367 pending.clear(); 333 368 size_t numNeighbors = 0; 334 369 for (size_t i = 0; i < table->size(); i++) { … … 368 403 } 369 404 370 } 371 } // namespace ariba, overlay 405 }} // namespace ariba, overlay -
source/ariba/overlay/modules/chord/Chord.h
r3718 r5151 77 77 int stabilize_finger; 78 78 LinkID bootstrapLink; 79 vector<NodeID> pending; 79 80 80 81 // helper: sets up a link using the "base overlay" 81 LinkID setup( const EndpointDescriptor& endp );82 LinkID setup( const EndpointDescriptor& endp, const NodeID& node = NodeID::UNSPECIFIED ); 82 83 83 84 // helper: sends a message using the "base overlay" … … 91 92 OverlayStructureEvents* _eventsReceiver, const OverlayParameterSet& param); 92 93 virtual ~Chord(); 94 95 /// @see OverlayInterface.h 96 virtual const LinkID& getNextLinkId( const NodeID& id ) const; 93 97 94 98 /// @see OverlayInterface.h … … 113 117 114 118 /// @see OverlayInterface.h 119 virtual void routeMessage(const NodeID& node, const LinkID& link, Message* msg); 120 121 /// @see OverlayInterface.h 115 122 virtual NodeList getKnownNodes() const; 116 123 -
source/ariba/overlay/modules/onehop/OneHop.cpp
r3718 r5151 99 99 } 100 100 101 void OneHop::routeMessage(const NodeID& node, const LinkID& link, Message* msg) { 102 OneHopMessage onehopRoute( OneHopMessage::OneHopMessageTypeRoute ); 103 onehopRoute.encapsulate(msg); 104 baseoverlay.sendMessage( &onehopRoute, link ); 105 } 106 107 /// @see OverlayInterface.h 108 const LinkID& OneHop::getNextLinkId( const NodeID& id ) const { 109 OverlayNodeMapping::const_iterator i = overlayNodes.find( id ); 110 if (i == overlayNodes.end()) return LinkID::UNSPECIFIED; 111 return i->second; 112 } 113 101 114 void OneHop::createOverlay() { 102 115 // don't need to bootstrap against ourselfs. -
source/ariba/overlay/modules/onehop/OneHop.h
r3718 r5151 81 81 82 82 /// @see OverlayInterface.h 83 virtual const LinkID& getNextLinkId( const NodeID& id ) const; 84 85 /// @see OverlayInterface.h 83 86 virtual void routeMessage(const NodeID& destnode, Message* msg); 87 88 /// @see OverlayInterface.h 89 virtual void routeMessage(const NodeID& node, const LinkID& link, Message* msg); 84 90 85 91 /// @see OverlayInterface.h -
source/ariba/utility/logging/Logging.h
r3690 r5151 69 69 log4cxx::LoggerPtr x::logger(log4cxx::Logger::getLogger(#x)); 70 70 71 #define logging_trace(x) { LOG4CXX_TRACE(logger,x); } 72 #define logging_debug(x) {colorDebug; LOG4CXX_DEBUG(logger,x); colorDefault; } 73 #define logging_info(x) {colorInfo; LOG4CXX_INFO(logger,x); colorDefault; } 74 #define logging_warn(x) {colorWarn; LOG4CXX_WARN(logger,x); colorDefault; } 75 #define logging_error(x) {colorError; LOG4CXX_ERROR(logger,x); colorDefault; } 76 #define logging_fatal(x) {colorError; LOG4CXX_FATAL(logger,x); colorDefault; exit(-1); } 71 #ifdef HAVE_MAEMO 72 73 #define logging_trace(x) { } 74 #define logging_debug(x) { } 75 #define logging_info(x) { } 76 #define logging_warn(x) {colorWarn; LOG4CXX_WARN(logger,x); colorDefault; } 77 #define logging_error(x) {colorError; LOG4CXX_ERROR(logger,x); colorDefault; } 78 #define logging_fatal(x) {colorError; LOG4CXX_FATAL(logger,x); colorDefault; exit(-1); } 79 80 #else 81 82 #define logging_trace(x) { LOG4CXX_TRACE(logger,x); } 83 #define logging_debug(x) {colorDebug; LOG4CXX_DEBUG(logger,x); colorDefault; } 84 #define logging_info(x) {colorInfo; LOG4CXX_INFO(logger,x); colorDefault; } 85 #define logging_warn(x) {colorWarn; LOG4CXX_WARN(logger,x); colorDefault; } 86 #define logging_error(x) {colorError; LOG4CXX_ERROR(logger,x); colorDefault; } 87 #define logging_fatal(x) {colorError; LOG4CXX_FATAL(logger,x); colorDefault; exit(-1); } 88 89 #endif // HAVE_MAEMO 77 90 78 91 #endif //LOGGING_H__ -
source/ariba/utility/system/StartupWrapper.cpp
r4483 r5151 103 103 // set up again an individual level if you like 104 104 { 105 // log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger(" BaseOverlay"));105 // log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("PingPong")); 106 106 // logger->setLevel(log4cxx::Level::getDebug()); 107 107 }
Note:
See TracChangeset
for help on using the changeset viewer.