NETCONF-608 - Change Netconf keepalives to not send during large payload replies 89/85889/1
authorJakub Tóth <jtoth@luminanetworks.com>
Wed, 20 Feb 2019 10:11:35 +0000 (11:11 +0100)
committerMiroslav Macko <mmacko@luminanetworks.com>
Wed, 20 Nov 2019 12:30:23 +0000 (13:30 +0100)
  * stop to send the keepalive rpc to device while ODL is waiting/processing
    the response from the device
  * junit tests

Change-Id: Ib5f4c9a8e2c4cac92887750b687ea8c50e58f3c0
Signed-off-by: Jakub Tóth <jtoth@luminanetworks.com>
Signed-off-by: Miroslav Macko <mmacko@luminanetworks.com>
netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/KeepaliveSalFacade.java
netconf/sal-netconf-connector/src/test/java/org/opendaylight/netconf/sal/connect/netconf/sal/KeepaliveSalFacadeResponseWaitingTest.java [new file with mode: 0644]

index 59ce05edca3198cfe39b067dbe202e36cc4ac796..3b255ed009125becf3eb1abd571df92f2c0e3690 100644 (file)
@@ -139,7 +139,8 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler<NetconfSess
             final DOMActionService deviceAction) {
         this.currentDeviceRpc = deviceRpc;
         final DOMRpcService deviceRpc1 =
-                new KeepaliveDOMRpcService(deviceRpc, resetKeepaliveTask, defaultRequestTimeoutMillis, executor);
+                new KeepaliveDOMRpcService(deviceRpc, resetKeepaliveTask, defaultRequestTimeoutMillis, executor,
+                        new ResponseWaitingScheduler());
 
         salFacade.onDeviceConnected(remoteSchemaContext, netconfSessionPreferences, deviceRpc1, deviceAction);
 
@@ -196,14 +197,14 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler<NetconfSess
             LOG.trace("{}: Invoking keepalive RPC", id);
 
             try {
-                boolean lastJobSucceeded = lastKeepAliveSucceeded.getAndSet(false);
+                final boolean lastJobSucceeded = lastKeepAliveSucceeded.getAndSet(false);
                 if (!lastJobSucceeded) {
                     onFailure(new IllegalStateException("Previous keepalive timed out"));
                 } else {
                     Futures.addCallback(currentDeviceRpc.invokeRpc(NETCONF_GET_CONFIG_PATH, KEEPALIVE_PAYLOAD), this,
                                         MoreExecutors.directExecutor());
                 }
-            } catch (NullPointerException e) {
+            } catch (final NullPointerException e) {
                 LOG.debug("{}: Skipping keepalive while reconnecting", id);
                 // Empty catch block intentional
                 // Do nothing. The currentDeviceRpc was null and it means we hit the reconnect window and
@@ -257,6 +258,63 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler<NetconfSess
         }
     }
 
