Revert "BUG-8156 Terminate PCEP session properly when ServerSessionManager is closed" 81/59681/4
authorClaudio D. Gasparini <claudio.gasparini@pantheon.tech>
Fri, 30 Jun 2017 09:51:10 +0000 (11:51 +0200)
committerClaudio D. Gasparini <claudio.gasparini@pantheon.tech>
Fri, 30 Jun 2017 09:53:13 +0000 (11:53 +0200)
This reverts commit 361265e3d13003bc3bfe75c888c40713ecbb36de
which introced a new bug.
bgpcep-csit-1node-periodic-throughpcep-*-carbon not passing

Change-Id: I965b09b36ef46f2a0ba4ea6b4861d66b7d0ddaec
Signed-off-by: Claudio D. Gasparini <claudio.gasparini@pantheon.tech>
pcep/api/src/main/java/org/opendaylight/protocol/pcep/PCEPSession.java
pcep/api/src/main/java/org/opendaylight/protocol/pcep/PCEPSessionListener.java
pcep/api/src/main/java/org/opendaylight/protocol/pcep/TerminationReason.java
pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionImpl.java
pcep/impl/src/test/java/org/opendaylight/protocol/pcep/impl/PCEPSessionImplTest.java
pcep/impl/src/test/java/org/opendaylight/protocol/pcep/impl/SimpleSessionListener.java
pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/AbstractTopologySessionListener.java
pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/ServerSessionManager.java
pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/SessionListenerState.java
pcep/topology-provider/src/test/java/org/opendaylight/bgpcep/pcep/topology/provider/AbstractPCEPSessionTest.java
pcep/topology-provider/src/test/java/org/opendaylight/bgpcep/pcep/topology/provider/Stateful07TopologySessionListenerTest.java

index 7aca1e51a23b185eadba19d0353ccfe497a9b441..656daaf25a8cd1ed42eee1175ffeac3c716989e8 100644 (file)
@@ -9,8 +9,6 @@ package org.opendaylight.protocol.pcep;
 
 import io.netty.util.concurrent.Future;
 import java.net.InetAddress;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.stats.rev141006.PcepSessionState;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.open.Tlvs;
