BUG-8156 Terminate PCEP session properly when ServerSessionManager is closed 29/54929/20
authorKevin Wang <kevixw@gmail.com>
Wed, 5 Apr 2017 01:08:07 +0000 (18:08 -0700)
committerKevin Wang <kevixw@gmail.com>
Thu, 22 Jun 2017 19:48:40 +0000 (12:48 -0700)
When the PCEP topology provider is loading the configuration initially,
the ServerSessionManager will be restarted.  Any new PCEP connection
coming in during this period will be rejected by the ServerSessionManager.

This patch is to terminate these rejected PCEP session properly when above
situation happens, so that the session can be established successfully
in the next retry.

Change-Id: Ifa3366c97025a0b31fb7d3ee50b1142f63a6209c
Signed-off-by: Kevin Wang <kevixw@gmail.com>
pcep/api/src/main/java/org/opendaylight/protocol/pcep/PCEPSession.java
pcep/api/src/main/java/org/opendaylight/protocol/pcep/PCEPSessionListener.java
pcep/api/src/main/java/org/opendaylight/protocol/pcep/TerminationReason.java
pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionImpl.java
pcep/impl/src/test/java/org/opendaylight/protocol/pcep/impl/PCEPSessionImplTest.java
pcep/impl/src/test/java/org/opendaylight/protocol/pcep/impl/SimpleSessionListener.java
pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/AbstractTopologySessionListener.java
pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/ServerSessionManager.java
pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/SessionListenerState.java
pcep/topology-provider/src/test/java/org/opendaylight/bgpcep/pcep/topology/provider/AbstractPCEPSessionTest.java
pcep/topology-provider/src/test/java/org/opendaylight/bgpcep/pcep/topology/provider/Stateful07TopologySessionListenerTest.java

index 656daaf25a8cd1ed42eee1175ffeac3c716989e8..7aca1e51a23b185eadba19d0353ccfe497a9b441 100644 (file)
@@ -9,6 +9,8 @@ package org.opendaylight.protocol.pcep;
 
 import io.netty.util.concurrent.Future;
 import java.net.InetAddress;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.stats.rev141006.PcepSessionState;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.open.Tlvs;
