X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fsal%2Fconnect%2Fnetconf%2Fsal%2FKeepaliveSalFacade.java;h=9aca46e7b184893ecddfc182e6d7d78f6eccf5b3;hb=1dba5b2651d7fc60af0263cc0640d5ebeda8e454;hp=73b1296ef8bcd1ae122d68d5909f6568ab67ca7c;hpb=1e174be6002940e17aee31d15f8914273d30f25c;p=netconf.git diff --git a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/KeepaliveSalFacade.java b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/KeepaliveSalFacade.java index 73b1296ef8..9aca46e7b1 100644 --- a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/KeepaliveSalFacade.java +++ b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/KeepaliveSalFacade.java @@ -7,38 +7,38 @@ */ package org.opendaylight.netconf.sal.connect.netconf.sal; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps.getSourceNode; +import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_NODEID; import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_QNAME; -import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_RUNNING_QNAME; -import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.toPath; +import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_RUNNING_NODEID; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import org.opendaylight.controller.md.sal.dom.api.DOMNotification; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import javax.xml.transform.dom.DOMSource; +import org.checkerframework.checker.lock.qual.GuardedBy; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.mdsal.dom.api.DOMNotification; +import org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener; +import org.opendaylight.mdsal.dom.api.DOMRpcResult; import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler; +import org.opendaylight.netconf.sal.connect.api.RemoteDeviceId; +import org.opendaylight.netconf.sal.connect.api.RemoteDeviceServices; +import org.opendaylight.netconf.sal.connect.api.RemoteDeviceServices.Rpcs; +import org.opendaylight.netconf.sal.connect.netconf.NetconfDeviceSchema; import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator; import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences; import org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil; -import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import org.opendaylight.yangtools.yang.model.api.SchemaPath; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,8 +47,7 @@ import org.slf4j.LoggerFactory; * and to detect incorrect session drops (netconf session is inactive, but TCP/SSH connection is still present). * The keepalive RPC is a get-config with empty filter. */ -public final class KeepaliveSalFacade implements RemoteDeviceHandler { - +public final class KeepaliveSalFacade implements RemoteDeviceHandler { private static final Logger LOG = LoggerFactory.getLogger(KeepaliveSalFacade.class); // 2 minutes keepalive delay by default @@ -57,31 +56,31 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler salFacade; + private final RemoteDeviceHandler salFacade; private final ScheduledExecutorService executor; + private final long keepaliveDelaySeconds; - private final ResetKeepalive resetKeepaliveTask; - private final long defaultRequestTimeoutMillis; + private final long timeoutNanos; + private final long delayNanos; + + private final RemoteDeviceId id; private volatile NetconfDeviceCommunicator listener; - private volatile ScheduledFuture currentKeepalive; - private volatile DOMRpcService currentDeviceRpc; - private final AtomicBoolean lastKeepAliveSucceeded = new AtomicBoolean(false); + private volatile KeepaliveTask task; - public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade, - final ScheduledExecutorService executor, final long keepaliveDelaySeconds, - final long defaultRequestTimeoutMillis) { + public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade, + final ScheduledExecutorService executor, final long keepaliveDelaySeconds, + final long requestTimeoutMillis) { this.id = id; this.salFacade = salFacade; - this.executor = executor; + this.executor = requireNonNull(executor); this.keepaliveDelaySeconds = keepaliveDelaySeconds; - this.defaultRequestTimeoutMillis = defaultRequestTimeoutMillis; - this.resetKeepaliveTask = new ResetKeepalive(); + delayNanos = TimeUnit.SECONDS.toNanos(keepaliveDelaySeconds); + timeoutNanos = TimeUnit.MILLISECONDS.toNanos(requestTimeoutMillis); } - public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade, - final ScheduledExecutorService executor) { + public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade, + final ScheduledExecutorService executor) { this(id, salFacade, executor, DEFAULT_DELAY, DEFAULT_TRANSACTION_TIMEOUT_MILLI); } @@ -95,55 +94,63 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler - * Then schedule next keepalive. + * Cancel current keepalive and free it. */ - void resetKeepalive() { - LOG.trace("{}: Resetting netconf keepalive timer", id); - if (currentKeepalive != null) { - currentKeepalive.cancel(false); + private synchronized void stopKeepalives() { + final var localTask = task; + if (localTask != null) { + localTask.disableKeepalive(); + task = null; } - scheduleKeepalives(); } - /** - * Cancel current keepalive and also reset current deviceRpc. - */ - private void stopKeepalives() { - if (currentKeepalive != null) { - currentKeepalive.cancel(false); + private void disableKeepalive() { + final var localTask = task; + if (localTask != null) { + localTask.disableKeepalive(); + } + } + + private void enableKeepalive() { + final var localTask = task; + if (localTask != null) { + localTask.enableKeepalive(); } - currentDeviceRpc = null; } void reconnect() { - Preconditions.checkState(listener != null, "%s: Unable to reconnect, session listener is missing", id); + checkState(listener != null, "%s: Unable to reconnect, session listener is missing", id); stopKeepalives(); LOG.info("{}: Reconnecting inactive netconf session", id); listener.disconnect(); } @Override - public void onDeviceConnected(final SchemaContext remoteSchemaContext, - final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) { - this.currentDeviceRpc = deviceRpc; - final DOMRpcService deviceRpc1 = - new KeepaliveDOMRpcService(deviceRpc, resetKeepaliveTask, defaultRequestTimeoutMillis, executor); - salFacade.onDeviceConnected(remoteSchemaContext, netconfSessionPreferences, deviceRpc1); - - LOG.debug("{}: Netconf session initiated, starting keepalives", id); - scheduleKeepalives(); - } + public void onDeviceConnected(final NetconfDeviceSchema deviceSchema, + final NetconfSessionPreferences sessionPreferences, final RemoteDeviceServices services) { + final var devRpc = services.rpcs(); + task = new KeepaliveTask(devRpc); + + final Rpcs keepaliveRpcs; + if (devRpc instanceof Rpcs.Normalized normalized) { + keepaliveRpcs = new NormalizedKeepaliveRpcs(normalized); + } else if (devRpc instanceof Rpcs.Schemaless schemaless) { + keepaliveRpcs = new SchemalessKeepaliveRpcs(schemaless); + } else { + throw new IllegalStateException("Unhandled " + devRpc); + } + + salFacade.onDeviceConnected(deviceSchema, sessionPreferences, new RemoteDeviceServices(keepaliveRpcs, + // FIXME: wrap with keepalive + services.actions())); - private void scheduleKeepalives() { - lastKeepAliveSucceeded.set(true); - Preconditions.checkState(currentDeviceRpc != null); - LOG.trace("{}: Scheduling keepalives every {} {}", id, keepaliveDelaySeconds, TimeUnit.SECONDS); - currentKeepalive = executor.scheduleWithFixedDelay(new Keepalive(), - keepaliveDelaySeconds, keepaliveDelaySeconds, TimeUnit.SECONDS); + // We have performed a callback, which might have termined keepalives + final var localTask = task; + if (localTask != null) { + LOG.debug("{}: Netconf session initiated, starting keepalives", id); + LOG.trace("{}: Scheduling keepalives every {}s", id, keepaliveDelaySeconds); + localTask.enableKeepalive(); + } } @Override @@ -160,7 +167,10 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler @NonNull ListenableFuture scheduleTimeout(final ListenableFuture invokeFuture) { + final var timeout = new RequestTimeoutTask<>(invokeFuture); + final var timeoutFuture = executor.schedule(timeout, timeoutNanos, TimeUnit.NANOSECONDS); + invokeFuture.addListener(() -> timeoutFuture.cancel(false), MoreExecutors.directExecutor()); + return timeout.userFuture; + } /** * Invoke keepalive RPC and check the response. In case of any received response the keepalive @@ -181,138 +193,198 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler { + private final class KeepaliveTask implements Runnable, FutureCallback { + // Keepalive RPC static resources + static final @NonNull ContainerNode KEEPALIVE_PAYLOAD = NetconfMessageTransformUtil.wrap( + NETCONF_GET_CONFIG_NODEID, getSourceNode(NETCONF_RUNNING_NODEID), NetconfMessageTransformUtil.EMPTY_FILTER); + + private final Rpcs devRpc; + + @GuardedBy("this") + private boolean suppressed = false; + + private volatile long lastActivity; + + KeepaliveTask(final Rpcs devRpc) { + this.devRpc = requireNonNull(devRpc); + } @Override public void run() { - LOG.trace("{}: Invoking keepalive RPC", id); + final long local = lastActivity; + final long now = System.nanoTime(); + final long inFutureNanos = local + delayNanos - now; + if (inFutureNanos > 0) { + reschedule(inFutureNanos); + } else { + sendKeepalive(now); + } + } - try { - boolean lastJobSucceeded = lastKeepAliveSucceeded.getAndSet(false); - if (!lastJobSucceeded) { - onFailure(new IllegalStateException("Previous keepalive timed out")); - } else { - Futures.addCallback(currentDeviceRpc.invokeRpc(PATH, KEEPALIVE_PAYLOAD), this, - MoreExecutors.directExecutor()); - } - } catch (NullPointerException e) { - LOG.debug("{}: Skipping keepalive while reconnecting", id); - // Empty catch block intentional - // Do nothing. The currentDeviceRpc was null and it means we hit the reconnect window and - // attempted to send keepalive while we were reconnecting. Next keepalive will be scheduled - // after reconnect so no action necessary here. + void recordActivity() { + lastActivity = System.nanoTime(); + } + + synchronized void disableKeepalive() { + // unsuppressed -> suppressed + suppressed = true; + } + + synchronized void enableKeepalive() { + recordActivity(); + if (!suppressed) { + // unscheduled -> unsuppressed + reschedule(); + } else { + // suppressed -> unsuppressed + suppressed = false; + } + } + + private synchronized void sendKeepalive(final long now) { + if (suppressed) { + LOG.debug("{}: Skipping keepalive while disabled", id); + // suppressed -> unscheduled + suppressed = false; + return; } + + LOG.trace("{}: Invoking keepalive RPC", id); + final var deviceFuture = devRpc.invokeNetconf(NETCONF_GET_CONFIG_QNAME, KEEPALIVE_PAYLOAD); + lastActivity = now; + Futures.addCallback(deviceFuture, this, MoreExecutors.directExecutor()); } + // FIXME: re-examine this suppression @SuppressFBWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE", justification = "Unrecognised NullableDecl") @Override public void onSuccess(final DOMRpcResult result) { // No matter what response we got, rpc-reply or rpc-error, // we got it from device so the netconf session is OK - if (result != null && result.getResult() != null) { - lastKeepAliveSucceeded.set(true); - } else if (result != null && result.getErrors() != null) { - LOG.warn("{}: Keepalive RPC failed with error: {}", id, result.getErrors()); - lastKeepAliveSucceeded.set(true); - } else { + if (result == null) { LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", id); reconnect(); + return; + } + + if (result.value() != null) { + reschedule(); + } else { + final var errors = result.errors(); + if (!errors.isEmpty()) { + LOG.warn("{}: Keepalive RPC failed with error: {}", id, errors); + reschedule(); + } else { + LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", id); + reconnect(); + } } } @Override - public void onFailure(@Nonnull final Throwable throwable) { + public void onFailure(final Throwable throwable) { LOG.warn("{}: Keepalive RPC failed. Reconnecting netconf session.", id, throwable); reconnect(); } + + private void reschedule() { + reschedule(delayNanos); + } + + private void reschedule(final long delay) { + executor.schedule(this, delay, TimeUnit.NANOSECONDS); + } } - /** - * Reset keepalive after each RPC response received. + /* + * Request timeout task is called once the requestTimeoutMillis is reached. At that moment, if the request is not + * yet finished, we cancel it. */ - private class ResetKeepalive implements FutureCallback { + private final class RequestTimeoutTask implements FutureCallback, Runnable { + private final @NonNull SettableFuture userFuture = SettableFuture.create(); + private final @NonNull ListenableFuture rpcResultFuture; + + RequestTimeoutTask(final ListenableFuture rpcResultFuture) { + this.rpcResultFuture = requireNonNull(rpcResultFuture); + Futures.addCallback(rpcResultFuture, this, MoreExecutors.directExecutor()); + } + @Override - public void onSuccess(@Nullable final DOMRpcResult result) { + public void run() { + rpcResultFuture.cancel(true); + userFuture.cancel(false); + enableKeepalive(); + } + + @Override + public void onSuccess(final V result) { // No matter what response we got, // rpc-reply or rpc-error, we got it from device so the netconf session is OK. - resetKeepalive(); + userFuture.set(result); + enableKeepalive(); } @Override - public void onFailure(@Nonnull final Throwable throwable) { - // User/Application RPC failed (The RPC did not reach the remote device or .. - // TODO what other reasons could cause this ?) - // There is no point in keeping this session. Reconnect. + public void onFailure(final Throwable throwable) { + // User/Application RPC failed (The RPC did not reach the remote device or ...) + // FIXME: what other reasons could cause this ?) LOG.warn("{}: Rpc failure detected. Reconnecting netconf session", id, throwable); + userFuture.setException(throwable); + // There is no point in keeping this session. Reconnect. reconnect(); } } - /* - * Request timeout task is called once the defaultRequestTimeoutMillis is - * reached. At this moment, if the request is not yet finished, we cancel - * it. + /** + * Proxy for {@link Rpcs} which attaches a reset-keepalive-task and schedule request-timeout-task to each RPC + * invocation. Version for {@link Rpcs.Normalized}. */ - private static final class RequestTimeoutTask implements Runnable { + private final class NormalizedKeepaliveRpcs implements Rpcs.Normalized { + private final Rpcs.Normalized delegate; - private final CheckedFuture rpcResultFuture; + NormalizedKeepaliveRpcs(final Rpcs.Normalized delegate) { + this.delegate = requireNonNull(delegate); + } - RequestTimeoutTask(final CheckedFuture rpcResultFuture) { - this.rpcResultFuture = rpcResultFuture; + @Override + public ListenableFuture invokeRpc(final QName type, final ContainerNode input) { + // FIXME: what happens if we disable keepalive and then invokeRpc() throws? + disableKeepalive(); + return scheduleTimeout(delegate.invokeRpc(type, input)); } @Override - public void run() { - if (!rpcResultFuture.isDone()) { - rpcResultFuture.cancel(true); - } + public ListenerRegistration registerRpcListener( + final T rpcListener) { + // There is no real communication with the device (yet), hence no recordActivity() or anything + return delegate.registerRpcListener(rpcListener); } } /** - * DOMRpcService proxy that attaches reset-keepalive-task and schedule - * request-timeout-task to each RPC invocation. + * Proxy for {@link Rpcs} which attaches a reset-keepalive-task and schedule request-timeout-task to each RPC + * invocation. Version for {@link Rpcs.Schemaless}. */ - public static final class KeepaliveDOMRpcService implements DOMRpcService { - - private final DOMRpcService deviceRpc; - private final ResetKeepalive resetKeepaliveTask; - private final long defaultRequestTimeoutMillis; - private final ScheduledExecutorService executor; - - KeepaliveDOMRpcService(final DOMRpcService deviceRpc, final ResetKeepalive resetKeepaliveTask, - final long defaultRequestTimeoutMillis, final ScheduledExecutorService executor) { - this.deviceRpc = deviceRpc; - this.resetKeepaliveTask = resetKeepaliveTask; - this.defaultRequestTimeoutMillis = defaultRequestTimeoutMillis; - this.executor = executor; - } + private final class SchemalessKeepaliveRpcs implements Rpcs.Schemaless { + private final Rpcs.Schemaless delegate; - public DOMRpcService getDeviceRpc() { - return deviceRpc; + SchemalessKeepaliveRpcs(final Rpcs.Schemaless delegate) { + this.delegate = requireNonNull(delegate); } - @Nonnull @Override - public CheckedFuture invokeRpc(@Nonnull final SchemaPath type, - final NormalizedNode input) { - final CheckedFuture domRpcResultDOMRpcExceptionCheckedFuture = - deviceRpc.invokeRpc(type, input); - Futures.addCallback(domRpcResultDOMRpcExceptionCheckedFuture, resetKeepaliveTask, - MoreExecutors.directExecutor()); - - final RequestTimeoutTask timeoutTask = new RequestTimeoutTask(domRpcResultDOMRpcExceptionCheckedFuture); - executor.schedule(timeoutTask, defaultRequestTimeoutMillis, TimeUnit.MILLISECONDS); - - return domRpcResultDOMRpcExceptionCheckedFuture; + public ListenableFuture invokeNetconf(final QName type, final ContainerNode input) { + // FIXME: what happens if we disable keepalive and then invokeRpc() throws? + disableKeepalive(); + return scheduleTimeout(delegate.invokeNetconf(type, input)); } @Override - public ListenerRegistration registerRpcListener( - @Nonnull final T listener) { - // There is no real communication with the device (yet), no reset here - return deviceRpc.registerRpcListener(listener); + public ListenableFuture invokeRpc(final QName type, final DOMSource input) { + // FIXME: what happens if we disable keepalive and then invokeRpc() throws? + disableKeepalive(); + return scheduleTimeout(delegate.invokeRpc(type, input)); } } }