X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fsal%2Fconnect%2Fnetconf%2Fsal%2FKeepaliveSalFacade.java;h=b853db62eed8471c31c92bd4df14b2a4ce926d57;hb=68d52ef8ea8ee2ee6cda8bbc4a864c3fa897297e;hp=dc2b5dff427f3b40c12ebd5624b1a33151cabff3;hpb=a5e97b86ef5c93bd506751c868ea12db24eab0b0;p=netconf.git diff --git a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/KeepaliveSalFacade.java b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/KeepaliveSalFacade.java index dc2b5dff42..b853db62ee 100644 --- a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/KeepaliveSalFacade.java +++ b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/KeepaliveSalFacade.java @@ -7,35 +7,38 @@ */ package org.opendaylight.netconf.sal.connect.netconf.sal; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps.getSourceNode; +import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_NODEID; import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_QNAME; -import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_RUNNING_QNAME; -import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.toPath; +import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_RUNNING_NODEID; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import org.opendaylight.controller.md.sal.dom.api.DOMNotification; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import javax.xml.transform.dom.DOMSource; +import org.checkerframework.checker.lock.qual.GuardedBy; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.mdsal.dom.api.DOMNotification; +import org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener; +import org.opendaylight.mdsal.dom.api.DOMRpcResult; import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler; +import org.opendaylight.netconf.sal.connect.api.RemoteDeviceServices; +import org.opendaylight.netconf.sal.connect.api.RemoteDeviceServices.Rpcs; +import org.opendaylight.netconf.sal.connect.netconf.NetconfDeviceSchema; import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator; import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences; import org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil; import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import org.opendaylight.yangtools.yang.model.api.SchemaPath; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,39 +47,45 @@ import org.slf4j.LoggerFactory; * and to detect incorrect session drops (netconf session is inactive, but TCP/SSH connection is still present). * The keepalive RPC is a get-config with empty filter. */ -public final class KeepaliveSalFacade implements RemoteDeviceHandler { - +public final class KeepaliveSalFacade implements RemoteDeviceHandler { private static final Logger LOG = LoggerFactory.getLogger(KeepaliveSalFacade.class); // 2 minutes keepalive delay by default private static final long DEFAULT_DELAY = TimeUnit.MINUTES.toSeconds(2); - private final RemoteDeviceId id; - private final RemoteDeviceHandler salFacade; + // 1 minute transaction timeout by default + private static final long DEFAULT_TRANSACTION_TIMEOUT_MILLI = TimeUnit.MILLISECONDS.toMillis(60000); + + private final RemoteDeviceHandler salFacade; private final ScheduledExecutorService executor; + private final long keepaliveDelaySeconds; - private final ResetKeepalive resetKeepaliveTask; + private final long timeoutNanos; + private final long delayNanos; + + private final RemoteDeviceId id; private volatile NetconfDeviceCommunicator listener; - private volatile ScheduledFuture currentKeepalive; - private volatile DOMRpcService currentDeviceRpc; + private volatile KeepaliveTask task; - public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade, - final ScheduledExecutorService executor, final long keepaliveDelaySeconds) { + public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade, + final ScheduledExecutorService executor, final long keepaliveDelaySeconds, + final long requestTimeoutMillis) { this.id = id; this.salFacade = salFacade; - this.executor = executor; + this.executor = requireNonNull(executor); this.keepaliveDelaySeconds = keepaliveDelaySeconds; - this.resetKeepaliveTask = new ResetKeepalive(); + delayNanos = TimeUnit.SECONDS.toNanos(keepaliveDelaySeconds); + timeoutNanos = TimeUnit.MILLISECONDS.toNanos(requestTimeoutMillis); } - public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade, - final ScheduledExecutorService executor) { - this(id, salFacade, executor, DEFAULT_DELAY); + public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade, + final ScheduledExecutorService executor) { + this(id, salFacade, executor, DEFAULT_DELAY, DEFAULT_TRANSACTION_TIMEOUT_MILLI); } /** - * Set the netconf session listener whenever ready + * Set the netconf session listener whenever ready. * * @param listener netconf session listener */ @@ -85,50 +94,63 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler @NonNull ListenableFuture scheduleTimeout(final ListenableFuture invokeFuture) { + final var timeout = new RequestTimeoutTask<>(invokeFuture); + final var timeoutFuture = executor.schedule(timeout, timeoutNanos, TimeUnit.NANOSECONDS); + invokeFuture.addListener(() -> timeoutFuture.cancel(false), MoreExecutors.directExecutor()); + return timeout.userFuture; + } /** * Invoke keepalive RPC and check the response. In case of any received response the keepalive @@ -166,95 +193,198 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler { + private final class KeepaliveTask implements Runnable, FutureCallback { + // Keepalive RPC static resources + static final @NonNull ContainerNode KEEPALIVE_PAYLOAD = NetconfMessageTransformUtil.wrap( + NETCONF_GET_CONFIG_NODEID, getSourceNode(NETCONF_RUNNING_NODEID), NetconfMessageTransformUtil.EMPTY_FILTER); + + private final Rpcs devRpc; - private final ScheduledFuture previousKeepalive; + @GuardedBy("this") + private boolean suppressed = false; - public Keepalive(final ScheduledFuture previousKeepalive) { - this.previousKeepalive = previousKeepalive; + private volatile long lastActivity; + + KeepaliveTask(final Rpcs devRpc) { + this.devRpc = requireNonNull(devRpc); } @Override public void run() { - LOG.trace("{}: Invoking keepalive RPC", id); + final long local = lastActivity; + final long now = System.nanoTime(); + final long inFutureNanos = local + delayNanos - now; + if (inFutureNanos > 0) { + reschedule(inFutureNanos); + } else { + sendKeepalive(now); + } + } - try { - if(previousKeepalive != null && !previousKeepalive.isDone()) { - onFailure(new IllegalStateException("Previous keepalive timed out")); - } else { - Futures.addCallback(currentDeviceRpc.invokeRpc(PATH, KEEPALIVE_PAYLOAD), this); - } - } catch (NullPointerException e) { - LOG.debug("{}: Skipping keepalive while reconnecting", id); - // Empty catch block intentional - // Do nothing. The currentDeviceRpc was null and it means we hit the reconnect window and - // attempted to send keepalive while we were reconnecting. Next keepalive will be scheduled - // after reconnect so no action necessary here. + void recordActivity() { + lastActivity = System.nanoTime(); + } + + synchronized void disableKeepalive() { + // unsuppressed -> suppressed + suppressed = true; + } + + synchronized void enableKeepalive() { + recordActivity(); + if (!suppressed) { + // unscheduled -> unsuppressed + reschedule(); + } else { + // suppressed -> unsuppressed + suppressed = false; + } + } + + private synchronized void sendKeepalive(final long now) { + if (suppressed) { + LOG.debug("{}: Skipping keepalive while disabled", id); + // suppressed -> unscheduled + suppressed = false; + return; } + + LOG.trace("{}: Invoking keepalive RPC", id); + final var deviceFuture = devRpc.invokeNetconf(NETCONF_GET_CONFIG_QNAME, KEEPALIVE_PAYLOAD); + lastActivity = now; + Futures.addCallback(deviceFuture, this, MoreExecutors.directExecutor()); } + // FIXME: re-examine this suppression + @SuppressFBWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE", + justification = "Unrecognised NullableDecl") @Override public void onSuccess(final DOMRpcResult result) { - if (result != null && result.getResult() != null) { - LOG.debug("{}: Keepalive RPC successful with response: {}", id, result.getResult()); - scheduleKeepalive(); - } else { - LOG.warn("{} Keepalive RPC returned null with response: {}. Reconnecting netconf session", id, result); + // No matter what response we got, rpc-reply or rpc-error, + // we got it from device so the netconf session is OK + if (result == null) { + LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", id); reconnect(); + return; + } + + if (result.value() != null) { + reschedule(); + } else { + final var errors = result.errors(); + if (!errors.isEmpty()) { + LOG.warn("{}: Keepalive RPC failed with error: {}", id, errors); + reschedule(); + } else { + LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", id); + reconnect(); + } } } @Override - public void onFailure(@Nonnull final Throwable t) { - LOG.warn("{}: Keepalive RPC failed. Reconnecting netconf session.", id, t); + public void onFailure(final Throwable throwable) { + LOG.warn("{}: Keepalive RPC failed. Reconnecting netconf session.", id, throwable); reconnect(); } + + private void reschedule() { + reschedule(delayNanos); + } + + private void reschedule(final long delay) { + executor.schedule(this, delay, TimeUnit.NANOSECONDS); + } } - /** - * Reset keepalive after each RPC response received + /* + * Request timeout task is called once the requestTimeoutMillis is reached. At that moment, if the request is not + * yet finished, we cancel it. */ - private class ResetKeepalive implements com.google.common.util.concurrent.FutureCallback { + private final class RequestTimeoutTask implements FutureCallback, Runnable { + private final @NonNull SettableFuture userFuture = SettableFuture.create(); + private final @NonNull ListenableFuture rpcResultFuture; + + RequestTimeoutTask(final ListenableFuture rpcResultFuture) { + this.rpcResultFuture = requireNonNull(rpcResultFuture); + Futures.addCallback(rpcResultFuture, this, MoreExecutors.directExecutor()); + } + + @Override + public void run() { + rpcResultFuture.cancel(true); + userFuture.cancel(false); + enableKeepalive(); + } + @Override - public void onSuccess(@Nullable final DOMRpcResult result) { - // No matter what response we got, rpc-reply or rpc-error, we got it from device so the netconf session is OK - resetKeepalive(); + public void onSuccess(final V result) { + // No matter what response we got, + // rpc-reply or rpc-error, we got it from device so the netconf session is OK. + userFuture.set(result); + enableKeepalive(); } @Override - public void onFailure(@Nonnull final Throwable t) { - // User/Application RPC failed (The RPC did not reach the remote device. + public void onFailure(final Throwable throwable) { + // User/Application RPC failed (The RPC did not reach the remote device or ...) + // FIXME: what other reasons could cause this ?) + LOG.warn("{}: Rpc failure detected. Reconnecting netconf session", id, throwable); + userFuture.setException(throwable); // There is no point in keeping this session. Reconnect. - LOG.warn("{}: Rpc failure detected. Reconnecting netconf session", id, t); reconnect(); } } /** - * DOMRpcService proxy that attaches reset-keepalive-task to each RPC invocation. + * Proxy for {@link Rpcs} which attaches a reset-keepalive-task and schedule request-timeout-task to each RPC + * invocation. Version for {@link Rpcs.Normalized}. */ - private static final class KeepaliveDOMRpcService implements DOMRpcService { + private final class NormalizedKeepaliveRpcs implements Rpcs.Normalized { + private final Rpcs.Normalized delegate; + + NormalizedKeepaliveRpcs(final Rpcs.Normalized delegate) { + this.delegate = requireNonNull(delegate); + } + + @Override + public ListenableFuture invokeRpc(final QName type, final ContainerNode input) { + // FIXME: what happens if we disable keepalive and then invokeRpc() throws? + disableKeepalive(); + return scheduleTimeout(delegate.invokeRpc(type, input)); + } + + @Override + public ListenerRegistration registerRpcListener( + final T rpcListener) { + // There is no real communication with the device (yet), hence no recordActivity() or anything + return delegate.registerRpcListener(rpcListener); + } + } - private final DOMRpcService deviceRpc; - private ResetKeepalive resetKeepaliveTask; + /** + * Proxy for {@link Rpcs} which attaches a reset-keepalive-task and schedule request-timeout-task to each RPC + * invocation. Version for {@link Rpcs.Schemaless}. + */ + private final class SchemalessKeepaliveRpcs implements Rpcs.Schemaless { + private final Rpcs.Schemaless delegate; - public KeepaliveDOMRpcService(final DOMRpcService deviceRpc, final ResetKeepalive resetKeepaliveTask) { - this.deviceRpc = deviceRpc; - this.resetKeepaliveTask = resetKeepaliveTask; + SchemalessKeepaliveRpcs(final Rpcs.Schemaless delegate) { + this.delegate = requireNonNull(delegate); } - @Nonnull @Override - public CheckedFuture invokeRpc(@Nonnull final SchemaPath type, final NormalizedNode input) { - final CheckedFuture domRpcResultDOMRpcExceptionCheckedFuture = deviceRpc.invokeRpc(type, input); - Futures.addCallback(domRpcResultDOMRpcExceptionCheckedFuture, resetKeepaliveTask); - return domRpcResultDOMRpcExceptionCheckedFuture; + public ListenableFuture invokeNetconf(final QName type, final ContainerNode input) { + // FIXME: what happens if we disable keepalive and then invokeRpc() throws? + disableKeepalive(); + return scheduleTimeout(delegate.invokeNetconf(type, input)); } @Override - public ListenerRegistration registerRpcListener(@Nonnull final T listener) { - // There is no real communication with the device (yet), no reset here - return deviceRpc.registerRpcListener(listener); + public ListenableFuture invokeRpc(final QName type, final DOMSource input) { + // FIXME: what happens if we disable keepalive and then invokeRpc() throws? + disableKeepalive(); + return scheduleTimeout(delegate.invokeRpc(type, input)); } } }