Terminate outstanding requests on clean shutdown 27/100727/1
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 22 Apr 2022 17:20:05 +0000 (19:20 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Fri, 22 Apr 2022 18:36:19 +0000 (20:36 +0200)
When we initiate session shutdown we need to cancel all requests. Update
tests to disable RPC timeouts to expose the fact we have been failing to
do that.

JIRA: BGPCEP-1007
Change-Id: I4159de8fa4c69041608032cd172e2f66afd2060c
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/AbstractTopologySessionListener.java
pcep/topology/topology-provider/src/test/java/org/opendaylight/bgpcep/pcep/topology/provider/PCEPTopologySessionListenerTest.java

index 90a2fb4ec5462186786c0cc95aa5197c9fe24bc0..165781918c387151db7166344177dde5bdc7cab9 100644 (file)
@@ -278,30 +278,7 @@ public abstract class AbstractTopologySessionListener implements TopologySession
                 session = null;
                 listenerState = null;
                 syncOptimization = null;
-
-                // Clear all requests we know about
-                for (final Entry<SrpIdNumber, PCEPRequest> e : requests.entrySet()) {
-                    // FIXME: exhaustive when we have JDK17+
-                    switch (e.getValue().cancel()) {
-                        case DONE:
-                            // Done is done, nothing to do
-                            LOG.trace("Request {} was done when session went down.", e.getKey());
-                            break;
-                        case UNACKED:
-                            // Peer has not acked: results in failure
-                            LOG.info("Request {} was incomplete when session went down, failing the instruction",
-                                    e.getKey());
-                            break;
-                        case UNSENT:
-                            // Peer has not been sent to the peer: results in cancellation
-                            LOG.debug("Request {} was not sent when session went down, cancelling the instruction",
-                                    e.getKey());
-                            break;
-                        default:
-                            break;
-                    }
-                }
-                requests.clear();
+                clearRequests();
             }
         }
     }
@@ -379,7 +356,11 @@ public abstract class AbstractTopologySessionListener implements TopologySession
                 if (session != null) {
                     LOG.info("Closing session {}", session);
                     session.close(TerminationReason.UNKNOWN);
+                    session = null;
                 }
+                listenerState = null;
+                syncOptimization = null;
+                clearRequests();
             }
         }
     }
@@ -393,6 +374,33 @@ public abstract class AbstractTopologySessionListener implements TopologySession
         }
     }
 
