*/
package org.opendaylight.netconf.sal.connect.netconf.sal;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps.getSourceNode;
import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_NODEID;
-import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_PATH;
-import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_RUNNING_QNAME;
+import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_QNAME;
+import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_RUNNING_NODEID;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.Collection;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
+import org.checkerframework.checker.lock.qual.GuardedBy;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.mdsal.dom.api.DOMActionService;
import org.opendaylight.mdsal.dom.api.DOMNotification;
import org.opendaylight.mdsal.dom.api.DOMRpcResult;
import org.opendaylight.mdsal.dom.api.DOMRpcService;
import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfDeviceSchema;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.opendaylight.yangtools.yang.model.api.SchemaPath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* and to detect incorrect session drops (netconf session is inactive, but TCP/SSH connection is still present).
* The keepalive RPC is a get-config with empty filter.
*/
-public final class KeepaliveSalFacade implements RemoteDeviceHandler<NetconfSessionPreferences> {
-
+public final class KeepaliveSalFacade implements RemoteDeviceHandler {
private static final Logger LOG = LoggerFactory.getLogger(KeepaliveSalFacade.class);
// 2 minutes keepalive delay by default
// 1 minute transaction timeout by default
private static final long DEFAULT_TRANSACTION_TIMEOUT_MILLI = TimeUnit.MILLISECONDS.toMillis(60000);
- private final RemoteDeviceId id;
- private final RemoteDeviceHandler<NetconfSessionPreferences> salFacade;
+ private final KeepaliveTask keepaliveTask = new KeepaliveTask();
+ private final RemoteDeviceHandler salFacade;
private final ScheduledExecutorService executor;
+
private final long keepaliveDelaySeconds;
- private final ResetKeepalive resetKeepaliveTask;
- private final long defaultRequestTimeoutMillis;
+ private final long timeoutNanos;
+ private final long delayNanos;
+
+ private final RemoteDeviceId id;
private volatile NetconfDeviceCommunicator listener;
- private volatile ScheduledFuture<?> currentKeepalive;
private volatile DOMRpcService currentDeviceRpc;
- private final AtomicBoolean lastKeepAliveSucceeded = new AtomicBoolean(false);
- public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
- final ScheduledExecutorService executor, final long keepaliveDelaySeconds,
- final long defaultRequestTimeoutMillis) {
+ public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade,
+ final ScheduledExecutorService executor, final long keepaliveDelaySeconds,
+ final long requestTimeoutMillis) {
this.id = id;
this.salFacade = salFacade;
- this.executor = executor;
+ this.executor = requireNonNull(executor);
this.keepaliveDelaySeconds = keepaliveDelaySeconds;
- this.defaultRequestTimeoutMillis = defaultRequestTimeoutMillis;
- this.resetKeepaliveTask = new ResetKeepalive();
+ delayNanos = TimeUnit.SECONDS.toNanos(keepaliveDelaySeconds);
+ timeoutNanos = TimeUnit.MILLISECONDS.toNanos(requestTimeoutMillis);
}
- public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
- final ScheduledExecutorService executor) {
+ public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade,
+ final ScheduledExecutorService executor) {
this(id, salFacade, executor, DEFAULT_DELAY, DEFAULT_TRANSACTION_TIMEOUT_MILLI);
}
this.listener = listener;
}
- /**
- * Just cancel current keepalive task.
- * If its already started, let it finish ... not such a big deal.
- *
- * <p>
- * Then schedule next keepalive.
- */
- void resetKeepalive() {
- LOG.trace("{}: Resetting netconf keepalive timer", id);
- if (currentKeepalive != null) {
- currentKeepalive.cancel(false);
- }
- scheduleKeepalives();
- }
-
/**
* Cancel current keepalive and also reset current deviceRpc.
*/
- private void stopKeepalives() {
- if (currentKeepalive != null) {
- currentKeepalive.cancel(false);
- }
+ private synchronized void stopKeepalives() {
+ keepaliveTask.disableKeepalive();
currentDeviceRpc = null;
}
void reconnect() {
- Preconditions.checkState(listener != null, "%s: Unable to reconnect, session listener is missing", id);
+ checkState(listener != null, "%s: Unable to reconnect, session listener is missing", id);
stopKeepalives();
LOG.info("{}: Reconnecting inactive netconf session", id);
listener.disconnect();
}
@Override
- public void onDeviceConnected(final SchemaContext remoteSchemaContext,
- final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
- onDeviceConnected(remoteSchemaContext, netconfSessionPreferences, deviceRpc, null);
+ public void onDeviceConnected(final NetconfDeviceSchema deviceSchema,
+ final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
+ onDeviceConnected(deviceSchema, netconfSessionPreferences, deviceRpc, null);
}
@Override
- public void onDeviceConnected(final SchemaContext remoteSchemaContext,
+ public void onDeviceConnected(final NetconfDeviceSchema deviceSchema,
final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc,
final DOMActionService deviceAction) {
- this.currentDeviceRpc = deviceRpc;
- final DOMRpcService deviceRpc1 =
- new KeepaliveDOMRpcService(deviceRpc, resetKeepaliveTask, defaultRequestTimeoutMillis, executor);
-
- salFacade.onDeviceConnected(remoteSchemaContext, netconfSessionPreferences, deviceRpc1, deviceAction);
+ currentDeviceRpc = requireNonNull(deviceRpc);
+ salFacade.onDeviceConnected(deviceSchema, netconfSessionPreferences,
+ new KeepaliveDOMRpcService(deviceRpc), deviceAction);
LOG.debug("{}: Netconf session initiated, starting keepalives", id);
- scheduleKeepalives();
- }
-
- private void scheduleKeepalives() {
- lastKeepAliveSucceeded.set(true);
- Preconditions.checkState(currentDeviceRpc != null);
- LOG.trace("{}: Scheduling keepalives every {} {}", id, keepaliveDelaySeconds, TimeUnit.SECONDS);
- currentKeepalive = executor.scheduleWithFixedDelay(new Keepalive(),
- keepaliveDelaySeconds, keepaliveDelaySeconds, TimeUnit.SECONDS);
+ LOG.trace("{}: Scheduling keepalives every {}s", id, keepaliveDelaySeconds);
+ keepaliveTask.enableKeepalive();
}
@Override
@Override
public void onNotification(final DOMNotification domNotification) {
- resetKeepalive();
+ keepaliveTask.recordActivity();
salFacade.onNotification(domNotification);
}
}
// Keepalive RPC static resources
- private static final ContainerNode KEEPALIVE_PAYLOAD = NetconfMessageTransformUtil.wrap(NETCONF_GET_CONFIG_NODEID,
- getSourceNode(NETCONF_RUNNING_QNAME), NetconfMessageTransformUtil.EMPTY_FILTER);
+ private static final @NonNull ContainerNode KEEPALIVE_PAYLOAD =
+ NetconfMessageTransformUtil.wrap(NETCONF_GET_CONFIG_NODEID,
+ getSourceNode(NETCONF_RUNNING_NODEID), NetconfMessageTransformUtil.EMPTY_FILTER);
/**
* Invoke keepalive RPC and check the response. In case of any received response the keepalive
* response received, or the rcp could not even be sent) immediate reconnect is triggered as netconf session
* is considered inactive/failed.
*/
- private class Keepalive implements Runnable, FutureCallback<DOMRpcResult> {
+ private final class KeepaliveTask implements Runnable, FutureCallback<DOMRpcResult> {
+ private volatile long lastActivity;
+ @GuardedBy("this")
+ private boolean suppressed;
+
+ KeepaliveTask() {
+ suppressed = false;
+ }
@Override
public void run() {
- LOG.trace("{}: Invoking keepalive RPC", id);
+ final long local = lastActivity;
+ final long now = System.nanoTime();
+ final long inFutureNanos = local + delayNanos - now;
+ if (inFutureNanos > 0) {
+ reschedule(inFutureNanos);
+ } else {
+ sendKeepalive(now);
+ }
+ }
- try {
- boolean lastJobSucceeded = lastKeepAliveSucceeded.getAndSet(false);
- if (!lastJobSucceeded) {
- onFailure(new IllegalStateException("Previous keepalive timed out"));
- } else {
- Futures.addCallback(currentDeviceRpc.invokeRpc(NETCONF_GET_CONFIG_PATH, KEEPALIVE_PAYLOAD), this,
- MoreExecutors.directExecutor());
- }
- } catch (NullPointerException e) {
+ void recordActivity() {
+ lastActivity = System.nanoTime();
+ }
+
+ synchronized void disableKeepalive() {
+ // unsuppressed -> suppressed
+ suppressed = true;
+ }
+
+ synchronized void enableKeepalive() {
+ recordActivity();
+ if (!suppressed) {
+ // unscheduled -> unsuppressed
+ reschedule();
+ } else {
+ // suppressed -> unsuppressed
+ suppressed = false;
+ }
+ }
+
+ private synchronized void sendKeepalive(final long now) {
+ if (suppressed) {
+ // suppressed -> unscheduled
+ suppressed = false;
+ return;
+ }
+
+ final var deviceRpc = currentDeviceRpc;
+ if (deviceRpc == null) {
+ // deviceRpc is null, which means we hit the reconnect window and attempted to send keepalive while
+ // we were reconnecting. Next keepalive will be scheduled after reconnect so no action necessary here.
LOG.debug("{}: Skipping keepalive while reconnecting", id);
- // Empty catch block intentional
- // Do nothing. The currentDeviceRpc was null and it means we hit the reconnect window and
- // attempted to send keepalive while we were reconnecting. Next keepalive will be scheduled
- // after reconnect so no action necessary here.
+ return;
}
+
+ LOG.trace("{}: Invoking keepalive RPC", id);
+ final var deviceFuture = deviceRpc.invokeRpc(NETCONF_GET_CONFIG_QNAME, KEEPALIVE_PAYLOAD);
+
+ lastActivity = now;
+ Futures.addCallback(deviceFuture, this, MoreExecutors.directExecutor());
}
@SuppressFBWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE",
public void onSuccess(final DOMRpcResult result) {
// No matter what response we got, rpc-reply or rpc-error,
// we got it from device so the netconf session is OK
- if (result != null && result.getResult() != null) {
- lastKeepAliveSucceeded.set(true);
- } else if (result != null && result.getErrors() != null) {
- LOG.warn("{}: Keepalive RPC failed with error: {}", id, result.getErrors());
- lastKeepAliveSucceeded.set(true);
- } else {
+ if (result == null) {
LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", id);
reconnect();
+ return;
+ }
+
+ if (result.getResult() != null) {
+ reschedule();
+ } else {
+ final Collection<?> errors = result.getErrors();
+ if (!errors.isEmpty()) {
+ LOG.warn("{}: Keepalive RPC failed with error: {}", id, errors);
+ reschedule();
+ } else {
+ LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", id);
+ reconnect();
+ }
}
}
@Override
- public void onFailure(@Nonnull final Throwable throwable) {
+ public void onFailure(final Throwable throwable) {
LOG.warn("{}: Keepalive RPC failed. Reconnecting netconf session.", id, throwable);
reconnect();
}
- }
- /**
- * Reset keepalive after each RPC response received.
- */
- private class ResetKeepalive implements FutureCallback<DOMRpcResult> {
- @Override
- public void onSuccess(@Nullable final DOMRpcResult result) {
- // No matter what response we got,
- // rpc-reply or rpc-error, we got it from device so the netconf session is OK.
- resetKeepalive();
+ private void reschedule() {
+ reschedule(delayNanos);
}
- @Override
- public void onFailure(@Nonnull final Throwable throwable) {
- // User/Application RPC failed (The RPC did not reach the remote device or ..
- // TODO what other reasons could cause this ?)
- // There is no point in keeping this session. Reconnect.
- LOG.warn("{}: Rpc failure detected. Reconnecting netconf session", id, throwable);
- reconnect();
+ private void reschedule(final long delay) {
+ executor.schedule(this, delay, TimeUnit.NANOSECONDS);
}
}
/*
- * Request timeout task is called once the defaultRequestTimeoutMillis is
- * reached. At this moment, if the request is not yet finished, we cancel
- * it.
+ * Request timeout task is called once the requestTimeoutMillis is reached. At that moment, if the request is not
+ * yet finished, we cancel it.
*/
- private static final class RequestTimeoutTask implements Runnable {
-
- private final FluentFuture<DOMRpcResult> rpcResultFuture;
+ private final class RequestTimeoutTask implements FutureCallback<DOMRpcResult>, Runnable {
+ private final @NonNull SettableFuture<DOMRpcResult> userFuture = SettableFuture.create();
+ private final @NonNull ListenableFuture<? extends DOMRpcResult> deviceFuture;
- RequestTimeoutTask(final FluentFuture<DOMRpcResult> rpcResultFuture) {
- this.rpcResultFuture = rpcResultFuture;
+ RequestTimeoutTask(final ListenableFuture<? extends DOMRpcResult> rpcResultFuture) {
+ deviceFuture = requireNonNull(rpcResultFuture);
+ Futures.addCallback(deviceFuture, this, MoreExecutors.directExecutor());
}
@Override
public void run() {
- if (!rpcResultFuture.isDone()) {
- rpcResultFuture.cancel(true);
- }
+ deviceFuture.cancel(true);
+ userFuture.cancel(false);
+ keepaliveTask.enableKeepalive();
+ }
+
+ @Override
+ public void onSuccess(final DOMRpcResult result) {
+ // No matter what response we got,
+ // rpc-reply or rpc-error, we got it from device so the netconf session is OK.
+ userFuture.set(result);
+ keepaliveTask.enableKeepalive();
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ // User/Application RPC failed (The RPC did not reach the remote device or ...)
+ // FIXME: what other reasons could cause this ?)
+ LOG.warn("{}: Rpc failure detected. Reconnecting netconf session", id, throwable);
+ userFuture.setException(throwable);
+ // There is no point in keeping this session. Reconnect.
+ reconnect();
}
}
* DOMRpcService proxy that attaches reset-keepalive-task and schedule
* request-timeout-task to each RPC invocation.
*/
- public static final class KeepaliveDOMRpcService implements DOMRpcService {
-
- private final DOMRpcService deviceRpc;
- private final ResetKeepalive resetKeepaliveTask;
- private final long defaultRequestTimeoutMillis;
- private final ScheduledExecutorService executor;
-
- KeepaliveDOMRpcService(final DOMRpcService deviceRpc, final ResetKeepalive resetKeepaliveTask,
- final long defaultRequestTimeoutMillis, final ScheduledExecutorService executor) {
- this.deviceRpc = deviceRpc;
- this.resetKeepaliveTask = resetKeepaliveTask;
- this.defaultRequestTimeoutMillis = defaultRequestTimeoutMillis;
- this.executor = executor;
+ public final class KeepaliveDOMRpcService implements DOMRpcService {
+ private final @NonNull DOMRpcService deviceRpc;
+
+ KeepaliveDOMRpcService(final DOMRpcService deviceRpc) {
+ this.deviceRpc = requireNonNull(deviceRpc);
}
- public DOMRpcService getDeviceRpc() {
+ public @NonNull DOMRpcService getDeviceRpc() {
return deviceRpc;
}
- @Nonnull
@Override
- public @NonNull FluentFuture<DOMRpcResult> invokeRpc(@Nonnull final SchemaPath type,
- final NormalizedNode<?, ?> input) {
- final FluentFuture<DOMRpcResult> rpcResultFuture = deviceRpc.invokeRpc(type, input);
- Futures.addCallback(rpcResultFuture, resetKeepaliveTask, MoreExecutors.directExecutor());
+ public ListenableFuture<? extends DOMRpcResult> invokeRpc(final QName type, final NormalizedNode input) {
+ keepaliveTask.disableKeepalive();
+ final ListenableFuture<? extends DOMRpcResult> deviceFuture = deviceRpc.invokeRpc(type, input);
- final RequestTimeoutTask timeoutTask = new RequestTimeoutTask(rpcResultFuture);
- executor.schedule(timeoutTask, defaultRequestTimeoutMillis, TimeUnit.MILLISECONDS);
+ final RequestTimeoutTask timeout = new RequestTimeoutTask(deviceFuture);
+ final ScheduledFuture<?> timeoutFuture = executor.schedule(timeout, timeoutNanos, TimeUnit.NANOSECONDS);
+ deviceFuture.addListener(() -> timeoutFuture.cancel(false), MoreExecutors.directExecutor());
- return rpcResultFuture;
+ return timeout.userFuture;
}
@Override
- public <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(
- @Nonnull final T listener) {
- // There is no real communication with the device (yet), no reset here
- return deviceRpc.registerRpcListener(listener);
+ public <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(final T rpcListener) {
+ // There is no real communication with the device (yet), hence recordActivity() or anything
+ return deviceRpc.registerRpcListener(rpcListener);
}
}
}