LOG.debug("Created topology node {} for id {} at {}", ret, id, ret.getNodeId());
this.state.put(id, ret);
}
- // FIXME: else check for conflicting session
-
+ // if another listener requests the same session, close it
+ final TopologySessionListener existingSessionListener = this.nodes.get(id);
+ if (existingSessionListener != null && !sessionListener.equals(existingSessionListener)) {
+ LOG.error("New session listener {} is in conflict with existing session listener {} on node {}, closing the existing one.", existingSessionListener, sessionListener, id);
+ existingSessionListener.close();
+ }
ret.taken(retrieveNode);
this.nodes.put(id, sessionListener);
LOG.debug("Node {} bound to listener {}", id, sessionListener);
assertTrue(this.receivedMsgs.get(this.receivedMsgs.size() - 1) instanceof Close);
}
+ @Test
+ public void testConflictingListeners() throws Exception {
+ this.listener.onSessionUp(this.session);
+ assertFalse(this.session.isClosed());
+ Stateful07TopologySessionListener conflictingListener = (Stateful07TopologySessionListener) getSessionListener();
+ conflictingListener.onSessionUp(this.session);
+ assertTrue(this.session.isClosed());
+ }
+
@Test
public void testOnSessionTermination() throws Exception {
this.listener.onSessionUp(this.session);