+    private final class ResponseWaitingScheduler {
+
+        private ScheduledFuture<?> schedule;
+
+        public void initScheduler(final Runnable runnable) {
+            if (currentKeepalive != null) {
+                currentKeepalive.cancel(true);
+            } else {
+                LOG.trace("Keepalive does not exist.");
+            }
+            scheduleKeepalives();
+            //Listening on the result should be done before the keepalive rpc will be send
+            final long delay = (keepaliveDelaySeconds * 1000) - 500;
+            schedule = executor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
+        }
+
+        public void stopScheduler() {
+            if (schedule != null) {
+                schedule.cancel(true);
+            } else {
+                LOG.trace("Scheduler does not exist.");
+            }
+        }
+    }
+
+    private static final class ResponseWaiting implements Runnable {
+
+        private final FluentFuture<DOMRpcResult> rpcResultFuture;
+        private final ResponseWaitingScheduler responseWaitingScheduler;
+
+        ResponseWaiting(final ResponseWaitingScheduler responseWaitingScheduler,
+                final FluentFuture<DOMRpcResult> rpcResultFuture) {
+            this.responseWaitingScheduler = responseWaitingScheduler;
+            this.rpcResultFuture = rpcResultFuture;
+        }
+
+        public void start() {
+            LOG.trace("Start to waiting for result.");
+            responseWaitingScheduler.initScheduler(this);
+        }
+
+        public void stop() {
+            LOG.info("Stop to waiting for result.");
+            responseWaitingScheduler.stopScheduler();
+        }
+
+        @Override
+        public void run() {
+            if (!rpcResultFuture.isCancelled() && !rpcResultFuture.isDone()) {
+                LOG.trace("Waiting for result");
+                responseWaitingScheduler.initScheduler(this);
+            } else {
+                LOG.trace("Result has been cancelled or done.");
+            }
+        }
+    }
+
     /*
      * Request timeout task is called once the defaultRequestTimeoutMillis is
      * reached. At this moment, if the request is not yet finished, we cancel
@@ -265,9 +323,11 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler<NetconfSess
     private static final class RequestTimeoutTask implements Runnable {
 
         private final FluentFuture<DOMRpcResult> rpcResultFuture;
+        private final ResponseWaiting responseWaiting;
 
-        RequestTimeoutTask(final FluentFuture<DOMRpcResult> rpcResultFuture) {
+        RequestTimeoutTask(final FluentFuture<DOMRpcResult> rpcResultFuture, final ResponseWaiting responseWaiting) {
             this.rpcResultFuture = rpcResultFuture;
+            this.responseWaiting = responseWaiting;
         }
 
         @Override
@@ -275,6 +335,9 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler<NetconfSess
             if (!rpcResultFuture.isDone()) {
                 rpcResultFuture.cancel(true);
             }
+            if (responseWaiting != null) {
+                responseWaiting.stop();
+            }
         }
     }
 
@@ -288,13 +351,16 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler<NetconfSess
         private final ResetKeepalive resetKeepaliveTask;
         private final long defaultRequestTimeoutMillis;
         private final ScheduledExecutorService executor;
+        private final ResponseWaitingScheduler responseWaitingScheduler;
 
         KeepaliveDOMRpcService(final DOMRpcService deviceRpc, final ResetKeepalive resetKeepaliveTask,
-                final long defaultRequestTimeoutMillis, final ScheduledExecutorService executor) {
+                final long defaultRequestTimeoutMillis, final ScheduledExecutorService executor,
+                final ResponseWaitingScheduler responseWaitingScheduler) {
             this.deviceRpc = deviceRpc;
             this.resetKeepaliveTask = resetKeepaliveTask;
             this.defaultRequestTimeoutMillis = defaultRequestTimeoutMillis;
             this.executor = executor;
+            this.responseWaitingScheduler = responseWaitingScheduler;
         }
 
         public DOMRpcService getDeviceRpc() {
@@ -306,9 +372,11 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler<NetconfSess
         public @NonNull FluentFuture<DOMRpcResult> invokeRpc(@Nonnull final SchemaPath type,
                                                                       final NormalizedNode<?, ?> input) {
             final FluentFuture<DOMRpcResult> rpcResultFuture = deviceRpc.invokeRpc(type, input);
-            Futures.addCallback(rpcResultFuture, resetKeepaliveTask, MoreExecutors.directExecutor());
+            final ResponseWaiting responseWaiting = new ResponseWaiting(responseWaitingScheduler, rpcResultFuture);
+            responseWaiting.start();
+            rpcResultFuture.addCallback(resetKeepaliveTask, MoreExecutors.directExecutor());
 
-            final RequestTimeoutTask timeoutTask = new RequestTimeoutTask(rpcResultFuture);
+            final RequestTimeoutTask timeoutTask = new RequestTimeoutTask(rpcResultFuture, responseWaiting);
             executor.schedule(timeoutTask, defaultRequestTimeoutMillis, TimeUnit.MILLISECONDS);
 
             return rpcResultFuture;
diff --git a/netconf/sal-netconf-connector/src/test/java/org/opendaylight/netconf/sal/connect/netconf/sal/KeepaliveSalFacadeResponseWaitingTest.java b/netconf/sal-netconf-connector/src/test/java/org/opendaylight/netconf/sal/connect/netconf/sal/KeepaliveSalFacadeResponseWaitingTest.java
new file mode 100644 (file)
index 0000000..ea54749
--- /dev/null
@@ -0,0 +1,151 @@
+/*
+ * Copyright (c) 2019 Lumina Networks, Inc. All Rights Reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.connect.netconf.sal;
+
+import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps.getSourceNode;
+import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_NODEID;
+import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_PATH;
+import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_RUNNING_QNAME;
+
+import com.google.common.util.concurrent.SettableFuture;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.dom.api.DOMActionService;
+import org.opendaylight.mdsal.dom.api.DOMMountPointService;
+import org.opendaylight.mdsal.dom.api.DOMNotification;
+import org.opendaylight.mdsal.dom.api.DOMRpcResult;
+import org.opendaylight.mdsal.dom.api.DOMRpcService;
+import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+public class KeepaliveSalFacadeResponseWaitingTest {
+
+    private static final RemoteDeviceId REMOTE_DEVICE_ID =
+            new RemoteDeviceId("test", new InetSocketAddress("localhost", 22));
+    private static final ContainerNode KEEPALIVE_PAYLOAD = NetconfMessageTransformUtil.wrap(NETCONF_GET_CONFIG_NODEID,
+            getSourceNode(NETCONF_RUNNING_QNAME), NetconfMessageTransformUtil.EMPTY_FILTER);
+
+    private KeepaliveSalFacade keepaliveSalFacade;
+    private ScheduledExecutorService executorService;
+
+    private LocalNetconfSalFacade underlyingSalFacade;
+
+    @Mock
+    private DOMRpcService deviceRpc;
+
+    @Mock
+    private DOMMountPointService mountPointService;
+
+    @Mock
+    private DataBroker dataBroker;
+
+    @Mock
+    private NetconfDeviceCommunicator listener;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+
+        executorService = Executors.newScheduledThreadPool(2);
+
+        underlyingSalFacade = new LocalNetconfSalFacade();
+        doNothing().when(listener).disconnect();
+        keepaliveSalFacade = new KeepaliveSalFacade(REMOTE_DEVICE_ID, underlyingSalFacade, executorService, 2L, 10000L);
+        keepaliveSalFacade.setListener(listener);
+    }
+
+    @After
+    public void tearDown() {
+        executorService.shutdown();
+    }
+
+    /**
+     * Not sending keepalive rpc test while the repsonse is processing.
+     */
+    @Test
+    public void testKeepaliveSalResponseWaiting() {
+        //This settable future object will be never set to any value. The test wants to simulate waiting for the result
+        //of the future object.
+        final SettableFuture<DOMRpcResult> settableFuture = SettableFuture.create();
+        when(deviceRpc.invokeRpc(null, null)).thenReturn(settableFuture);
+
+        //This settable future will be used to check the invokation of keepalive RPC. Should be never invoked.
+        final SettableFuture<DOMRpcResult> keepaliveSettableFuture = SettableFuture.create();
+        when(deviceRpc.invokeRpc(NETCONF_GET_CONFIG_PATH, KEEPALIVE_PAYLOAD)).thenReturn(keepaliveSettableFuture);
+        final DOMRpcResult keepaliveResult = new DefaultDOMRpcResult(Builders.containerBuilder().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(NetconfMessageTransformUtil.NETCONF_RUNNING_QNAME)).build());
+        keepaliveSettableFuture.set(keepaliveResult);
+
+        keepaliveSalFacade.onDeviceConnected(null, null, deviceRpc);
+
+        //Invoke general RPC on simulated local facade without args (or with null args). Will be returned
+        //settableFuture variable without any set value. WaitingShaduler in keepalive sal facade should wait for any
+        //result from the RPC and reset keepalive scheduler.
+        underlyingSalFacade.invokeNullRpc();
+
+        //Invoking of general RPC.
+        verify(deviceRpc, after(2000).times(1)).invokeRpc(null, null);
+
+        //verify the keepalive RPC invoke. Should be never happen.
+        verify(deviceRpc, after(2000).never()).invokeRpc(NETCONF_GET_CONFIG_PATH, KEEPALIVE_PAYLOAD);
+    }
+
+    private final class LocalNetconfSalFacade implements RemoteDeviceHandler<NetconfSessionPreferences> {
+
+        private DOMRpcService localDeviceRpc;
+
+        @Override
+        public void onDeviceConnected(final SchemaContext remoteSchemaContext,
+                final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService currentDeviceRpc,
+                final DOMActionService deviceAction) {
+            localDeviceRpc = currentDeviceRpc;
+        }
+
+        @Override
+        public void onDeviceDisconnected() {
+        }
+
+        @Override
+        public void onDeviceFailed(final Throwable throwable) {
+        }
+
+        @Override
+        public void onNotification(final DOMNotification domNotification) {
+        }
+
+        @Override
+        public void close() {
+        }
+
+        public void invokeNullRpc() {
+            if (localDeviceRpc != null) {
+                localDeviceRpc.invokeRpc(null, null);
+            }
+        }
+    }
+}
+