final DOMActionService deviceAction) {
this.currentDeviceRpc = deviceRpc;
final DOMRpcService deviceRpc1 =
- new KeepaliveDOMRpcService(deviceRpc, resetKeepaliveTask, defaultRequestTimeoutMillis, executor);
+ new KeepaliveDOMRpcService(deviceRpc, resetKeepaliveTask, defaultRequestTimeoutMillis, executor,
+ new ResponseWaitingScheduler());
salFacade.onDeviceConnected(remoteSchemaContext, netconfSessionPreferences, deviceRpc1, deviceAction);
LOG.trace("{}: Invoking keepalive RPC", id);
try {
- boolean lastJobSucceeded = lastKeepAliveSucceeded.getAndSet(false);
+ final boolean lastJobSucceeded = lastKeepAliveSucceeded.getAndSet(false);
if (!lastJobSucceeded) {
onFailure(new IllegalStateException("Previous keepalive timed out"));
} else {
Futures.addCallback(currentDeviceRpc.invokeRpc(NETCONF_GET_CONFIG_PATH, KEEPALIVE_PAYLOAD), this,
MoreExecutors.directExecutor());
}
- } catch (NullPointerException e) {
+ } catch (final NullPointerException e) {
LOG.debug("{}: Skipping keepalive while reconnecting", id);
// Empty catch block intentional
// Do nothing. The currentDeviceRpc was null and it means we hit the reconnect window and
}
}
+ private final class ResponseWaitingScheduler {
+
+ private ScheduledFuture<?> schedule;
+
+ public void initScheduler(final Runnable runnable) {
+ if (currentKeepalive != null) {
+ currentKeepalive.cancel(true);
+ } else {
+ LOG.trace("Keepalive does not exist.");
+ }
+ scheduleKeepalives();
+ //Listening on the result should be done before the keepalive rpc will be send
+ final long delay = (keepaliveDelaySeconds * 1000) - 500;
+ schedule = executor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
+ }
+
+ public void stopScheduler() {
+ if (schedule != null) {
+ schedule.cancel(true);
+ } else {
+ LOG.trace("Scheduler does not exist.");
+ }
+ }
+ }
+
+ private static final class ResponseWaiting implements Runnable {
+
+ private final FluentFuture<DOMRpcResult> rpcResultFuture;
+ private final ResponseWaitingScheduler responseWaitingScheduler;
+
+ ResponseWaiting(final ResponseWaitingScheduler responseWaitingScheduler,
+ final FluentFuture<DOMRpcResult> rpcResultFuture) {
+ this.responseWaitingScheduler = responseWaitingScheduler;
+ this.rpcResultFuture = rpcResultFuture;
+ }
+
+ public void start() {
+ LOG.trace("Start to waiting for result.");
+ responseWaitingScheduler.initScheduler(this);
+ }
+
+ public void stop() {
+ LOG.info("Stop to waiting for result.");
+ responseWaitingScheduler.stopScheduler();
+ }
+
+ @Override
+ public void run() {
+ if (!rpcResultFuture.isCancelled() && !rpcResultFuture.isDone()) {
+ LOG.trace("Waiting for result");
+ responseWaitingScheduler.initScheduler(this);
+ } else {
+ LOG.trace("Result has been cancelled or done.");
+ }
+ }
+ }
+
/*
* Request timeout task is called once the defaultRequestTimeoutMillis is
* reached. At this moment, if the request is not yet finished, we cancel
private static final class RequestTimeoutTask implements Runnable {
private final FluentFuture<DOMRpcResult> rpcResultFuture;
+ private final ResponseWaiting responseWaiting;
- RequestTimeoutTask(final FluentFuture<DOMRpcResult> rpcResultFuture) {
+ RequestTimeoutTask(final FluentFuture<DOMRpcResult> rpcResultFuture, final ResponseWaiting responseWaiting) {
this.rpcResultFuture = rpcResultFuture;
+ this.responseWaiting = responseWaiting;
}
@Override
if (!rpcResultFuture.isDone()) {
rpcResultFuture.cancel(true);
}
+ if (responseWaiting != null) {
+ responseWaiting.stop();
+ }
}
}
private final ResetKeepalive resetKeepaliveTask;
private final long defaultRequestTimeoutMillis;
private final ScheduledExecutorService executor;
+ private final ResponseWaitingScheduler responseWaitingScheduler;
KeepaliveDOMRpcService(final DOMRpcService deviceRpc, final ResetKeepalive resetKeepaliveTask,
- final long defaultRequestTimeoutMillis, final ScheduledExecutorService executor) {
+ final long defaultRequestTimeoutMillis, final ScheduledExecutorService executor,
+ final ResponseWaitingScheduler responseWaitingScheduler) {
this.deviceRpc = deviceRpc;
this.resetKeepaliveTask = resetKeepaliveTask;
this.defaultRequestTimeoutMillis = defaultRequestTimeoutMillis;
this.executor = executor;
+ this.responseWaitingScheduler = responseWaitingScheduler;
}
public DOMRpcService getDeviceRpc() {
public @NonNull FluentFuture<DOMRpcResult> invokeRpc(@Nonnull final SchemaPath type,
final NormalizedNode<?, ?> input) {
final FluentFuture<DOMRpcResult> rpcResultFuture = deviceRpc.invokeRpc(type, input);
- Futures.addCallback(rpcResultFuture, resetKeepaliveTask, MoreExecutors.directExecutor());
+ final ResponseWaiting responseWaiting = new ResponseWaiting(responseWaitingScheduler, rpcResultFuture);
+ responseWaiting.start();
+ rpcResultFuture.addCallback(resetKeepaliveTask, MoreExecutors.directExecutor());
- final RequestTimeoutTask timeoutTask = new RequestTimeoutTask(rpcResultFuture);
+ final RequestTimeoutTask timeoutTask = new RequestTimeoutTask(rpcResultFuture, responseWaiting);
executor.schedule(timeoutTask, defaultRequestTimeoutMillis, TimeUnit.MILLISECONDS);
return rpcResultFuture;
--- /dev/null
+/*
+ * Copyright (c) 2019 Lumina Networks, Inc. All Rights Reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.connect.netconf.sal;
+
+import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps.getSourceNode;
+import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_NODEID;
+import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_PATH;
+import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_RUNNING_QNAME;
+
+import com.google.common.util.concurrent.SettableFuture;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.dom.api.DOMActionService;
+import org.opendaylight.mdsal.dom.api.DOMMountPointService;
+import org.opendaylight.mdsal.dom.api.DOMNotification;
+import org.opendaylight.mdsal.dom.api.DOMRpcResult;
+import org.opendaylight.mdsal.dom.api.DOMRpcService;
+import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+public class KeepaliveSalFacadeResponseWaitingTest {
+
+ private static final RemoteDeviceId REMOTE_DEVICE_ID =
+ new RemoteDeviceId("test", new InetSocketAddress("localhost", 22));
+ private static final ContainerNode KEEPALIVE_PAYLOAD = NetconfMessageTransformUtil.wrap(NETCONF_GET_CONFIG_NODEID,
+ getSourceNode(NETCONF_RUNNING_QNAME), NetconfMessageTransformUtil.EMPTY_FILTER);
+
+ private KeepaliveSalFacade keepaliveSalFacade;
+ private ScheduledExecutorService executorService;
+
+ private LocalNetconfSalFacade underlyingSalFacade;
+
+ @Mock
+ private DOMRpcService deviceRpc;
+
+ @Mock
+ private DOMMountPointService mountPointService;
+
+ @Mock
+ private DataBroker dataBroker;
+
+ @Mock
+ private NetconfDeviceCommunicator listener;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+
+ executorService = Executors.newScheduledThreadPool(2);
+
+ underlyingSalFacade = new LocalNetconfSalFacade();
+ doNothing().when(listener).disconnect();
+ keepaliveSalFacade = new KeepaliveSalFacade(REMOTE_DEVICE_ID, underlyingSalFacade, executorService, 2L, 10000L);
+ keepaliveSalFacade.setListener(listener);
+ }
+
+ @After
+ public void tearDown() {
+ executorService.shutdown();
+ }
+
+ /**
+ * Not sending keepalive rpc test while the repsonse is processing.
+ */
+ @Test
+ public void testKeepaliveSalResponseWaiting() {
+ //This settable future object will be never set to any value. The test wants to simulate waiting for the result
+ //of the future object.
+ final SettableFuture<DOMRpcResult> settableFuture = SettableFuture.create();
+ when(deviceRpc.invokeRpc(null, null)).thenReturn(settableFuture);
+
+ //This settable future will be used to check the invokation of keepalive RPC. Should be never invoked.
+ final SettableFuture<DOMRpcResult> keepaliveSettableFuture = SettableFuture.create();
+ when(deviceRpc.invokeRpc(NETCONF_GET_CONFIG_PATH, KEEPALIVE_PAYLOAD)).thenReturn(keepaliveSettableFuture);
+ final DOMRpcResult keepaliveResult = new DefaultDOMRpcResult(Builders.containerBuilder().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(NetconfMessageTransformUtil.NETCONF_RUNNING_QNAME)).build());
+ keepaliveSettableFuture.set(keepaliveResult);
+
+ keepaliveSalFacade.onDeviceConnected(null, null, deviceRpc);
+
+ //Invoke general RPC on simulated local facade without args (or with null args). Will be returned
+ //settableFuture variable without any set value. WaitingShaduler in keepalive sal facade should wait for any
+ //result from the RPC and reset keepalive scheduler.
+ underlyingSalFacade.invokeNullRpc();
+
+ //Invoking of general RPC.
+ verify(deviceRpc, after(2000).times(1)).invokeRpc(null, null);
+
+ //verify the keepalive RPC invoke. Should be never happen.
+ verify(deviceRpc, after(2000).never()).invokeRpc(NETCONF_GET_CONFIG_PATH, KEEPALIVE_PAYLOAD);
+ }
+
+ private final class LocalNetconfSalFacade implements RemoteDeviceHandler<NetconfSessionPreferences> {
+
+ private DOMRpcService localDeviceRpc;
+
+ @Override
+ public void onDeviceConnected(final SchemaContext remoteSchemaContext,
+ final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService currentDeviceRpc,
+ final DOMActionService deviceAction) {
+ localDeviceRpc = currentDeviceRpc;
+ }
+
+ @Override
+ public void onDeviceDisconnected() {
+ }
+
+ @Override
+ public void onDeviceFailed(final Throwable throwable) {
+ }
+
+ @Override
+ public void onNotification(final DOMNotification domNotification) {
+ }
+
+ @Override
+ public void close() {
+ }
+
+ public void invokeNullRpc() {
+ if (localDeviceRpc != null) {
+ localDeviceRpc.invokeRpc(null, null);
+ }
+ }
+ }
+}
+