@@ -33,24 +31,7 @@ public interface PCEPSession extends AutoCloseable, PcepSessionState {
      */
     Future<Void> sendMessage(Message message);
 
-    /**
-     * Closes PCEP session, cancels all timers, returns to state Idle, sends the Close Message. KeepAlive and DeadTimer
-     * are cancelled if the state of the session changes to IDLE. This method is used to close the PCEP session from
-     * inside the session or from the listener, therefore the parent of this session should be informed.
-     * @param reason The {@link TerminationReason} to be wrapped in a PCEP CLOSE message and sent to the remote peer.
-     *               When the reason provided is null, no CLOSE message will be sent.
-     */
-    void close(@Nullable TerminationReason reason);
-
-    /**
-     * Terminate a PCEP session.  A CLOSE message with given reason will be sent to remote peer by invoking
-     * {@link #close(TerminationReason)} method.
-     *
-     * It triggers {@link PCEPSessionListener#onSessionTerminated(PCEPSession, PCEPTerminationReason)} after closing.
-     * @param reason The {@link TerminationReason} to be wrapped in a PCEP CLOSE message and sent to the remote peer.
-     *               The reason cannot be null.
-     */
-    void terminate(@Nonnull TerminationReason reason);
+    void close(TerminationReason reason);
 
     Tlvs getRemoteTlvs();
 
index 032372396c05b267e8df98a956dc2448d3fb3674..8bca10edd35f170a767c19360b7993f2d3f54219 100644 (file)
@@ -31,8 +31,7 @@ public interface PCEPSessionListener extends EventListener {
     void onSessionDown(PCEPSession session, Exception e);
 
     /**
-     * Fired when the session is terminated locally or upon a CLOSE message from remote peer is received.
-     * The session has already been closed and transitioned to IDLE state.
+     * Fired when the session is terminated locally. The session has already been closed and transitioned to IDLE state.
      * Any outstanding queued messages were not sent. The user should not attempt to make any use of the session.
      *
      * @param reason the cause why the session went down
index 974a2621c873b9f71db080a3c97ecc0dd3b4517b..9eae09028c0c7ca0e667d20ce40ce3bc395e2559 100644 (file)
@@ -7,12 +7,23 @@
  */
 package org.opendaylight.protocol.pcep;
 
+import com.google.common.collect.Maps;
+import java.util.Map;
+
 public enum TerminationReason {
     UNKNOWN((short) 1), EXP_DEADTIMER((short) 2), MALFORMED_MSG((short) 3), TOO_MANY_UNKNWN_REQS((short) 4), TOO_MANY_UNKNOWN_MSGS((short) 5);
 
-    private final short value;
+    private short value;
+    private static final Map<Short, TerminationReason> VALUE_MAP;
+
+    static {
+        VALUE_MAP = Maps.newHashMap();
+        for (final TerminationReason enumItem : TerminationReason.values()) {
+            VALUE_MAP.put(enumItem.value, enumItem);
+        }
+    }
 
-    TerminationReason(final short value) {
+    private TerminationReason(final short value) {
         this.value = value;
     }
 
@@ -32,18 +43,6 @@ public enum TerminationReason {
      * @return corresponding TerminationReason item
      */
     public static TerminationReason forValue(final short valueArg) {
-        for (final TerminationReason reason : values()) {
-            if (reason.value == valueArg) {
-                return reason;
-            }
-        }
-        return null;
-    }
-
-    public static TerminationReason forValue(final Short valueArg) {
-        if (valueArg == null) {
-            return null;
-        }
-        return forValue(valueArg.shortValue());
+        return VALUE_MAP.get(valueArg);
     }
 }
index 6d9c1ea523bab399c3ef11606e1ace92c271100a..04152c312257442f71c946b26ef88cd74cdb5c42 100644 (file)
@@ -25,7 +25,6 @@ import java.util.Date;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.protocol.pcep.PCEPCloseTermination;
 import org.opendaylight.protocol.pcep.PCEPSession;
 import org.opendaylight.protocol.pcep.PCEPSessionListener;
@@ -43,7 +42,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.typ
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.OpenMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.PcerrMessage;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.message.CCloseMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.message.CCloseMessageBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.object.CCloseBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.keepalive.message.KeepaliveMessageBuilder;
@@ -90,7 +88,6 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
     private int maxUnknownMessages;
 
     // True if the listener should not be notified about events
-    @GuardedBy("this")
     private boolean closed = false;
 
     private final Channel channel;
@@ -160,7 +157,7 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
      * KeepAlive timer to the time at which the message was sent. If the session was closed by the time this method
      * starts to execute (the session state will become IDLE), that rescheduling won't occur.
      */
-    private void handleKeepaliveTimer() {
+    private  void handleKeepaliveTimer() {
         final long ct = TICKER.read();
 
         long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
@@ -181,7 +178,7 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
      * @param msg to be sent
      */
     @Override
-    public synchronized Future<Void> sendMessage(final Message msg) {
+    public Future<Void> sendMessage(final Message msg) {
         final ChannelFuture f = this.channel.writeAndFlush(msg);
         this.lastMessageSentAt = TICKER.read();
         this.sessionState.updateLastSentMsg();
@@ -205,38 +202,31 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
 
     @VisibleForTesting
     ChannelFuture closeChannel() {
-        LOG.info("Closing PCEP session channel: {}", this.channel);
+        LOG.info("Closing PCEP session: {}", this);
         return this.channel.close();
     }
 
-    @VisibleForTesting
-    public synchronized boolean isClosed() {
-        return this.closed;
-    }
-
     /**
      * Closes PCEP session without sending a Close message, as the channel is no longer active.
      */
     @Override
-    public synchronized void close() {
-        close(null);
+    public void close() {
+        LOG.info("Closing PCEP session: {}", this);
+        closeChannel();
     }
 
+    /**
+     * Closes PCEP session, cancels all timers, returns to state Idle, sends the Close Message. KeepAlive and DeadTimer
+     * are cancelled if the state of the session changes to IDLE. This method is used to close the PCEP session from
+     * inside the session or from the listener, therefore the parent of this session should be informed.
+     */
     @Override
     public synchronized void close(final TerminationReason reason) {
-        if (isClosed()) {
-            return;
-        }
+        LOG.info("Closing PCEP session: {}", this);
         this.closed = true;
-        // only send close message when the reason is provided
-        if (reason != null) {
-            LOG.info("Closing PCEP session with reason {}: {}", reason, this);
-            sendMessage(new CloseBuilder().setCCloseMessage(
-                    new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
-        } else {
-            LOG.info("Closing PCEP session: {}", this);
-        }
-        closeChannel();
+        this.sendMessage(new CloseBuilder().setCCloseMessage(
+            new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
+        this.close();
     }
 
     @Override
@@ -249,18 +239,18 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
         return ((InetSocketAddress) this.channel.remoteAddress()).getAddress();
     }
 
-    @Override
-    public synchronized void terminate(final TerminationReason reason) {
-        if (isClosed()) {
-            return;
-        }
-        close(reason);
+    private synchronized void terminate(final TerminationReason reason) {
+        LOG.info("Local PCEP session termination : {}", reason);
         this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason));
+        this.closed = true;
+        this.sendMessage(new CloseBuilder().setCCloseMessage(
+            new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
+        this.close();
     }
 
     public synchronized void endOfInput() {
-        if (!isClosed()) {
-            this.listener.onSessionDown(this, new IOException("End of input detected. Close the session " + this));
+        if (!this.closed) {
+            this.listener.onSessionDown(this, new IOException("End of input detected. Close the session."));
             this.closed = true;
         }
     }
@@ -326,16 +316,7 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
              * exception is CLOSE message, which needs to be converted into a
              * session DOWN event.
              */
-            final CCloseMessage closeMsg = ((CloseMessage) msg).getCCloseMessage();
-            TerminationReason reason = null;
-            if (closeMsg != null && closeMsg.getCClose() != null) {
-                reason = TerminationReason.forValue(closeMsg.getCClose().getReason());
-            }
-            if (reason == null) {
-                reason = TerminationReason.UNKNOWN;
-            }
-            this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason));
-            close();
+            this.close();
         } else {
             // This message needs to be handled by the user
             if (msg instanceof PcerrMessage) {
index f6404d5e430009859c964b2c96d499adf2ffe990..8eccb4b8e48389108253dd24b1bcbf7ae44a7bbb 100644 (file)
@@ -55,12 +55,10 @@ public class PCEPSessionImplTest extends AbstractPCEPSessionTest {
         Assert.assertTrue(this.listener.messages.get(0) instanceof Pcreq);
         Assert.assertEquals(2, this.session.getMessages().getReceivedMsgCount().intValue());
 
-        Assert.assertTrue(this.listener.up);
         this.session.handleMessage(new CloseBuilder().build());
         Assert.assertEquals(3, this.session.getMessages().getReceivedMsgCount().intValue());
         Assert.assertEquals(1, this.listener.messages.size());
         Assert.assertTrue(this.channel.isActive());
-        Assert.assertFalse(this.listener.up);
         Mockito.verify(this.channel, Mockito.times(1)).close();
 
         this.session.resetStats();
index 4c3ba69252dba5f65107b8682231fd56d69e1740..b0d3cf7250d52f7ada00b926d7fac85a4a95ebd7 100644 (file)
@@ -55,6 +55,5 @@ public class SimpleSessionListener implements PCEPSessionListener {
     @Override
     public void onSessionTerminated(final PCEPSession session, final PCEPTerminationReason cause) {
         LOG.debug("Session terminated. Cause : {}", cause.toString());
-        this.up = false;
     }
 }
index 2f0ee9cc1a9d2731b4bd7f62107f9a46d251bcf7..06b8910e6c90a866711a1211682ddeb234603434 100755 (executable)
@@ -23,7 +23,6 @@ import java.util.Map.Entry;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.config.yang.pcep.topology.provider.ListenerStateRuntimeMXBean;
 import org.opendaylight.controller.config.yang.pcep.topology.provider.ListenerStateRuntimeRegistration;
@@ -126,7 +125,6 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
     private SyncOptimization syncOptimization;
     private boolean triggeredResyncInProcess;
 
-    @GuardedBy("this")
     private ListenerStateRuntimeRegistration registration;
     @GuardedBy("this")
     private final SessionListenerState listenerState;
@@ -152,13 +150,13 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
         // takeNodeState(..) may fail when the server session manager is being restarted due to configuration change
         if (state == null) {
             LOG.error("Unable to fetch topology node state for PCEP session. Closing session {}", session);
-            session.terminate(TerminationReason.UNKNOWN);
+            this.onSessionDown(session, new RuntimeException("Unable to fetch topology node state for PCEP session with " + session.getRemoteAddress()));
             return;
         }
 
         if (this.session != null || this.nodeState != null) {
             LOG.error("PCEP session is already up. Closing session {}", session);
-            session.terminate(TerminationReason.UNKNOWN);
+            this.onSessionDown(session, new IllegalStateException("Session is already up with " + session.getRemoteAddress()));
             return;
         }
 
@@ -186,7 +184,7 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
         register();
         if (this.registration == null) {
             LOG.error("PCEP session fails to register. Closing session {}", session);
-            session.terminate(TerminationReason.UNKNOWN);
+            this.onSessionDown(session, new RuntimeException("PCEP Session with " + session.getRemoteAddress() + " fails to register."));
             return;
         }
         this.listenerState.init(session);
@@ -217,10 +215,6 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
     }
 
     protected void updatePccState(final PccSyncState pccSyncState) {
-        if (this.serverSessionManager.isClosed()) {
-            LOG.debug("Ignore PCC state update for {} as session manager has been closed.", this.session);
-            return;
-        }
         final MessageContext ctx = new MessageContext(this.nodeState.beginTransaction());
         updatePccNode(ctx, new PathComputationClientBuilder().setStateSync(pccSyncState).build());
         if (pccSyncState != PccSyncState.Synchronized) {
@@ -255,10 +249,34 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
     private synchronized void tearDown(final PCEPSession session) {
         Preconditions.checkNotNull(session);
         this.serverSessionManager.releaseNodeState(this.nodeState, session, isLspDbPersisted());
-        // Do not send CLOSE message here.
-        // * In #onSessionDown(..), that channel is already unavailable, thus we won't be able to send out the message.
-        // * In #OnSessionTerminated(..), a CLOSE message should have already been sent, no need to send again.
-        close(null);
+        this.nodeState = null;
+        this.session = null;
+        this.syncOptimization = null;
+        unregister();
+
+        // Clear all requests we know about
+        for (final Entry<S, PCEPRequest> e : this.requests.entrySet()) {
+            final PCEPRequest r = e.getValue();
+            switch (r.getState()) {
+            case DONE:
+                // Done is done, nothing to do
+                LOG.trace("Request {} was done when session went down.", e.getKey());
+                break;
+            case UNACKED:
+                // Peer has not acked: results in failure
+                LOG.info("Request {} was incomplete when session went down, failing the instruction", e.getKey());
+                r.done(OperationResults.NOACK);
+                break;
+            case UNSENT:
+                // Peer has not been sent to the peer: results in cancellation
+                LOG.debug("Request {} was not sent when session went down, cancelling the instruction", e.getKey());
+                r.done(OperationResults.UNSENT);
+                break;
+            default:
+                break;
+            }
+        }
+        this.requests.clear();
     }
 
     @Override
@@ -269,15 +287,14 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
 
     @Override
     public final synchronized void onSessionTerminated(final PCEPSession session, final PCEPTerminationReason reason) {
-        LOG.info("Session {} terminated with reason {}", session, reason);
+        LOG.info("Session {} terminated by peer with reason {}", session, reason);
         tearDown(session);
     }
 
     @Override
     public final synchronized void onMessage(final PCEPSession session, final Message message) {
-        if (this.serverSessionManager.isClosed()) {
-            // we cannot operate on the topology node when the topology is removed by ServerSessionManager
-            LOG.debug("Ignore message from {} as session manager has been closed.", session);
+        if (this.nodeState == null) {
+            LOG.warn("Topology node state is null. Unhandled message {} on session {}", message, session);
             return;
         }
         final MessageContext ctx = new MessageContext(this.nodeState.beginTransaction());
@@ -306,58 +323,20 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
     }
 
     @Override
-    public synchronized void close() {
-        close(TerminationReason.UNKNOWN);
-    }
-
-    /**
-     * Close this session listener. Reset all session related status
-     *
-     * @param reason The {@link TerminationReason} to be wrapped in a PCEP CLOSE message and sent to the remote peer.
-     *               When the reason provided is null, no CLOSE message will be sent.
-     */
-    private final synchronized void close(@Nullable final TerminationReason reason) {
+    public void close() {
         unregister();
         if (this.session != null) {
-            this.session.close(reason);
-        }
-        this.session = null;
-        this.nodeState = null;
-        this.syncOptimization = null;
-
-        // Clear all requests we know about
-        for (final Entry<S, PCEPRequest> e : this.requests.entrySet()) {
-            final PCEPRequest r = e.getValue();
-            switch (r.getState()) {
-                case DONE:
-                    // Done is done, nothing to do
-                    LOG.trace("Request {} was done when session went down.", e.getKey());
-                    break;
-                case UNACKED:
-                    // Peer has not acked: results in failure
-                    LOG.info("Request {} was incomplete when session went down, failing the instruction", e.getKey());
-                    r.done(OperationResults.NOACK);
-                    break;
-                case UNSENT:
-                    // Peer has not been sent to the peer: results in cancellation
-                    LOG.debug("Request {} was not sent when session went down, cancelling the instruction", e.getKey());
-                    r.done(OperationResults.UNSENT);
-                    break;
-                default:
-                    break;
-            }
+            this.session.close(TerminationReason.UNKNOWN);
         }
-        this.requests.clear();
-        this.listenerState.destroy();
     }
 
     private final synchronized void unregister() {
         if (this.registration != null) {
             this.registration.close();
-            LOG.debug("PCEP session {} is unregistered successfully.", this.session);
+            LOG.trace("PCEP session {} is unregistered successfully.", this.session);
             this.registration = null;
         } else {
-            LOG.debug("PCEP session {} was not registered.", this.session);
+            LOG.trace("PCEP session {} was not registered.", this.session);
         }
     }
 
@@ -366,7 +345,7 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
         final PCEPTopologyProviderRuntimeRegistration runtimeReg = this.serverSessionManager.getRuntimeRootRegistration();
         if (runtimeReg != null) {
             this.registration = runtimeReg.register(this);
-            LOG.debug("PCEP session {} is successfully registered.", this.session);
+            LOG.trace("PCEP session {} is successfully registered.", this.session);
         }
     }
 
index c5fdb06cb82196dfa135d8abff8811143874665a..228e33f9f8500b79408d8996dba340faca415a3a 100755 (executable)
@@ -68,7 +68,7 @@ final class ServerSessionManager implements PCEPSessionListenerFactory, Topology
     private final InstanceIdentifier<Topology> topology;
     private final DataBroker broker;
     private final PCEPStatefulPeerProposal peerProposal;
-    private final AtomicBoolean isClosed = new AtomicBoolean(true);
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
     private final short rpcTimeout;
     private final AtomicReference<PCEPTopologyProviderRuntimeRegistration> runtimeRootRegistration = new AtomicReference<>();
 
@@ -100,7 +100,6 @@ final class ServerSessionManager implements PCEPSessionListenerFactory, Topology
             @Override
             public void onSuccess(final Void result) {
                 LOG.debug("PCEP Topology {} created successfully.", topologyId.getValue());
-                ServerSessionManager.this.isClosed.set(false);
             }
 
             @Override
@@ -115,19 +114,8 @@ final class ServerSessionManager implements PCEPSessionListenerFactory, Topology
         return new NodeId("pcc://" + addr.getHostAddress());
     }
 
-    boolean isClosed() {
-        return this.isClosed.get();
-    }
-
     synchronized void releaseNodeState(final TopologyNodeState nodeState, final PCEPSession session, final boolean persistNode) {
-        final NodeId id = createNodeId(session.getRemoteAddress());
-        this.nodes.remove(id);
-        if (isClosed()) {
-            // Since the whole pcep topology is going to be removed by ServerSessionManager, we do not need to remove each single
-            // node separately. Besides, it could cause Optismic Lock on DataStore when operating on the same topology
-            LOG.trace("Server Session Manager is closed. No need to release topology node {}", id);
-            return;
-        }
+        this.nodes.remove(createNodeId(session.getRemoteAddress()));
         if (nodeState != null) {
             LOG.debug("Node {} unbound", nodeState.getNodeId());
             nodeState.released(persistNode);
@@ -136,7 +124,7 @@ final class ServerSessionManager implements PCEPSessionListenerFactory, Topology
 
     synchronized TopologyNodeState takeNodeState(final InetAddress address, final TopologySessionListener sessionListener, final boolean retrieveNode) {
         final NodeId id = createNodeId(address);
-        if (isClosed()) {
+        if (this.isClosed.get()) {
             LOG.error("Server Session Manager is closed. Unable to create topology node {} with listener {}", id, sessionListener);
             return null;
         }
@@ -149,12 +137,8 @@ final class ServerSessionManager implements PCEPSessionListenerFactory, Topology
             LOG.debug("Created topology node {} for id {} at {}", ret, id, ret.getNodeId());
             this.state.put(id, ret);
         }
+        // FIXME: else check for conflicting session
 
-        final TopologySessionListener conflictingSessionListener = this.nodes.get(id);
-        if (conflictingSessionListener != null && !sessionListener.equals(conflictingSessionListener)) {
-            LOG.error("Existing session {} is conflict with new session {} on node {}, closing the existing one.", conflictingSessionListener, sessionListener, id);
-            conflictingSessionListener.close();
-        }
         ret.taken(retrieveNode);
         this.nodes.put(id, sessionListener);
         LOG.debug("Node {} bound to listener {}", id, sessionListener);
@@ -206,7 +190,7 @@ final class ServerSessionManager implements PCEPSessionListenerFactory, Topology
         return (l != null) ? l.triggerSync(input) : OperationResults.UNSENT.future();
     }
 
-    synchronized CheckedFuture<Void, TransactionCommitFailedException> closeServiceInstance() {
+    synchronized ListenableFuture<Void> closeServiceInstance() {
         if (this.isClosed.getAndSet(true)) {
             LOG.error("Session Manager has already been closed.");
             Futures.immediateFuture(null);
@@ -240,17 +224,16 @@ final class ServerSessionManager implements PCEPSessionListenerFactory, Topology
         return future;
     }
 
-    PCEPTopologyProviderRuntimeRegistration getRuntimeRootRegistration() {
-        return this.runtimeRootRegistration.get();
-    }
-
     synchronized void setRuntimeRootRegistrator(final PCEPTopologyProviderRuntimeRegistrator runtimeRootRegistrator) {
         if (!this.runtimeRootRegistration.compareAndSet(null, runtimeRootRegistrator.register(this))) {
             LOG.error("Runtime root registration has been set before.");
         }
     }
 
-    @Override
+    PCEPTopologyProviderRuntimeRegistration getRuntimeRootRegistration() {
+        return this.runtimeRootRegistration.get();
+    }
+
     public void setPeerSpecificProposal(final InetSocketAddress address, final TlvsBuilder openBuilder) {
         Preconditions.checkNotNull(address);
         this.peerProposal.setPeerProposal(createNodeId(address.getAddress()), openBuilder);
index 6e3689cafb498699f5c270e0dbe8ec15c4239991..340cde0438a3a468665bb7c915bc45571410dcf0 100644 (file)
@@ -47,10 +47,6 @@ final class SessionListenerState {
         this.capa = new PeerCapabilities();
     }
 
-    /**
-     * Initialize the listenerState with a PCEP session
-     * @param session PCEP session that is bind to this listenerState
-     */
     public void init(final PCEPSession session) {
         Preconditions.checkNotNull(session);
         this.localPref = getLocalPref(session.getLocalPref());
@@ -58,25 +54,6 @@ final class SessionListenerState {
         this.sessionUpDuration.start();
     }
 
-    /**
-     * Reset this listenerState instance to it's initial state
-     * This isn't necessary in most case, as a PCEP session listener
-     * is tightly bind to a particular PCEP session
-     * However, it is required in unit test. Because we are reusing
-     * PCEP session listener in a single test, the listenerState
-     * will not be killed when the session is down. So if later
-     * we want to {@link #init(PCEPSession)} a session with the same session listener
-     * again, an exception saying "stopWatch cannot be started again"
-     * will be thrown, and all the statistical counter won't be correct
-     */
-    public void destroy() {
-        this.localPref = null;
-        this.peerPref = null;
-        this.sessionUpDuration.reset();
-        this.capa = new PeerCapabilities();
-        resetStats();
-    }
-
     public void processRequestStats(final long duration) {
         if (this.minReplyTime == 0) {
             this.minReplyTime = duration;
@@ -101,10 +78,8 @@ final class SessionListenerState {
         return msgs;
     }
 
-    /**
-     * Reset statistic counters
-     */
-    private void resetStats() {
+    public void resetStats(final PCEPSession session) {
+        Preconditions.checkNotNull(session);
         this.receivedRptMsgCount = 0;
         this.sentInitMsgCount = 0;
         this.sentUpdMsgCount = 0;
@@ -113,11 +88,6 @@ final class SessionListenerState {
         this.minReplyTime = 0;
         this.totalTime = 0;
         this.reqCount = 0;
-    }
-
-    public void resetStats(final PCEPSession session) {
-        Preconditions.checkNotNull(session);
-        resetStats();
         session.resetStats();
     }
 
index a35bcadf45d634b30ca23cc7bab82972fa777746..e564d59f3841accc90c4343d8d87d9629de9a598 100644 (file)
@@ -8,17 +8,12 @@
 
 package org.opendaylight.bgpcep.pcep.topology.provider;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
@@ -30,9 +25,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
 import org.junit.After;
 import org.junit.Before;
 import org.mockito.Mock;
@@ -45,9 +38,9 @@ import org.opendaylight.controller.config.yang.pcep.topology.provider.PCEPTopolo
 import org.opendaylight.controller.config.yang.pcep.topology.provider.PCEPTopologyProviderRuntimeRegistrator;
 import org.opendaylight.controller.md.sal.binding.test.AbstractConcurrentDataBrokerTest;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.protocol.pcep.PCEPSession;
 import org.opendaylight.protocol.pcep.PCEPSessionListener;
 import org.opendaylight.protocol.pcep.impl.DefaultPCEPSessionNegotiator;
-import org.opendaylight.protocol.pcep.impl.PCEPSessionImpl;
 import org.opendaylight.protocol.util.InetSocketAddressUtil;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpPrefix;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Prefix;
@@ -110,9 +103,6 @@ public abstract class AbstractPCEPSessionTest<T extends TopologySessionListenerF
     @Mock
     ListenerStateRuntimeRegistration listenerReg;
 
-    @Mock
-    private PCEPTopologyProviderRuntimeRegistrator registrator;
-
     private final Open localPrefs = new OpenBuilder().setDeadTimer((short) 30).setKeepalive((short) 10)
         .setSessionId((short) 0).build();
 
@@ -120,8 +110,6 @@ public abstract class AbstractPCEPSessionTest<T extends TopologySessionListenerF
 
     ServerSessionManager manager;
 
-    private PCEPSessionListener sessionListener;
-
     NetworkTopologyPcepService topologyRpcs;
 
     private DefaultPCEPSessionNegotiator neg;
@@ -152,50 +140,25 @@ public abstract class AbstractPCEPSessionTest<T extends TopologySessionListenerF
         doReturn(mock(ChannelFuture.class)).when(this.clientListener).close();
 
         doNothing().when(this.listenerReg).close();
-        doReturn("listenerReg").when(this.listenerReg).toString();
         final PCEPTopologyProviderRuntimeRegistration topologyReg = mock(PCEPTopologyProviderRuntimeRegistration.class);
         doReturn(this.listenerReg).when(topologyReg).register(any(ListenerStateRuntimeMXBean.class));
         doNothing().when(topologyReg).close();
-        doReturn(topologyReg).when(this.registrator).register(any(PCEPTopologyProviderRuntimeMXBean.class));
+        final PCEPTopologyProviderRuntimeRegistrator registrator = mock(PCEPTopologyProviderRuntimeRegistrator.class);
+        doReturn(topologyReg).when(registrator).register(any(PCEPTopologyProviderRuntimeMXBean.class));
 
         final T listenerFactory = (T) ((Class) ((ParameterizedType) this.getClass().getGenericSuperclass())
             .getActualTypeArguments()[0]).newInstance();
         this.manager = new ServerSessionManager(getDataBroker(), TOPO_IID, listenerFactory, RPC_TIMEOUT);
-        startSessionManager();
-        this.sessionListener = this.manager.getSessionListener();
+        this.manager.setRuntimeRootRegistrator(registrator);
+        this.manager.instantiateServiceInstance().checkedGet();
         this.neg = new DefaultPCEPSessionNegotiator(mock(Promise.class), this.clientListener,
-            this.sessionListener, (short) 1, 5, this.localPrefs);
+            this.manager.getSessionListener(), (short) 1, 5, this.localPrefs);
         this.topologyRpcs = new TopologyRPCs(this.manager);
     }
 
     @After
     public void tearDown() throws TransactionCommitFailedException {
-        stopSessionManager();
-    }
-
-    protected void startSessionManager() throws TransactionCommitFailedException, InterruptedException {
-        this.manager.setRuntimeRootRegistrator(this.registrator);
-        final CheckedFuture<Void, TransactionCommitFailedException> future = this.manager.instantiateServiceInstance();
-        final CountDownLatch lock = new CountDownLatch(1);
-        Futures.addCallback(future, new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(@Nullable final Void aVoid) {
-                lock.countDown();
-            }
-
-            @Override
-            public void onFailure(final Throwable throwable) {
-                // the test cannot continue
-                fail();
-            }
-        });
-        future.checkedGet();
-        lock.await(5000, TimeUnit.MILLISECONDS);
-        assertFalse(this.manager.isClosed());
-    }
-
-    protected void stopSessionManager() throws TransactionCommitFailedException {
-        this.manager.closeServiceInstance().checkedGet();
+        this.manager.closeServiceInstance();
     }
 
     Ero createEroWithIpPrefixes(final List<String> ipPrefixes) {
@@ -223,10 +186,10 @@ public abstract class AbstractPCEPSessionTest<T extends TopologySessionListenerF
     }
 
     protected PCEPSessionListener getSessionListener() {
-        return this.sessionListener;
+        return this.manager.getSessionListener();
     }
 
-    protected final PCEPSessionImpl getPCEPSession(final Open localOpen, final Open remoteOpen) {
+    protected final PCEPSession getPCEPSession(final Open localOpen, final Open remoteOpen) {
         return this.neg.createSession(this.clientListener, localOpen, remoteOpen);
     }
 }
index 5d590878cec6711568473a36e526f35143c0d5e4..3867a910f75473750543cd1276a6f159e5c68590 100755 (executable)
@@ -35,8 +35,9 @@ import org.junit.Test;
 import org.opendaylight.controller.config.yang.pcep.topology.provider.SessionState;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.protocol.pcep.PCEPCloseTermination;
+import org.opendaylight.protocol.pcep.PCEPSession;
 import org.opendaylight.protocol.pcep.TerminationReason;
-import org.opendaylight.protocol.pcep.impl.PCEPSessionImpl;
 import org.opendaylight.protocol.pcep.pcc.mock.spi.MsgBuilderUtil;
 import org.opendaylight.protocol.pcep.spi.AbstractMessageParser;
 import org.opendaylight.protocol.pcep.spi.PCEPErrors;
@@ -71,7 +72,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.iet
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev131222.pcupd.message.pcupd.message.Updates;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev131222.stateful.capability.tlv.StatefulBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev131222.symbolic.path.name.tlv.SymbolicPathNameBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.Close;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.endpoints.address.family.Ipv4CaseBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.endpoints.address.family.ipv4._case.Ipv4Builder;
@@ -105,7 +105,7 @@ public class Stateful07TopologySessionListenerTest extends AbstractPCEPSessionTe
 
     private Stateful07TopologySessionListener listener;
 
-    private PCEPSessionImpl session;
+    private PCEPSession session;
 
     @Override
     @Before
@@ -304,10 +304,7 @@ public class Stateful07TopologySessionListenerTest extends AbstractPCEPSessionTe
         verify(this.listenerReg, times(0)).close();
         // send request
         final Future<RpcResult<AddLspOutput>> futureOutput = this.topologyRpcs.addLsp(createAddLspInput());
-        assertFalse(this.session.isClosed());
-        // .onSessionDown() invokes tearDown(session), which invokes session.close(null)
         this.listener.onSessionDown(this.session, new IllegalArgumentException());
-        assertTrue(this.session.isClosed());
         verify(this.listenerReg, times(1)).close();
         final AddLspOutput output = futureOutput.get().getResult();
         // deal with unsent request after session down
@@ -318,43 +315,40 @@ public class Stateful07TopologySessionListenerTest extends AbstractPCEPSessionTe
      * All the pcep session registration should be closed when the session manager is closed
      * @throws InterruptedException
      * @throws ExecutionException
+     * @throws TransactionCommitFailedException
      */
     @Test
     public void testOnServerSessionManagerDown() throws InterruptedException, ExecutionException,
-            TransactionCommitFailedException {
+        TransactionCommitFailedException {
         this.listener.onSessionUp(this.session);
-        // the session should not be closed when session manager is up
-        assertFalse(this.session.isClosed());
         verify(this.listenerReg, times(0)).close();
         // send request
         final Future<RpcResult<AddLspOutput>> futureOutput = this.topologyRpcs.addLsp(createAddLspInput());
-        stopSessionManager();
+        this.manager.closeServiceInstance();
         verify(this.listenerReg, times(1)).close();
         final AddLspOutput output = futureOutput.get().getResult();
         // deal with unsent request after session down
         assertEquals(FailureType.Unsent, output.getFailure());
-        // verify the session is closed after server session manager is closed
-        assertTrue(this.session.isClosed());
     }
 
     /**
      * Verify the PCEP session should not be up when server session manager is down,
      * otherwise it would be a problem when the session is up while it's not registered with session manager
+     * @throws InterruptedException
+     * @throws ExecutionException
+     * @throws TransactionCommitFailedException
      */
     @Test
     public void testOnServerSessionManagerUnstarted() throws InterruptedException, ExecutionException,
-            TransactionCommitFailedException, ReadFailedException {
-        stopSessionManager();
+        TransactionCommitFailedException, ReadFailedException {
+        this.manager.closeServiceInstance();
         // the registration should not be closed since it's never initialized
         verify(this.listenerReg, times(0)).close();
-        assertFalse(this.session.isClosed());
         this.listener.onSessionUp(this.session);
         // verify the session was NOT added to topology
         checkNotPresentOperational(getDataBroker(), TOPO_IID);
         // still, the session should not be registered and thus close() is never called
         verify(this.listenerReg, times(0)).close();
-        // verify the session is closed due to server session manager is closed
-        assertTrue(this.session.isClosed());
         // send request
         final Future<RpcResult<AddLspOutput>> futureOutput = this.topologyRpcs.addLsp(createAddLspInput());
         final AddLspOutput output = futureOutput.get().getResult();
@@ -383,99 +377,10 @@ public class Stateful07TopologySessionListenerTest extends AbstractPCEPSessionTe
             return topology;
         });
 
-        assertFalse(this.session.isClosed());
-        this.session.terminate(TerminationReason.UNKNOWN);
-        assertTrue(this.session.isClosed());
-        verify(this.listenerReg, times(1)).close();
         // node should be removed after termination
-        checkNotPresentOperational(getDataBroker(), this.pathComputationClientIId);
-        assertFalse(this.receivedMsgs.isEmpty());
-        // the last message should be a Close message
-        assertTrue(this.receivedMsgs.get(this.receivedMsgs.size() - 1) instanceof Close);
-    }
-
-    /**
-     * When a session is somehow duplicated in controller, the controller should drop existing session
-     */
-    @Test
-    public void testDuplicatedSession() throws ReadFailedException {
-        this.listener.onSessionUp(this.session);
-        verify(this.listenerReg, times(0)).close();
-
-        // create node
-        this.topologyRpcs.addLsp(createAddLspInput());
-        final Pcinitiate pcinitiate = (Pcinitiate) this.receivedMsgs.get(0);
-        final Requests req = pcinitiate.getPcinitiateMessage().getRequests().get(0);
-        final long srpId = req.getSrp().getOperationId().getValue();
-        final Tlvs tlvs = createLspTlvs(req.getLsp().getPlspId().getValue(), true,
-            this.testAddress, this.testAddress, this.testAddress, Optional.absent());
-        final Pcrpt pcRpt = MsgBuilderUtil.createPcRtpMessage(new LspBuilder(req.getLsp()).setTlvs(tlvs).setSync(true)
-                .setRemove(false).setOperational(OperationalStatus.Active).build(),
-            Optional.of(MsgBuilderUtil.createSrp(srpId)), MsgBuilderUtil.createPath(req.getEro().getSubobject()));
-        this.listener.onMessage(this.session, pcRpt);
-        readDataOperational(getDataBroker(), TOPO_IID, topology -> {
-            assertEquals(1, topology.getNode().size());
-            return topology;
-        });
-
-        // now we do session up again
-        this.listener.onSessionUp(this.session);
-        assertTrue(this.session.isClosed());
+        this.listener.onSessionTerminated(this.session, new PCEPCloseTermination(TerminationReason.UNKNOWN));
         verify(this.listenerReg, times(1)).close();
-        // node should be removed after termination
-        checkNotPresentOperational(getDataBroker(), this.pathComputationClientIId);
-    }
-
-    @Test
-    public void testOnServerSessionManagerRestartAndSessionRecovery() throws Exception {
-        // close server session manager first
-        stopSessionManager();
-        // the registration should not be closed since it's never initialized
-        verify(this.listenerReg, times(0)).close();
-        assertFalse(this.session.isClosed());
-        this.listener.onSessionUp(this.session);
-        // verify the session was NOT added to topology
-        checkNotPresentOperational(getDataBroker(), TOPO_IID);
-        // still, the session should not be registered and thus close() is never called
-        verify(this.listenerReg, times(0)).close();
-        // verify the session is closed due to server session manager is closed
-        assertTrue(this.session.isClosed());
-        // send request
-        final Future<RpcResult<AddLspOutput>> futureOutput = this.topologyRpcs.addLsp(createAddLspInput());
-        final AddLspOutput output = futureOutput.get().getResult();
-        // deal with unsent request after session down
-        assertEquals(FailureType.Unsent, output.getFailure());
-        // PCC client is not there
         checkNotPresentOperational(getDataBroker(), this.pathComputationClientIId);
-
-        // reset received message queue
-        this.receivedMsgs.clear();
-        // now we restart the session manager
-        startSessionManager();
-        // try to start the session again
-        // notice since the session was terminated before, it is not usable anymore.
-        // we need to get a new session instance. the new session will have the same local / remote preference
-        this.session = getPCEPSession(getLocalPref(), getRemotePref());
-        verify(this.listenerReg, times(0)).close();
-        assertFalse(this.session.isClosed());
-        this.listener.onSessionUp(this.session);
-        assertFalse(this.session.isClosed());
-
-        // create node
-        this.topologyRpcs.addLsp(createAddLspInput());
-        final Pcinitiate pcinitiate = (Pcinitiate) this.receivedMsgs.get(0);
-        final Requests req = pcinitiate.getPcinitiateMessage().getRequests().get(0);
-        final long srpId = req.getSrp().getOperationId().getValue();
-        final Tlvs tlvs = createLspTlvs(req.getLsp().getPlspId().getValue(), true,
-            this.testAddress, this.testAddress, this.testAddress, Optional.absent());
-        final Pcrpt pcRpt = MsgBuilderUtil.createPcRtpMessage(new LspBuilder(req.getLsp()).setTlvs(tlvs).setSync(true)
-                .setRemove(false).setOperational(OperationalStatus.Active).build(),
-            Optional.of(MsgBuilderUtil.createSrp(srpId)), MsgBuilderUtil.createPath(req.getEro().getSubobject()));
-        this.listener.onMessage(this.session, pcRpt);
-        readDataOperational(getDataBroker(), TOPO_IID, topology -> {
-            assertEquals(1, topology.getNode().size());
-            return topology;
-        });
     }
 
     @Test