@@ -31,7 +33,24 @@ public interface PCEPSession extends AutoCloseable, PcepSessionState {
      */
     Future<Void> sendMessage(Message message);
 
-    void close(TerminationReason reason);
+    /**
+     * Closes PCEP session, cancels all timers, returns to state Idle, sends the Close Message. KeepAlive and DeadTimer
+     * are cancelled if the state of the session changes to IDLE. This method is used to close the PCEP session from
+     * inside the session or from the listener, therefore the parent of this session should be informed.
+     * @param reason The {@link TerminationReason} to be wrapped in a PCEP CLOSE message and sent to the remote peer.
+     *               When the reason provided is null, no CLOSE message will be sent.
+     */
+    void close(@Nullable TerminationReason reason);
+
+    /**
+     * Terminate a PCEP session.  A CLOSE message with given reason will be sent to remote peer by invoking
+     * {@link #close(TerminationReason)} method.
+     *
+     * It triggers {@link PCEPSessionListener#onSessionTerminated(PCEPSession, PCEPTerminationReason)} after closing.
+     * @param reason The {@link TerminationReason} to be wrapped in a PCEP CLOSE message and sent to the remote peer.
+     *               The reason cannot be null.
+     */
+    void terminate(@Nonnull TerminationReason reason);
 
     Tlvs getRemoteTlvs();
 
index 8bca10edd35f170a767c19360b7993f2d3f54219..032372396c05b267e8df98a956dc2448d3fb3674 100644 (file)
@@ -31,7 +31,8 @@ public interface PCEPSessionListener extends EventListener {
     void onSessionDown(PCEPSession session, Exception e);
 
     /**
-     * Fired when the session is terminated locally. The session has already been closed and transitioned to IDLE state.
+     * Fired when the session is terminated locally or upon a CLOSE message from remote peer is received.
+     * The session has already been closed and transitioned to IDLE state.
      * Any outstanding queued messages were not sent. The user should not attempt to make any use of the session.
      *
      * @param reason the cause why the session went down
index 9eae09028c0c7ca0e667d20ce40ce3bc395e2559..974a2621c873b9f71db080a3c97ecc0dd3b4517b 100644 (file)
@@ -7,23 +7,12 @@
  */
 package org.opendaylight.protocol.pcep;
 
-import com.google.common.collect.Maps;
-import java.util.Map;
-
 public enum TerminationReason {
     UNKNOWN((short) 1), EXP_DEADTIMER((short) 2), MALFORMED_MSG((short) 3), TOO_MANY_UNKNWN_REQS((short) 4), TOO_MANY_UNKNOWN_MSGS((short) 5);
 
-    private short value;
-    private static final Map<Short, TerminationReason> VALUE_MAP;
-
-    static {
-        VALUE_MAP = Maps.newHashMap();
-        for (final TerminationReason enumItem : TerminationReason.values()) {
-            VALUE_MAP.put(enumItem.value, enumItem);
-        }
-    }
+    private final short value;
 
-    private TerminationReason(final short value) {
+    TerminationReason(final short value) {
         this.value = value;
     }
 
@@ -43,6 +32,18 @@ public enum TerminationReason {
      * @return corresponding TerminationReason item
      */
     public static TerminationReason forValue(final short valueArg) {
-        return VALUE_MAP.get(valueArg);
+        for (final TerminationReason reason : values()) {
+            if (reason.value == valueArg) {
+                return reason;
+            }
+        }
+        return null;
+    }
+
+    public static TerminationReason forValue(final Short valueArg) {
+        if (valueArg == null) {
+            return null;
+        }
+        return forValue(valueArg.shortValue());
     }
 }
index 04152c312257442f71c946b26ef88cd74cdb5c42..6d9c1ea523bab399c3ef11606e1ace92c271100a 100644 (file)
@@ -25,6 +25,7 @@ import java.util.Date;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.protocol.pcep.PCEPCloseTermination;
 import org.opendaylight.protocol.pcep.PCEPSession;
 import org.opendaylight.protocol.pcep.PCEPSessionListener;
@@ -42,6 +43,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.typ
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.OpenMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.PcerrMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.message.CCloseMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.message.CCloseMessageBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.object.CCloseBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.keepalive.message.KeepaliveMessageBuilder;
@@ -88,6 +90,7 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
     private int maxUnknownMessages;
 
     // True if the listener should not be notified about events
+    @GuardedBy("this")
     private boolean closed = false;
 
     private final Channel channel;
@@ -157,7 +160,7 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
      * KeepAlive timer to the time at which the message was sent. If the session was closed by the time this method
      * starts to execute (the session state will become IDLE), that rescheduling won't occur.
      */
-    private  void handleKeepaliveTimer() {
+    private void handleKeepaliveTimer() {
         final long ct = TICKER.read();
 
         long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
@@ -178,7 +181,7 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
      * @param msg to be sent
      */
     @Override
-    public Future<Void> sendMessage(final Message msg) {
+    public synchronized Future<Void> sendMessage(final Message msg) {
         final ChannelFuture f = this.channel.writeAndFlush(msg);
         this.lastMessageSentAt = TICKER.read();
         this.sessionState.updateLastSentMsg();
@@ -202,31 +205,38 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
 
     @VisibleForTesting
     ChannelFuture closeChannel() {
-        LOG.info("Closing PCEP session: {}", this);
+        LOG.info("Closing PCEP session channel: {}", this.channel);
         return this.channel.close();
     }
 
+    @VisibleForTesting
+    public synchronized boolean isClosed() {
+        return this.closed;
+    }
+
     /**
      * Closes PCEP session without sending a Close message, as the channel is no longer active.
      */
     @Override
-    public void close() {
-        LOG.info("Closing PCEP session: {}", this);
-        closeChannel();
+    public synchronized void close() {
+        close(null);
     }
 
-    /**
-     * Closes PCEP session, cancels all timers, returns to state Idle, sends the Close Message. KeepAlive and DeadTimer
-     * are cancelled if the state of the session changes to IDLE. This method is used to close the PCEP session from
-     * inside the session or from the listener, therefore the parent of this session should be informed.
-     */
     @Override
     public synchronized void close(final TerminationReason reason) {
-        LOG.info("Closing PCEP session: {}", this);
+        if (isClosed()) {
+            return;
+        }
         this.closed = true;
-        this.sendMessage(new CloseBuilder().setCCloseMessage(
-            new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
-        this.close();
+        // only send close message when the reason is provided
+        if (reason != null) {
+            LOG.info("Closing PCEP session with reason {}: {}", reason, this);
+            sendMessage(new CloseBuilder().setCCloseMessage(
+                    new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
+        } else {
+            LOG.info("Closing PCEP session: {}", this);
+        }
+        closeChannel();
     }
 
     @Override
@@ -239,18 +249,18 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
         return ((InetSocketAddress) this.channel.remoteAddress()).getAddress();
     }
 
-    private synchronized void terminate(final TerminationReason reason) {
-        LOG.info("Local PCEP session termination : {}", reason);
+    @Override
+    public synchronized void terminate(final TerminationReason reason) {
+        if (isClosed()) {
+            return;
+        }
+        close(reason);
         this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason));
-        this.closed = true;
-        this.sendMessage(new CloseBuilder().setCCloseMessage(
-            new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
-        this.close();
     }
 
     public synchronized void endOfInput() {
-        if (!this.closed) {
-            this.listener.onSessionDown(this, new IOException("End of input detected. Close the session."));
+        if (!isClosed()) {
+            this.listener.onSessionDown(this, new IOException("End of input detected. Close the session " + this));
             this.closed = true;
         }
     }
@@ -316,7 +326,16 @@ public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implem
              * exception is CLOSE message, which needs to be converted into a
              * session DOWN event.
              */
-            this.close();
+            final CCloseMessage closeMsg = ((CloseMessage) msg).getCCloseMessage();
+            TerminationReason reason = null;
+            if (closeMsg != null && closeMsg.getCClose() != null) {
+                reason = TerminationReason.forValue(closeMsg.getCClose().getReason());
+            }
+            if (reason == null) {
+                reason = TerminationReason.UNKNOWN;
+            }
+            this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason));
+            close();
         } else {
             // This message needs to be handled by the user
             if (msg instanceof PcerrMessage) {
index 8eccb4b8e48389108253dd24b1bcbf7ae44a7bbb..f6404d5e430009859c964b2c96d499adf2ffe990 100644 (file)
@@ -55,10 +55,12 @@ public class PCEPSessionImplTest extends AbstractPCEPSessionTest {
         Assert.assertTrue(this.listener.messages.get(0) instanceof Pcreq);
         Assert.assertEquals(2, this.session.getMessages().getReceivedMsgCount().intValue());
 
+        Assert.assertTrue(this.listener.up);
         this.session.handleMessage(new CloseBuilder().build());
         Assert.assertEquals(3, this.session.getMessages().getReceivedMsgCount().intValue());
         Assert.assertEquals(1, this.listener.messages.size());
         Assert.assertTrue(this.channel.isActive());
+        Assert.assertFalse(this.listener.up);
         Mockito.verify(this.channel, Mockito.times(1)).close();
 
         this.session.resetStats();
index b0d3cf7250d52f7ada00b926d7fac85a4a95ebd7..4c3ba69252dba5f65107b8682231fd56d69e1740 100644 (file)
@@ -55,5 +55,6 @@ public class SimpleSessionListener implements PCEPSessionListener {
     @Override
     public void onSessionTerminated(final PCEPSession session, final PCEPTerminationReason cause) {
         LOG.debug("Session terminated. Cause : {}", cause.toString());
+        this.up = false;
     }
 }
index 06b8910e6c90a866711a1211682ddeb234603434..2f0ee9cc1a9d2731b4bd7f62107f9a46d251bcf7 100755 (executable)
@@ -23,6 +23,7 @@ import java.util.Map.Entry;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.config.yang.pcep.topology.provider.ListenerStateRuntimeMXBean;
 import org.opendaylight.controller.config.yang.pcep.topology.provider.ListenerStateRuntimeRegistration;
@@ -125,6 +126,7 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
     private SyncOptimization syncOptimization;
     private boolean triggeredResyncInProcess;
 
+    @GuardedBy("this")
     private ListenerStateRuntimeRegistration registration;
     @GuardedBy("this")
     private final SessionListenerState listenerState;
@@ -150,13 +152,13 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
         // takeNodeState(..) may fail when the server session manager is being restarted due to configuration change
         if (state == null) {
             LOG.error("Unable to fetch topology node state for PCEP session. Closing session {}", session);
-            this.onSessionDown(session, new RuntimeException("Unable to fetch topology node state for PCEP session with " + session.getRemoteAddress()));
+            session.terminate(TerminationReason.UNKNOWN);
             return;
         }
 
         if (this.session != null || this.nodeState != null) {
             LOG.error("PCEP session is already up. Closing session {}", session);
-            this.onSessionDown(session, new IllegalStateException("Session is already up with " + session.getRemoteAddress()));
+            session.terminate(TerminationReason.UNKNOWN);
             return;
         }
 
@@ -184,7 +186,7 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
         register();
         if (this.registration == null) {
             LOG.error("PCEP session fails to register. Closing session {}", session);
-            this.onSessionDown(session, new RuntimeException("PCEP Session with " + session.getRemoteAddress() + " fails to register."));
+            session.terminate(TerminationReason.UNKNOWN);
             return;
         }
         this.listenerState.init(session);
@@ -215,6 +217,10 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
     }
 
     protected void updatePccState(final PccSyncState pccSyncState) {
+        if (this.serverSessionManager.isClosed()) {
+            LOG.debug("Ignore PCC state update for {} as session manager has been closed.", this.session);
+            return;
+        }
         final MessageContext ctx = new MessageContext(this.nodeState.beginTransaction());
         updatePccNode(ctx, new PathComputationClientBuilder().setStateSync(pccSyncState).build());
         if (pccSyncState != PccSyncState.Synchronized) {
@@ -249,34 +255,10 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
     private synchronized void tearDown(final PCEPSession session) {
         Preconditions.checkNotNull(session);
         this.serverSessionManager.releaseNodeState(this.nodeState, session, isLspDbPersisted());
-        this.nodeState = null;
-        this.session = null;
-        this.syncOptimization = null;
-        unregister();
-
-        // Clear all requests we know about
-        for (final Entry<S, PCEPRequest> e : this.requests.entrySet()) {
-            final PCEPRequest r = e.getValue();
-            switch (r.getState()) {
-            case DONE:
-                // Done is done, nothing to do
-                LOG.trace("Request {} was done when session went down.", e.getKey());
-                break;
-            case UNACKED:
-                // Peer has not acked: results in failure
-                LOG.info("Request {} was incomplete when session went down, failing the instruction", e.getKey());
-                r.done(OperationResults.NOACK);
-                break;
-            case UNSENT:
-                // Peer has not been sent to the peer: results in cancellation
-                LOG.debug("Request {} was not sent when session went down, cancelling the instruction", e.getKey());
-                r.done(OperationResults.UNSENT);
-                break;
-            default:
-                break;
-            }
-        }
-        this.requests.clear();
+        // Do not send CLOSE message here.
+        // * In #onSessionDown(..), that channel is already unavailable, thus we won't be able to send out the message.
+        // * In #OnSessionTerminated(..), a CLOSE message should have already been sent, no need to send again.
+        close(null);
     }
 
     @Override
@@ -287,14 +269,15 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
 
     @Override
     public final synchronized void onSessionTerminated(final PCEPSession session, final PCEPTerminationReason reason) {
-        LOG.info("Session {} terminated by peer with reason {}", session, reason);
+        LOG.info("Session {} terminated with reason {}", session, reason);
         tearDown(session);
     }
 
     @Override
     public final synchronized void onMessage(final PCEPSession session, final Message message) {
-        if (this.nodeState == null) {
-            LOG.warn("Topology node state is null. Unhandled message {} on session {}", message, session);
+        if (this.serverSessionManager.isClosed()) {
+            // we cannot operate on the topology node when the topology is removed by ServerSessionManager
+            LOG.debug("Ignore message from {} as session manager has been closed.", session);
             return;
         }
         final MessageContext ctx = new MessageContext(this.nodeState.beginTransaction());
@@ -323,20 +306,58 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
     }
 
     @Override
-    public void close() {
+    public synchronized void close() {
+        close(TerminationReason.UNKNOWN);
+    }
+
+    /**
+     * Close this session listener. Reset all session related status
+     *
+     * @param reason The {@link TerminationReason} to be wrapped in a PCEP CLOSE message and sent to the remote peer.
+     *               When the reason provided is null, no CLOSE message will be sent.
+     */
+    private final synchronized void close(@Nullable final TerminationReason reason) {
         unregister();
         if (this.session != null) {
-            this.session.close(TerminationReason.UNKNOWN);
+            this.session.close(reason);
+        }
+        this.session = null;
+        this.nodeState = null;
+        this.syncOptimization = null;
+
+        // Clear all requests we know about
+        for (final Entry<S, PCEPRequest> e : this.requests.entrySet()) {
+            final PCEPRequest r = e.getValue();
+            switch (r.getState()) {
+                case DONE:
+                    // Done is done, nothing to do
+                    LOG.trace("Request {} was done when session went down.", e.getKey());
+                    break;
+                case UNACKED:
+                    // Peer has not acked: results in failure
+                    LOG.info("Request {} was incomplete when session went down, failing the instruction", e.getKey());
+                    r.done(OperationResults.NOACK);
+                    break;
+                case UNSENT:
+                    // Peer has not been sent to the peer: results in cancellation
+                    LOG.debug("Request {} was not sent when session went down, cancelling the instruction", e.getKey());
+                    r.done(OperationResults.UNSENT);
+                    break;
+                default:
+                    break;
+            }
         }
+        this.requests.clear();
+        this.listenerState.destroy();
     }
 
     private final synchronized void unregister() {
         if (this.registration != null) {
             this.registration.close();
-            LOG.trace("PCEP session {} is unregistered successfully.", this.session);
+            LOG.debug("PCEP session {} is unregistered successfully.", this.session);
             this.registration = null;
         } else {
-            LOG.trace("PCEP session {} was not registered.", this.session);
+            LOG.debug("PCEP session {} was not registered.", this.session);
         }
     }
 
@@ -345,7 +366,7 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
         final PCEPTopologyProviderRuntimeRegistration runtimeReg = this.serverSessionManager.getRuntimeRootRegistration();
         if (runtimeReg != null) {
             this.registration = runtimeReg.register(this);
-            LOG.trace("PCEP session {} is successfully registered.", this.session);
+            LOG.debug("PCEP session {} is successfully registered.", this.session);
         }
     }
 
index 228e33f9f8500b79408d8996dba340faca415a3a..c5fdb06cb82196dfa135d8abff8811143874665a 100755 (executable)
@@ -68,7 +68,7 @@ final class ServerSessionManager implements PCEPSessionListenerFactory, Topology
     private final InstanceIdentifier<Topology> topology;
     private final DataBroker broker;
     private final PCEPStatefulPeerProposal peerProposal;
-    private final AtomicBoolean isClosed = new AtomicBoolean(false);
+    private final AtomicBoolean isClosed = new AtomicBoolean(true);
     private final short rpcTimeout;
     private final AtomicReference<PCEPTopologyProviderRuntimeRegistration> runtimeRootRegistration = new AtomicReference<>();
 
@@ -100,6 +100,7 @@ final class ServerSessionManager implements PCEPSessionListenerFactory, Topology
             @Override
             public void onSuccess(final Void result) {
                 LOG.debug("PCEP Topology {} created successfully.", topologyId.getValue());
+                ServerSessionManager.this.isClosed.set(false);
             }
 
             @Override
@@ -114,8 +115,19 @@ final class ServerSessionManager implements PCEPSessionListenerFactory, Topology
         return new NodeId("pcc://" + addr.getHostAddress());
     }
 
+    boolean isClosed() {
+        return this.isClosed.get();
+    }
+
     synchronized void releaseNodeState(final TopologyNodeState nodeState, final PCEPSession session, final boolean persistNode) {
-        this.nodes.remove(createNodeId(session.getRemoteAddress()));
+        final NodeId id = createNodeId(session.getRemoteAddress());
+        this.nodes.remove(id);
+        if (isClosed()) {
+            // Since the whole pcep topology is going to be removed by ServerSessionManager, we do not need to remove each single
+            // node separately. Besides, it could cause Optismic Lock on DataStore when operating on the same topology
+            LOG.trace("Server Session Manager is closed. No need to release topology node {}", id);
+            return;
+        }
         if (nodeState != null) {
             LOG.debug("Node {} unbound", nodeState.getNodeId());
             nodeState.released(persistNode);
@@ -124,7 +136,7 @@ final class ServerSessionManager implements PCEPSessionListenerFactory, Topology
 
     synchronized TopologyNodeState takeNodeState(final InetAddress address, final TopologySessionListener sessionListener, final boolean retrieveNode) {
         final NodeId id = createNodeId(address);
-        if (this.isClosed.get()) {
+        if (isClosed()) {
             LOG.error("Server Session Manager is closed. Unable to create topology node {} with listener {}", id, sessionListener);
             return null;
         }
@@ -137,8 +149,12 @@ final class ServerSessionManager implements PCEPSessionListenerFactory, Topology
             LOG.debug("Created topology node {} for id {} at {}", ret, id, ret.getNodeId());
             this.state.put(id, ret);
         }
-        // FIXME: else check for conflicting session
 
+        final TopologySessionListener conflictingSessionListener = this.nodes.get(id);
+        if (conflictingSessionListener != null && !sessionListener.equals(conflictingSessionListener)) {
+            LOG.error("Existing session {} is conflict with new session {} on node {}, closing the existing one.", conflictingSessionListener, sessionListener, id);
+            conflictingSessionListener.close();
+        }
         ret.taken(retrieveNode);
         this.nodes.put(id, sessionListener);
         LOG.debug("Node {} bound to listener {}", id, sessionListener);
@@ -190,7 +206,7 @@ final class ServerSessionManager implements PCEPSessionListenerFactory, Topology
         return (l != null) ? l.triggerSync(input) : OperationResults.UNSENT.future();
     }
 
-    synchronized ListenableFuture<Void> closeServiceInstance() {
+    synchronized CheckedFuture<Void, TransactionCommitFailedException> closeServiceInstance() {
         if (this.isClosed.getAndSet(true)) {
             LOG.error("Session Manager has already been closed.");
             Futures.immediateFuture(null);
@@ -224,16 +240,17 @@ final class ServerSessionManager implements PCEPSessionListenerFactory, Topology
         return future;
     }
 
+    PCEPTopologyProviderRuntimeRegistration getRuntimeRootRegistration() {
+        return this.runtimeRootRegistration.get();
+    }
+
     synchronized void setRuntimeRootRegistrator(final PCEPTopologyProviderRuntimeRegistrator runtimeRootRegistrator) {
         if (!this.runtimeRootRegistration.compareAndSet(null, runtimeRootRegistrator.register(this))) {
             LOG.error("Runtime root registration has been set before.");
         }
     }
 
-    PCEPTopologyProviderRuntimeRegistration getRuntimeRootRegistration() {
-        return this.runtimeRootRegistration.get();
-    }
-
+    @Override
     public void setPeerSpecificProposal(final InetSocketAddress address, final TlvsBuilder openBuilder) {
         Preconditions.checkNotNull(address);
         this.peerProposal.setPeerProposal(createNodeId(address.getAddress()), openBuilder);
index 340cde0438a3a468665bb7c915bc45571410dcf0..6e3689cafb498699f5c270e0dbe8ec15c4239991 100644 (file)
@@ -47,6 +47,10 @@ final class SessionListenerState {
         this.capa = new PeerCapabilities();
     }
 
+    /**
+     * Initialize the listenerState with a PCEP session
+     * @param session PCEP session that is bind to this listenerState
+     */
     public void init(final PCEPSession session) {
         Preconditions.checkNotNull(session);
         this.localPref = getLocalPref(session.getLocalPref());
@@ -54,6 +58,25 @@ final class SessionListenerState {
         this.sessionUpDuration.start();
     }
 
+    /**
+     * Reset this listenerState instance to it's initial state
+     * This isn't necessary in most case, as a PCEP session listener
+     * is tightly bind to a particular PCEP session
+     * However, it is required in unit test. Because we are reusing
+     * PCEP session listener in a single test, the listenerState
+     * will not be killed when the session is down. So if later
+     * we want to {@link #init(PCEPSession)} a session with the same session listener
+     * again, an exception saying "stopWatch cannot be started again"
+     * will be thrown, and all the statistical counter won't be correct
+     */
+    public void destroy() {
+        this.localPref = null;
+        this.peerPref = null;
+        this.sessionUpDuration.reset();
+        this.capa = new PeerCapabilities();
+        resetStats();
+    }
+
     public void processRequestStats(final long duration) {
         if (this.minReplyTime == 0) {
             this.minReplyTime = duration;
@@ -78,8 +101,10 @@ final class SessionListenerState {
         return msgs;
     }
 
-    public void resetStats(final PCEPSession session) {
-        Preconditions.checkNotNull(session);
+    /**
+     * Reset statistic counters
+     */
+    private void resetStats() {
         this.receivedRptMsgCount = 0;
         this.sentInitMsgCount = 0;
         this.sentUpdMsgCount = 0;
@@ -88,6 +113,11 @@ final class SessionListenerState {
         this.minReplyTime = 0;
         this.totalTime = 0;
         this.reqCount = 0;
+    }
+
+    public void resetStats(final PCEPSession session) {
+        Preconditions.checkNotNull(session);
+        resetStats();
         session.resetStats();
     }
 
index e564d59f3841accc90c4343d8d87d9629de9a598..a35bcadf45d634b30ca23cc7bab82972fa777746 100644 (file)
@@ -8,12 +8,17 @@
 
 package org.opendaylight.bgpcep.pcep.topology.provider;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
@@ -25,7 +30,9 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 import org.junit.After;
 import org.junit.Before;
 import org.mockito.Mock;
@@ -38,9 +45,9 @@ import org.opendaylight.controller.config.yang.pcep.topology.provider.PCEPTopolo
 import org.opendaylight.controller.config.yang.pcep.topology.provider.PCEPTopologyProviderRuntimeRegistrator;
 import org.opendaylight.controller.md.sal.binding.test.AbstractConcurrentDataBrokerTest;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.protocol.pcep.PCEPSession;
 import org.opendaylight.protocol.pcep.PCEPSessionListener;
 import org.opendaylight.protocol.pcep.impl.DefaultPCEPSessionNegotiator;
+import org.opendaylight.protocol.pcep.impl.PCEPSessionImpl;
 import org.opendaylight.protocol.util.InetSocketAddressUtil;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpPrefix;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Prefix;
@@ -103,6 +110,9 @@ public abstract class AbstractPCEPSessionTest<T extends TopologySessionListenerF
     @Mock
     ListenerStateRuntimeRegistration listenerReg;
 
+    @Mock
+    private PCEPTopologyProviderRuntimeRegistrator registrator;
+
     private final Open localPrefs = new OpenBuilder().setDeadTimer((short) 30).setKeepalive((short) 10)
         .setSessionId((short) 0).build();
 
@@ -110,6 +120,8 @@ public abstract class AbstractPCEPSessionTest<T extends TopologySessionListenerF
 
     ServerSessionManager manager;
 
+    private PCEPSessionListener sessionListener;
+
     NetworkTopologyPcepService topologyRpcs;
 
     private DefaultPCEPSessionNegotiator neg;
@@ -140,25 +152,50 @@ public abstract class AbstractPCEPSessionTest<T extends TopologySessionListenerF
         doReturn(mock(ChannelFuture.class)).when(this.clientListener).close();
 
         doNothing().when(this.listenerReg).close();
+        doReturn("listenerReg").when(this.listenerReg).toString();
         final PCEPTopologyProviderRuntimeRegistration topologyReg = mock(PCEPTopologyProviderRuntimeRegistration.class);
         doReturn(this.listenerReg).when(topologyReg).register(any(ListenerStateRuntimeMXBean.class));
         doNothing().when(topologyReg).close();
-        final PCEPTopologyProviderRuntimeRegistrator registrator = mock(PCEPTopologyProviderRuntimeRegistrator.class);
-        doReturn(topologyReg).when(registrator).register(any(PCEPTopologyProviderRuntimeMXBean.class));
+        doReturn(topologyReg).when(this.registrator).register(any(PCEPTopologyProviderRuntimeMXBean.class));
 
         final T listenerFactory = (T) ((Class) ((ParameterizedType) this.getClass().getGenericSuperclass())
             .getActualTypeArguments()[0]).newInstance();
         this.manager = new ServerSessionManager(getDataBroker(), TOPO_IID, listenerFactory, RPC_TIMEOUT);
-        this.manager.setRuntimeRootRegistrator(registrator);
-        this.manager.instantiateServiceInstance().checkedGet();
+        startSessionManager();
+        this.sessionListener = this.manager.getSessionListener();
         this.neg = new DefaultPCEPSessionNegotiator(mock(Promise.class), this.clientListener,
-            this.manager.getSessionListener(), (short) 1, 5, this.localPrefs);
+            this.sessionListener, (short) 1, 5, this.localPrefs);
         this.topologyRpcs = new TopologyRPCs(this.manager);
     }
 
     @After
     public void tearDown() throws TransactionCommitFailedException {
-        this.manager.closeServiceInstance();
+        stopSessionManager();
+    }
+
+    protected void startSessionManager() throws TransactionCommitFailedException, InterruptedException {
+        this.manager.setRuntimeRootRegistrator(this.registrator);
+        final CheckedFuture<Void, TransactionCommitFailedException> future = this.manager.instantiateServiceInstance();
+        final CountDownLatch lock = new CountDownLatch(1);
+        Futures.addCallback(future, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(@Nullable final Void aVoid) {
+                lock.countDown();
+            }
+
+            @Override
+            public void onFailure(final Throwable throwable) {
+                // the test cannot continue
+                fail();
+            }
+        });
+        future.checkedGet();
+        lock.await(5000, TimeUnit.MILLISECONDS);
+        assertFalse(this.manager.isClosed());
+    }
+
+    protected void stopSessionManager() throws TransactionCommitFailedException {
+        this.manager.closeServiceInstance().checkedGet();
     }
 
     Ero createEroWithIpPrefixes(final List<String> ipPrefixes) {
@@ -186,10 +223,10 @@ public abstract class AbstractPCEPSessionTest<T extends TopologySessionListenerF
     }
 
     protected PCEPSessionListener getSessionListener() {
-        return this.manager.getSessionListener();
+        return this.sessionListener;
     }
 
-    protected final PCEPSession getPCEPSession(final Open localOpen, final Open remoteOpen) {
+    protected final PCEPSessionImpl getPCEPSession(final Open localOpen, final Open remoteOpen) {
         return this.neg.createSession(this.clientListener, localOpen, remoteOpen);
     }
 }
index 3867a910f75473750543cd1276a6f159e5c68590..5d590878cec6711568473a36e526f35143c0d5e4 100755 (executable)
@@ -35,9 +35,8 @@ import org.junit.Test;
 import org.opendaylight.controller.config.yang.pcep.topology.provider.SessionState;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.protocol.pcep.PCEPCloseTermination;
-import org.opendaylight.protocol.pcep.PCEPSession;
 import org.opendaylight.protocol.pcep.TerminationReason;
+import org.opendaylight.protocol.pcep.impl.PCEPSessionImpl;
 import org.opendaylight.protocol.pcep.pcc.mock.spi.MsgBuilderUtil;
 import org.opendaylight.protocol.pcep.spi.AbstractMessageParser;
 import org.opendaylight.protocol.pcep.spi.PCEPErrors;
@@ -72,6 +71,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.iet
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev131222.pcupd.message.pcupd.message.Updates;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev131222.stateful.capability.tlv.StatefulBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev131222.symbolic.path.name.tlv.SymbolicPathNameBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.Close;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.endpoints.address.family.Ipv4CaseBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.endpoints.address.family.ipv4._case.Ipv4Builder;
@@ -105,7 +105,7 @@ public class Stateful07TopologySessionListenerTest extends AbstractPCEPSessionTe
 
     private Stateful07TopologySessionListener listener;
 
-    private PCEPSession session;
+    private PCEPSessionImpl session;
 
     @Override
     @Before
@@ -304,7 +304,10 @@ public class Stateful07TopologySessionListenerTest extends AbstractPCEPSessionTe
         verify(this.listenerReg, times(0)).close();
         // send request
         final Future<RpcResult<AddLspOutput>> futureOutput = this.topologyRpcs.addLsp(createAddLspInput());
+        assertFalse(this.session.isClosed());
+        // .onSessionDown() invokes tearDown(session), which invokes session.close(null)
         this.listener.onSessionDown(this.session, new IllegalArgumentException());
+        assertTrue(this.session.isClosed());
         verify(this.listenerReg, times(1)).close();
         final AddLspOutput output = futureOutput.get().getResult();
         // deal with unsent request after session down
@@ -315,40 +318,43 @@ public class Stateful07TopologySessionListenerTest extends AbstractPCEPSessionTe
      * All the pcep session registration should be closed when the session manager is closed
      * @throws InterruptedException
      * @throws ExecutionException
-     * @throws TransactionCommitFailedException
      */
     @Test
     public void testOnServerSessionManagerDown() throws InterruptedException, ExecutionException,
-        TransactionCommitFailedException {
+            TransactionCommitFailedException {
         this.listener.onSessionUp(this.session);
+        // the session should not be closed when session manager is up
+        assertFalse(this.session.isClosed());
         verify(this.listenerReg, times(0)).close();
         // send request
         final Future<RpcResult<AddLspOutput>> futureOutput = this.topologyRpcs.addLsp(createAddLspInput());
-        this.manager.closeServiceInstance();
+        stopSessionManager();
         verify(this.listenerReg, times(1)).close();
         final AddLspOutput output = futureOutput.get().getResult();
         // deal with unsent request after session down
         assertEquals(FailureType.Unsent, output.getFailure());
+        // verify the session is closed after server session manager is closed
+        assertTrue(this.session.isClosed());
     }
 
     /**
      * Verify the PCEP session should not be up when server session manager is down,
      * otherwise it would be a problem when the session is up while it's not registered with session manager
-     * @throws InterruptedException
-     * @throws ExecutionException
-     * @throws TransactionCommitFailedException
      */
     @Test
     public void testOnServerSessionManagerUnstarted() throws InterruptedException, ExecutionException,
-        TransactionCommitFailedException, ReadFailedException {
-        this.manager.closeServiceInstance();
+            TransactionCommitFailedException, ReadFailedException {
+        stopSessionManager();
         // the registration should not be closed since it's never initialized
         verify(this.listenerReg, times(0)).close();
+        assertFalse(this.session.isClosed());
         this.listener.onSessionUp(this.session);
         // verify the session was NOT added to topology
         checkNotPresentOperational(getDataBroker(), TOPO_IID);
         // still, the session should not be registered and thus close() is never called
         verify(this.listenerReg, times(0)).close();
+        // verify the session is closed due to server session manager is closed
+        assertTrue(this.session.isClosed());
         // send request
         final Future<RpcResult<AddLspOutput>> futureOutput = this.topologyRpcs.addLsp(createAddLspInput());
         final AddLspOutput output = futureOutput.get().getResult();
@@ -377,10 +383,99 @@ public class Stateful07TopologySessionListenerTest extends AbstractPCEPSessionTe
             return topology;
         });
 
+        assertFalse(this.session.isClosed());
+        this.session.terminate(TerminationReason.UNKNOWN);
+        assertTrue(this.session.isClosed());
+        verify(this.listenerReg, times(1)).close();
         // node should be removed after termination
-        this.listener.onSessionTerminated(this.session, new PCEPCloseTermination(TerminationReason.UNKNOWN));
+        checkNotPresentOperational(getDataBroker(), this.pathComputationClientIId);
+        assertFalse(this.receivedMsgs.isEmpty());
+        // the last message should be a Close message
+        assertTrue(this.receivedMsgs.get(this.receivedMsgs.size() - 1) instanceof Close);
+    }
+
+    /**
+     * When a session is somehow duplicated in controller, the controller should drop existing session
+     */
+    @Test
+    public void testDuplicatedSession() throws ReadFailedException {
+        this.listener.onSessionUp(this.session);
+        verify(this.listenerReg, times(0)).close();
+
+        // create node
+        this.topologyRpcs.addLsp(createAddLspInput());
+        final Pcinitiate pcinitiate = (Pcinitiate) this.receivedMsgs.get(0);
+        final Requests req = pcinitiate.getPcinitiateMessage().getRequests().get(0);
+        final long srpId = req.getSrp().getOperationId().getValue();
+        final Tlvs tlvs = createLspTlvs(req.getLsp().getPlspId().getValue(), true,
+            this.testAddress, this.testAddress, this.testAddress, Optional.absent());
+        final Pcrpt pcRpt = MsgBuilderUtil.createPcRtpMessage(new LspBuilder(req.getLsp()).setTlvs(tlvs).setSync(true)
+                .setRemove(false).setOperational(OperationalStatus.Active).build(),
+            Optional.of(MsgBuilderUtil.createSrp(srpId)), MsgBuilderUtil.createPath(req.getEro().getSubobject()));
+        this.listener.onMessage(this.session, pcRpt);
+        readDataOperational(getDataBroker(), TOPO_IID, topology -> {
+            assertEquals(1, topology.getNode().size());
+            return topology;
+        });
+
+        // now we do session up again
+        this.listener.onSessionUp(this.session);
+        assertTrue(this.session.isClosed());
         verify(this.listenerReg, times(1)).close();
+        // node should be removed after termination
+        checkNotPresentOperational(getDataBroker(), this.pathComputationClientIId);
+    }
+
+    @Test
+    public void testOnServerSessionManagerRestartAndSessionRecovery() throws Exception {
+        // close server session manager first
+        stopSessionManager();
+        // the registration should not be closed since it's never initialized
+        verify(this.listenerReg, times(0)).close();
+        assertFalse(this.session.isClosed());
+        this.listener.onSessionUp(this.session);
+        // verify the session was NOT added to topology
+        checkNotPresentOperational(getDataBroker(), TOPO_IID);
+        // still, the session should not be registered and thus close() is never called
+        verify(this.listenerReg, times(0)).close();
+        // verify the session is closed due to server session manager is closed
+        assertTrue(this.session.isClosed());
+        // send request
+        final Future<RpcResult<AddLspOutput>> futureOutput = this.topologyRpcs.addLsp(createAddLspInput());
+        final AddLspOutput output = futureOutput.get().getResult();
+        // deal with unsent request after session down
+        assertEquals(FailureType.Unsent, output.getFailure());
+        // PCC client is not there
         checkNotPresentOperational(getDataBroker(), this.pathComputationClientIId);
+
+        // reset received message queue
+        this.receivedMsgs.clear();
+        // now we restart the session manager
+        startSessionManager();
+        // try to start the session again
+        // notice since the session was terminated before, it is not usable anymore.
+        // we need to get a new session instance. the new session will have the same local / remote preference
+        this.session = getPCEPSession(getLocalPref(), getRemotePref());
+        verify(this.listenerReg, times(0)).close();
+        assertFalse(this.session.isClosed());
+        this.listener.onSessionUp(this.session);
+        assertFalse(this.session.isClosed());
+
+        // create node
+        this.topologyRpcs.addLsp(createAddLspInput());
+        final Pcinitiate pcinitiate = (Pcinitiate) this.receivedMsgs.get(0);
+        final Requests req = pcinitiate.getPcinitiateMessage().getRequests().get(0);
+        final long srpId = req.getSrp().getOperationId().getValue();
+        final Tlvs tlvs = createLspTlvs(req.getLsp().getPlspId().getValue(), true,
+            this.testAddress, this.testAddress, this.testAddress, Optional.absent());
+        final Pcrpt pcRpt = MsgBuilderUtil.createPcRtpMessage(new LspBuilder(req.getLsp()).setTlvs(tlvs).setSync(true)
+                .setRemove(false).setOperational(OperationalStatus.Active).build(),
+            Optional.of(MsgBuilderUtil.createSrp(srpId)), MsgBuilderUtil.createPath(req.getEro().getSubobject()));
+        this.listener.onMessage(this.session, pcRpt);
+        readDataOperational(getDataBroker(), TOPO_IID, topology -> {
+            assertEquals(1, topology.getNode().size());
+            return topology;
+        });
     }
 
     @Test