+    @Holding({"this.serverSessionManager", "this"})
+    private void clearRequests() {
+        // Clear all requests we know about
+        for (final Entry<SrpIdNumber, PCEPRequest> e : requests.entrySet()) {
+            // FIXME: exhaustive when we have JDK17+
+            switch (e.getValue().cancel()) {
+                case DONE:
+                    // Done is done, nothing to do
+                    LOG.trace("Request {} was done when session went down.", e.getKey());
+                    break;
+                case UNACKED:
+                    // Peer has not acked: results in failure
+                    LOG.info("Request {} was incomplete when session went down, failing the instruction",
+                            e.getKey());
+                    break;
+                case UNSENT:
+                    // Peer has not been sent to the peer: results in cancellation
+                    LOG.debug("Request {} was not sent when session went down, cancelling the instruction",
+                            e.getKey());
+                    break;
+                default:
+                    break;
+            }
+        }
+        requests.clear();
+    }
+
     final synchronized PCEPRequest removeRequest(final SrpIdNumber id) {
         final PCEPRequest ret = requests.remove(id);
         if (ret != null && listenerState != null) {
index 9617d84bbcce2e4dd4105090acad614daf0b1092..dd874a6b1c26f01d3284bc56e2219f666634f621 100644 (file)
@@ -9,17 +9,20 @@ package org.opendaylight.bgpcep.pcep.topology.provider;
 
 import static java.lang.Boolean.FALSE;
 import static java.lang.Boolean.TRUE;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.opendaylight.protocol.pcep.pcc.mock.spi.MsgBuilderUtil.createLspTlvs;
 import static org.opendaylight.protocol.util.CheckTestUtil.checkEquals;
 import static org.opendaylight.protocol.util.CheckTestUtil.checkNotPresentOperational;
 import static org.opendaylight.protocol.util.CheckTestUtil.readDataOperational;
 
+import com.google.common.util.concurrent.Futures;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
@@ -34,7 +37,6 @@ import org.opendaylight.protocol.pcep.impl.PCEPSessionImpl;
 import org.opendaylight.protocol.pcep.pcc.mock.spi.MsgBuilderUtil;
 import org.opendaylight.protocol.pcep.spi.AbstractMessageParser;
 import org.opendaylight.protocol.pcep.spi.PCEPErrors;
-import org.opendaylight.protocol.util.CheckTestUtil;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4AddressNoZone;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.network.topology.rev140113.NetworkTopologyRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.initiated.rev200720.Pcinitiate;
@@ -134,9 +136,9 @@ public class PCEPTopologySessionListenerTest extends AbstractPCEPSessionTest {
         // add-lsp
         topologyRpcs.addLsp(createAddLspInput());
         assertEquals(1, receivedMsgs.size());
-        assertTrue(receivedMsgs.get(0) instanceof Pcinitiate);
-        final Pcinitiate pcinitiate = (Pcinitiate) receivedMsgs.get(0);
-        final Requests req = pcinitiate.getPcinitiateMessage().getRequests().get(0);
+        final var pcinitiate = receivedMsgs.get(0);
+        assertThat(pcinitiate, instanceOf(Pcinitiate.class));
+        final Requests req = ((Pcinitiate) pcinitiate).getPcinitiateMessage().getRequests().get(0);
         final Uint32 srpId = req.getSrp().getOperationId().getValue();
         final Tlvs tlvs = createLspTlvs(req.getLsp().getPlspId().getValue(), true,
                 testAddress, testAddress, testAddress, Optional.empty());
@@ -191,9 +193,9 @@ public class PCEPTopologySessionListenerTest extends AbstractPCEPSessionTest {
                 .setNode(nodeId).build();
         topologyRpcs.updateLsp(update);
         assertEquals(2, receivedMsgs.size());
-        assertTrue(receivedMsgs.get(1) instanceof Pcupd);
-        final Pcupd updateMsg = (Pcupd) receivedMsgs.get(1);
-        final Updates upd = updateMsg.getPcupdMessage().getUpdates().get(0);
+        final var updateMsg = receivedMsgs.get(1);
+        assertThat(updateMsg, instanceOf(Pcupd.class));
+        final Updates upd = ((Pcupd) updateMsg).getPcupdMessage().getUpdates().get(0);
         final Uint32 srpId2 = upd.getSrp().getOperationId().getValue();
         final Tlvs tlvs2 = createLspTlvs(upd.getLsp().getPlspId().getValue(), false,
                 newDestinationAddress, testAddress, testAddress, Optional.empty());
@@ -248,9 +250,9 @@ public class PCEPTopologySessionListenerTest extends AbstractPCEPSessionTest {
                 .setNetworkTopologyRef(new NetworkTopologyRef(TOPO_IID)).setNode(nodeId).build();
         topologyRpcs.removeLsp(remove);
         assertEquals(3, receivedMsgs.size());
-        assertTrue(receivedMsgs.get(2) instanceof Pcinitiate);
-        final Pcinitiate pcinitiate2 = (Pcinitiate) receivedMsgs.get(2);
-        final Requests req2 = pcinitiate2.getPcinitiateMessage().getRequests().get(0);
+        final var pcinitiate2 =  receivedMsgs.get(2);
+        assertThat(pcinitiate2, instanceOf(Pcinitiate.class));
+        final Requests req2 = ((Pcinitiate) pcinitiate2).getPcinitiateMessage().getRequests().get(0);
         final Uint32 srpId3 = req2.getSrp().getOperationId().getValue();
         final Tlvs tlvs3 = createLspTlvs(req2.getLsp().getPlspId().getValue(), false,
                 testAddress, testAddress, testAddress, Optional.empty());
@@ -289,8 +291,10 @@ public class PCEPTopologySessionListenerTest extends AbstractPCEPSessionTest {
         final Message errorMsg = MsgBuilderUtil.createErrorMsg(PCEPErrors.NON_ZERO_PLSPID, Uint32.ONE);
         listener.onSessionUp(session);
         final Future<RpcResult<AddLspOutput>> futureOutput = topologyRpcs.addLsp(createAddLspInput());
-        listener.onMessage(session, errorMsg);
+        assertFalse(futureOutput.isDone());
 
+        listener.onMessage(session, errorMsg);
+        assertTrue(futureOutput.isDone());
         final AddLspOutput output = futureOutput.get().getResult();
         assertEquals(FailureType.Failed, output.getFailure());
         assertEquals(1, output.getError().size());
@@ -303,10 +307,15 @@ public class PCEPTopologySessionListenerTest extends AbstractPCEPSessionTest {
     public void testOnSessionDown() throws InterruptedException, ExecutionException {
         listener.onSessionUp(session);
         // send request
+        manager.setRpcTimeout((short) 0);
         final Future<RpcResult<AddLspOutput>> futureOutput = topologyRpcs.addLsp(createAddLspInput());
         assertFalse(session.isClosed());
+        assertFalse(futureOutput.isDone());
+
         listener.onSessionDown(session, new IllegalArgumentException());
         assertTrue(session.isClosed());
+        assertTrue(futureOutput.isDone());
+
         final AddLspOutput output = futureOutput.get().getResult();
         // deal with unsent request after session down
         assertEquals(FailureType.Unsent, output.getFailure());
@@ -321,13 +330,18 @@ public class PCEPTopologySessionListenerTest extends AbstractPCEPSessionTest {
         // the session should not be closed when session manager is up
         assertFalse(session.isClosed());
         // send request
+        manager.setRpcTimeout((short) 0);
         final Future<RpcResult<AddLspOutput>> futureOutput = topologyRpcs.addLsp(createAddLspInput());
+        assertFalse(futureOutput.isDone());
+
         stopSessionManager();
+        // verify the session is closed after server session manager is closed
+        assertTrue(session.isClosed());
+        assertTrue(futureOutput.isDone());
+
         final AddLspOutput output = futureOutput.get().getResult();
         // deal with unsent request after session down
         assertEquals(FailureType.Unsent, output.getFailure());
-        // verify the session is closed after server session manager is closed
-        assertTrue(session.isClosed());
     }
 
     /**
@@ -344,7 +358,10 @@ public class PCEPTopologySessionListenerTest extends AbstractPCEPSessionTest {
         // verify the session is closed due to server session manager is closed
         assertTrue(session.isClosed());
         // send request
+        manager.setRpcTimeout((short) 0);
         final Future<RpcResult<AddLspOutput>> futureOutput = topologyRpcs.addLsp(createAddLspInput());
+        assertTrue(futureOutput.isDone());
+
         final AddLspOutput output = futureOutput.get().getResult();
         // deal with unsent request after session down
         assertEquals(FailureType.Unsent, output.getFailure());
@@ -361,7 +378,10 @@ public class PCEPTopologySessionListenerTest extends AbstractPCEPSessionTest {
         // verify the session is closed due to server session manager is closed
         assertTrue(session.isClosed());
         // send request
+        manager.setRpcTimeout((short) 0);
         final Future<RpcResult<AddLspOutput>> futureOutput = topologyRpcs.addLsp(createAddLspInput());
+        assertTrue(futureOutput.isDone());
+
         final AddLspOutput output = futureOutput.get().getResult();
         // deal with unsent request after session down
         assertEquals(FailureType.Unsent, output.getFailure());
@@ -427,7 +447,7 @@ public class PCEPTopologySessionListenerTest extends AbstractPCEPSessionTest {
         checkNotPresentOperational(getDataBroker(), pathComputationClientIId);
         assertFalse(receivedMsgs.isEmpty());
         // the last message should be a Close message
-        assertTrue(receivedMsgs.get(receivedMsgs.size() - 1) instanceof Close);
+        assertThat(receivedMsgs.get(receivedMsgs.size() - 1), instanceOf(Close.class));
     }
 
     @Test
@@ -531,9 +551,9 @@ public class PCEPTopologySessionListenerTest extends AbstractPCEPSessionTest {
         listener.onSessionUp(session);
         topologyRpcs.addLsp(createAddLspInput());
         assertEquals(1, receivedMsgs.size());
-        assertTrue(receivedMsgs.get(0) instanceof Pcinitiate);
-        final Pcinitiate pcinitiate = (Pcinitiate) receivedMsgs.get(0);
-        final Requests req = pcinitiate.getPcinitiateMessage().getRequests().get(0);
+        final var pcinitiate =  receivedMsgs.get(0);
+        assertThat(pcinitiate, instanceOf(Pcinitiate.class));
+        final Requests req = ((Pcinitiate) pcinitiate).getPcinitiateMessage().getRequests().get(0);
         final Uint32 srpId = req.getSrp().getOperationId().getValue();
         final Tlvs tlvs = createLspTlvs(req.getLsp().getPlspId().getValue(), true,
                 testAddress, testAddress, testAddress, Optional.empty());
@@ -547,7 +567,7 @@ public class PCEPTopologySessionListenerTest extends AbstractPCEPSessionTest {
         listener.onMessage(session, pcRpt);
 
         //try to add already existing LSP
-        final AddLspOutput result = topologyRpcs.addLsp(createAddLspInput()).get().getResult();
+        final AddLspOutput result = Futures.getDone(topologyRpcs.addLsp(createAddLspInput())).getResult();
         assertEquals(FailureType.Unsent, result.getFailure());
         assertEquals(1, result.getError().size());
         final ErrorObject errorObject = result.getError().get(0).getErrorObject();
@@ -557,22 +577,16 @@ public class PCEPTopologySessionListenerTest extends AbstractPCEPSessionTest {
     }
 
     @Test
-    @SuppressWarnings("checkstyle:IllegalCatch")
     public void testPccResponseTimeout() throws Exception {
         listener.onSessionUp(session);
         final Future<RpcResult<AddLspOutput>> addLspResult = topologyRpcs.addLsp(createAddLspInput());
-        try {
-            addLspResult.get(2, TimeUnit.SECONDS);
-            fail();
-        } catch (final Exception e) {
-            assertTrue(e instanceof TimeoutException);
-        }
-        Thread.sleep(AbstractPCEPSessionTest.RPC_TIMEOUT);
-        CheckTestUtil.checkEquals(() -> {
-            final RpcResult<AddLspOutput> rpcResult = addLspResult.get();
-            assertNotNull(rpcResult);
-            assertEquals(rpcResult.getResult().getFailure(), FailureType.Unsent);
-        });
+        assertFalse(addLspResult.isDone());
+        assertThrows(TimeoutException.class, () -> addLspResult.get(RPC_TIMEOUT / 2, TimeUnit.SECONDS));
+        Thread.sleep(TimeUnit.SECONDS.toMillis(RPC_TIMEOUT));
+        assertTrue(addLspResult.isDone());
+        final RpcResult<AddLspOutput> rpcResult = addLspResult.get();
+        assertNotNull(rpcResult);
+        assertEquals(rpcResult.getResult().getFailure(), FailureType.Unsent);
     }
 
     @Test
@@ -580,9 +594,9 @@ public class PCEPTopologySessionListenerTest extends AbstractPCEPSessionTest {
         listener.onSessionUp(session);
         topologyRpcs.addLsp(createAddLspInput());
         assertEquals(1, receivedMsgs.size());
-        assertTrue(receivedMsgs.get(0) instanceof Pcinitiate);
-        final Pcinitiate pcinitiate = (Pcinitiate) receivedMsgs.get(0);
-        final Requests req = pcinitiate.getPcinitiateMessage().getRequests().get(0);
+        final var pcinitiate = receivedMsgs.get(0);
+        assertThat(pcinitiate, instanceOf(Pcinitiate.class));
+        final Requests req = ((Pcinitiate) pcinitiate).getPcinitiateMessage().getRequests().get(0);
         final Uint32 srpId = req.getSrp().getOperationId().getValue();
         final Tlvs tlvs = createLspTlvs(req.getLsp().getPlspId().getValue(), true,
                 testAddress, testAddress, testAddress, Optional.empty());
@@ -604,9 +618,9 @@ public class PCEPTopologySessionListenerTest extends AbstractPCEPSessionTest {
         listener.onSessionUp(session);
         topologyRpcs.addLsp(createAddLspInput());
         assertEquals(1, receivedMsgs.size());
-        assertTrue(receivedMsgs.get(0) instanceof Pcinitiate);
-        final Pcinitiate pcinitiate = (Pcinitiate) receivedMsgs.get(0);
-        final Requests req = pcinitiate.getPcinitiateMessage().getRequests().get(0);
+        final var pcinitiate = receivedMsgs.get(0);
+        assertThat(pcinitiate, instanceOf(Pcinitiate.class));
+        final Requests req = ((Pcinitiate) pcinitiate).getPcinitiateMessage().getRequests().get(0);
         final Uint32 srpId = req.getSrp().getOperationId().getValue();
         final Tlvs tlvs = createLspTlvs(req.getLsp().getPlspId().getValue(), true,
                 testAddress, testAddress, testAddress, Optional.empty());