*/
package org.opendaylight.netconf.sal.connect.netconf.sal;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps.getSourceNode;
+import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_NODEID;
import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_QNAME;
-import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_RUNNING_QNAME;
-import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.toPath;
+import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_RUNNING_NODEID;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import javax.xml.transform.dom.DOMSource;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.dom.api.DOMNotification;
+import org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener;
+import org.opendaylight.mdsal.dom.api.DOMRpcResult;
import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceServices;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceServices.Rpcs;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfDeviceSchema;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.opendaylight.yangtools.yang.model.api.SchemaPath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* and to detect incorrect session drops (netconf session is inactive, but TCP/SSH connection is still present).
* The keepalive RPC is a get-config with empty filter.
*/
-public final class KeepaliveSalFacade implements RemoteDeviceHandler<NetconfSessionPreferences> {
-
+public final class KeepaliveSalFacade implements RemoteDeviceHandler {
private static final Logger LOG = LoggerFactory.getLogger(KeepaliveSalFacade.class);
// 2 minutes keepalive delay by default
// 1 minute transaction timeout by default
private static final long DEFAULT_TRANSACTION_TIMEOUT_MILLI = TimeUnit.MILLISECONDS.toMillis(60000);
- private final RemoteDeviceId id;
- private final RemoteDeviceHandler<NetconfSessionPreferences> salFacade;
+ private final RemoteDeviceHandler salFacade;
private final ScheduledExecutorService executor;
+
private final long keepaliveDelaySeconds;
- private final ResetKeepalive resetKeepaliveTask;
- private final long defaultRequestTimeoutMillis;
+ private final long timeoutNanos;
+ private final long delayNanos;
+
+ private final RemoteDeviceId id;
private volatile NetconfDeviceCommunicator listener;
- private volatile ScheduledFuture<?> currentKeepalive;
- private volatile DOMRpcService currentDeviceRpc;
+ private volatile KeepaliveTask task;
- public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
- final ScheduledExecutorService executor, final long keepaliveDelaySeconds,
- final long defaultRequestTimeoutMillis) {
+ public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade,
+ final ScheduledExecutorService executor, final long keepaliveDelaySeconds,
+ final long requestTimeoutMillis) {
this.id = id;
this.salFacade = salFacade;
- this.executor = executor;
+ this.executor = requireNonNull(executor);
this.keepaliveDelaySeconds = keepaliveDelaySeconds;
- this.defaultRequestTimeoutMillis = defaultRequestTimeoutMillis;
- this.resetKeepaliveTask = new ResetKeepalive();
+ delayNanos = TimeUnit.SECONDS.toNanos(keepaliveDelaySeconds);
+ timeoutNanos = TimeUnit.MILLISECONDS.toNanos(requestTimeoutMillis);
}
- public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
- final ScheduledExecutorService executor) {
+ public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade,
+ final ScheduledExecutorService executor) {
this(id, salFacade, executor, DEFAULT_DELAY, DEFAULT_TRANSACTION_TIMEOUT_MILLI);
}
}
/**
- * Just cancel current keepalive task.
- * If its already started, let it finish ... not such a big deal.
- *
- * <p>
- * Then schedule next keepalive.
+ * Cancel current keepalive and free it.
*/
- void resetKeepalive() {
- LOG.trace("{}: Resetting netconf keepalive timer", id);
- if (currentKeepalive != null) {
- currentKeepalive.cancel(false);
+ private synchronized void stopKeepalives() {
+ final var localTask = task;
+ if (localTask != null) {
+ localTask.disableKeepalive();
+ task = null;
}
- scheduleKeepalive();
}
- /**
- * Cancel current keepalive and also reset current deviceRpc.
- */
- private void stopKeepalives() {
- if (currentKeepalive != null) {
- currentKeepalive.cancel(false);
+ private void disableKeepalive() {
+ final var localTask = task;
+ if (localTask != null) {
+ localTask.disableKeepalive();
+ }
+ }
+
+ private void enableKeepalive() {
+ final var localTask = task;
+ if (localTask != null) {
+ localTask.enableKeepalive();
}
- currentDeviceRpc = null;
}
void reconnect() {
- Preconditions.checkState(listener != null, "%s: Unable to reconnect, session listener is missing", id);
+ checkState(listener != null, "%s: Unable to reconnect, session listener is missing", id);
stopKeepalives();
LOG.info("{}: Reconnecting inactive netconf session", id);
listener.disconnect();
}
@Override
- public void onDeviceConnected(final SchemaContext remoteSchemaContext,
- final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
- this.currentDeviceRpc = deviceRpc;
- final DOMRpcService deviceRpc1 =
- new KeepaliveDOMRpcService(deviceRpc, resetKeepaliveTask, defaultRequestTimeoutMillis, executor);
- salFacade.onDeviceConnected(remoteSchemaContext, netconfSessionPreferences, deviceRpc1);
-
- LOG.debug("{}: Netconf session initiated, starting keepalives", id);
- scheduleKeepalive();
- }
+ public void onDeviceConnected(final NetconfDeviceSchema deviceSchema,
+ final NetconfSessionPreferences sessionPreferences, final RemoteDeviceServices services) {
+ final var devRpc = services.rpcs();
+ task = new KeepaliveTask(devRpc);
+
+ final Rpcs keepaliveRpcs;
+ if (devRpc instanceof Rpcs.Normalized normalized) {
+ keepaliveRpcs = new NormalizedKeepaliveRpcs(normalized);
+ } else if (devRpc instanceof Rpcs.Schemaless schemaless) {
+ keepaliveRpcs = new SchemalessKeepaliveRpcs(schemaless);
+ } else {
+ throw new IllegalStateException("Unhandled " + devRpc);
+ }
- private void scheduleKeepalive() {
- Preconditions.checkState(currentDeviceRpc != null);
- LOG.trace("{}: Scheduling next keepalive in {} {}", id, keepaliveDelaySeconds, TimeUnit.SECONDS);
- currentKeepalive = executor.schedule(new Keepalive(currentKeepalive), keepaliveDelaySeconds, TimeUnit.SECONDS);
+ salFacade.onDeviceConnected(deviceSchema, sessionPreferences, new RemoteDeviceServices(keepaliveRpcs,
+ // FIXME: wrap with keepalive
+ services.actions()));
+
+ // We have performed a callback, which might have termined keepalives
+ final var localTask = task;
+ if (localTask != null) {
+ LOG.debug("{}: Netconf session initiated, starting keepalives", id);
+ LOG.trace("{}: Scheduling keepalives every {}s", id, keepaliveDelaySeconds);
+ localTask.enableKeepalive();
+ }
}
@Override
@Override
public void onNotification(final DOMNotification domNotification) {
- resetKeepalive();
+ final var localTask = task;
+ if (localTask != null) {
+ localTask.recordActivity();
+ }
salFacade.onNotification(domNotification);
}
salFacade.close();
}
- // Keepalive RPC static resources
- private static final SchemaPath PATH = toPath(NETCONF_GET_CONFIG_QNAME);
- private static final ContainerNode KEEPALIVE_PAYLOAD = NetconfMessageTransformUtil.wrap(NETCONF_GET_CONFIG_QNAME,
- getSourceNode(NETCONF_RUNNING_QNAME), NetconfMessageTransformUtil.EMPTY_FILTER);
+ private <T> @NonNull ListenableFuture<T> scheduleTimeout(final ListenableFuture<T> invokeFuture) {
+ final var timeout = new RequestTimeoutTask<>(invokeFuture);
+ final var timeoutFuture = executor.schedule(timeout, timeoutNanos, TimeUnit.NANOSECONDS);
+ invokeFuture.addListener(() -> timeoutFuture.cancel(false), MoreExecutors.directExecutor());
+ return timeout.userFuture;
+ }
/**
* Invoke keepalive RPC and check the response. In case of any received response the keepalive
* response received, or the rcp could not even be sent) immediate reconnect is triggered as netconf session
* is considered inactive/failed.
*/
- private class Keepalive implements Runnable, FutureCallback<DOMRpcResult> {
+ private final class KeepaliveTask implements Runnable, FutureCallback<DOMRpcResult> {
+ // Keepalive RPC static resources
+ static final @NonNull ContainerNode KEEPALIVE_PAYLOAD = NetconfMessageTransformUtil.wrap(
+ NETCONF_GET_CONFIG_NODEID, getSourceNode(NETCONF_RUNNING_NODEID), NetconfMessageTransformUtil.EMPTY_FILTER);
- private final ScheduledFuture<?> previousKeepalive;
+ private final Rpcs devRpc;
- Keepalive(final ScheduledFuture<?> previousKeepalive) {
- this.previousKeepalive = previousKeepalive;
+ @GuardedBy("this")
+ private boolean suppressed = false;
+
+ private volatile long lastActivity;
+
+ KeepaliveTask(final Rpcs devRpc) {
+ this.devRpc = requireNonNull(devRpc);
}
@Override
public void run() {
- LOG.trace("{}: Invoking keepalive RPC", id);
+ final long local = lastActivity;
+ final long now = System.nanoTime();
+ final long inFutureNanos = local + delayNanos - now;
+ if (inFutureNanos > 0) {
+ reschedule(inFutureNanos);
+ } else {
+ sendKeepalive(now);
+ }
+ }
- try {
- if (previousKeepalive != null && !previousKeepalive.isDone()) {
- onFailure(new IllegalStateException("Previous keepalive timed out"));
- } else {
- Futures.addCallback(currentDeviceRpc.invokeRpc(PATH, KEEPALIVE_PAYLOAD), this);
- }
- } catch (NullPointerException e) {
- LOG.debug("{}: Skipping keepalive while reconnecting", id);
- // Empty catch block intentional
- // Do nothing. The currentDeviceRpc was null and it means we hit the reconnect window and
- // attempted to send keepalive while we were reconnecting. Next keepalive will be scheduled
- // after reconnect so no action necessary here.
+ void recordActivity() {
+ lastActivity = System.nanoTime();
+ }
+
+ synchronized void disableKeepalive() {
+ // unsuppressed -> suppressed
+ suppressed = true;
+ }
+
+ synchronized void enableKeepalive() {
+ recordActivity();
+ if (!suppressed) {
+ // unscheduled -> unsuppressed
+ reschedule();
+ } else {
+ // suppressed -> unsuppressed
+ suppressed = false;
}
}
+ private synchronized void sendKeepalive(final long now) {
+ if (suppressed) {
+ LOG.debug("{}: Skipping keepalive while disabled", id);
+ // suppressed -> unscheduled
+ suppressed = false;
+ return;
+ }
+
+ LOG.trace("{}: Invoking keepalive RPC", id);
+ final var deviceFuture = devRpc.invokeNetconf(NETCONF_GET_CONFIG_QNAME, KEEPALIVE_PAYLOAD);
+ lastActivity = now;
+ Futures.addCallback(deviceFuture, this, MoreExecutors.directExecutor());
+ }
+
+ // FIXME: re-examine this suppression
+ @SuppressFBWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE",
+ justification = "Unrecognised NullableDecl")
@Override
public void onSuccess(final DOMRpcResult result) {
// No matter what response we got, rpc-reply or rpc-error,
// we got it from device so the netconf session is OK
- if (result != null && result.getResult() != null) {
- LOG.debug("{}: Keepalive RPC successful with response: {}", id, result.getResult());
- scheduleKeepalive();
- } else if (result != null && result.getErrors() != null) {
- LOG.warn("{}: Keepalive RPC failed with error: {}", id, result.getErrors());
- scheduleKeepalive();
- } else {
- LOG.warn("{} Keepalive RPC returned null with response: {}. Reconnecting netconf session", id, result);
+ if (result == null) {
+ LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", id);
reconnect();
+ return;
+ }
+
+ if (result.value() != null) {
+ reschedule();
+ } else {
+ final var errors = result.errors();
+ if (!errors.isEmpty()) {
+ LOG.warn("{}: Keepalive RPC failed with error: {}", id, errors);
+ reschedule();
+ } else {
+ LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", id);
+ reconnect();
+ }
}
}
@Override
- public void onFailure(@Nonnull final Throwable throwable) {
+ public void onFailure(final Throwable throwable) {
LOG.warn("{}: Keepalive RPC failed. Reconnecting netconf session.", id, throwable);
reconnect();
}
+
+ private void reschedule() {
+ reschedule(delayNanos);
+ }
+
+ private void reschedule(final long delay) {
+ executor.schedule(this, delay, TimeUnit.NANOSECONDS);
+ }
}
- /**
- * Reset keepalive after each RPC response received.
+ /*
+ * Request timeout task is called once the requestTimeoutMillis is reached. At that moment, if the request is not
+ * yet finished, we cancel it.
*/
- private class ResetKeepalive implements FutureCallback<DOMRpcResult> {
+ private final class RequestTimeoutTask<V> implements FutureCallback<V>, Runnable {
+ private final @NonNull SettableFuture<V> userFuture = SettableFuture.create();
+ private final @NonNull ListenableFuture<? extends V> rpcResultFuture;
+
+ RequestTimeoutTask(final ListenableFuture<V> rpcResultFuture) {
+ this.rpcResultFuture = requireNonNull(rpcResultFuture);
+ Futures.addCallback(rpcResultFuture, this, MoreExecutors.directExecutor());
+ }
+
@Override
- public void onSuccess(@Nullable final DOMRpcResult result) {
+ public void run() {
+ rpcResultFuture.cancel(true);
+ userFuture.cancel(false);
+ enableKeepalive();
+ }
+
+ @Override
+ public void onSuccess(final V result) {
// No matter what response we got,
// rpc-reply or rpc-error, we got it from device so the netconf session is OK.
- resetKeepalive();
+ userFuture.set(result);
+ enableKeepalive();
}
@Override
- public void onFailure(@Nonnull final Throwable throwable) {
- // User/Application RPC failed (The RPC did not reach the remote device or ..
- // TODO what other reasons could cause this ?)
- // There is no point in keeping this session. Reconnect.
+ public void onFailure(final Throwable throwable) {
+ // User/Application RPC failed (The RPC did not reach the remote device or ...)
+ // FIXME: what other reasons could cause this ?)
LOG.warn("{}: Rpc failure detected. Reconnecting netconf session", id, throwable);
+ userFuture.setException(throwable);
+ // There is no point in keeping this session. Reconnect.
reconnect();
}
}
- /*
- * Request timeout task is called once the defaultRequestTimeoutMillis is
- * reached. At this moment, if the request is not yet finished, we cancel
- * it.
+ /**
+ * Proxy for {@link Rpcs} which attaches a reset-keepalive-task and schedule request-timeout-task to each RPC
+ * invocation. Version for {@link Rpcs.Normalized}.
*/
- private static final class RequestTimeoutTask implements Runnable {
+ private final class NormalizedKeepaliveRpcs implements Rpcs.Normalized {
+ private final Rpcs.Normalized delegate;
- private final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResultFuture;
+ NormalizedKeepaliveRpcs(final Rpcs.Normalized delegate) {
+ this.delegate = requireNonNull(delegate);
+ }
- RequestTimeoutTask(final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResultFuture) {
- this.rpcResultFuture = rpcResultFuture;
+ @Override
+ public ListenableFuture<? extends DOMRpcResult> invokeRpc(final QName type, final ContainerNode input) {
+ // FIXME: what happens if we disable keepalive and then invokeRpc() throws?
+ disableKeepalive();
+ return scheduleTimeout(delegate.invokeRpc(type, input));
}
@Override
- public void run() {
- if (!rpcResultFuture.isDone()) {
- rpcResultFuture.cancel(true);
- }
+ public <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(
+ final T rpcListener) {
+ // There is no real communication with the device (yet), hence no recordActivity() or anything
+ return delegate.registerRpcListener(rpcListener);
}
}
/**
- * DOMRpcService proxy that attaches reset-keepalive-task and schedule
- * request-timeout-task to each RPC invocation.
+ * Proxy for {@link Rpcs} which attaches a reset-keepalive-task and schedule request-timeout-task to each RPC
+ * invocation. Version for {@link Rpcs.Schemaless}.
*/
- public static final class KeepaliveDOMRpcService implements DOMRpcService {
-
- private final DOMRpcService deviceRpc;
- private ResetKeepalive resetKeepaliveTask;
- private final long defaultRequestTimeoutMillis;
- private final ScheduledExecutorService executor;
-
- KeepaliveDOMRpcService(final DOMRpcService deviceRpc, final ResetKeepalive resetKeepaliveTask,
- final long defaultRequestTimeoutMillis, final ScheduledExecutorService executor) {
- this.deviceRpc = deviceRpc;
- this.resetKeepaliveTask = resetKeepaliveTask;
- this.defaultRequestTimeoutMillis = defaultRequestTimeoutMillis;
- this.executor = executor;
- }
+ private final class SchemalessKeepaliveRpcs implements Rpcs.Schemaless {
+ private final Rpcs.Schemaless delegate;
- public DOMRpcService getDeviceRpc() {
- return deviceRpc;
+ SchemalessKeepaliveRpcs(final Rpcs.Schemaless delegate) {
+ this.delegate = requireNonNull(delegate);
}
- @Nonnull
@Override
- public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(@Nonnull final SchemaPath type,
- final NormalizedNode<?, ?> input) {
- final CheckedFuture<DOMRpcResult, DOMRpcException> domRpcResultDOMRpcExceptionCheckedFuture =
- deviceRpc.invokeRpc(type, input);
- Futures.addCallback(domRpcResultDOMRpcExceptionCheckedFuture, resetKeepaliveTask);
-
- final RequestTimeoutTask timeoutTask = new RequestTimeoutTask(domRpcResultDOMRpcExceptionCheckedFuture);
- executor.schedule(timeoutTask, defaultRequestTimeoutMillis, TimeUnit.MILLISECONDS);
-
- return domRpcResultDOMRpcExceptionCheckedFuture;
+ public ListenableFuture<? extends DOMRpcResult> invokeNetconf(final QName type, final ContainerNode input) {
+ // FIXME: what happens if we disable keepalive and then invokeRpc() throws?
+ disableKeepalive();
+ return scheduleTimeout(delegate.invokeNetconf(type, input));
}
@Override
- public <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(
- @Nonnull final T listener) {
- // There is no real communication with the device (yet), no reset here
- return deviceRpc.registerRpcListener(listener);
+ public ListenableFuture<? extends DOMSource> invokeRpc(final QName type, final DOMSource input) {
+ // FIXME: what happens if we disable keepalive and then invokeRpc() throws?
+ disableKeepalive();
+ return scheduleTimeout(delegate.invokeRpc(type, input));
}
}
}