import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps.getSourceNode;
import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_NODEID;
import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_QNAME;
-import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_RUNNING_QNAME;
+import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_RUNNING_NODEID;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.Collection;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import javax.xml.transform.dom.DOMSource;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.mdsal.dom.api.DOMActionService;
import org.opendaylight.mdsal.dom.api.DOMNotification;
import org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener;
import org.opendaylight.mdsal.dom.api.DOMRpcResult;
-import org.opendaylight.mdsal.dom.api.DOMRpcService;
import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceId;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceServices;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceServices.Rpcs;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfDeviceSchema;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.rfc8528.data.api.MountPointContext;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* and to detect incorrect session drops (netconf session is inactive, but TCP/SSH connection is still present).
* The keepalive RPC is a get-config with empty filter.
*/
-public final class KeepaliveSalFacade implements RemoteDeviceHandler<NetconfSessionPreferences> {
+public final class KeepaliveSalFacade implements RemoteDeviceHandler {
private static final Logger LOG = LoggerFactory.getLogger(KeepaliveSalFacade.class);
// 2 minutes keepalive delay by default
// 1 minute transaction timeout by default
private static final long DEFAULT_TRANSACTION_TIMEOUT_MILLI = TimeUnit.MILLISECONDS.toMillis(60000);
- private final KeepaliveTask keepaliveTask = new KeepaliveTask();
- private final RemoteDeviceHandler<NetconfSessionPreferences> salFacade;
+ private final RemoteDeviceHandler salFacade;
private final ScheduledExecutorService executor;
private final long keepaliveDelaySeconds;
private final RemoteDeviceId id;
private volatile NetconfDeviceCommunicator listener;
- private volatile DOMRpcService currentDeviceRpc;
+ private volatile KeepaliveTask task;
- public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
- final ScheduledExecutorService executor, final long keepaliveDelaySeconds,
- final long requestTimeoutMillis) {
+ public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade,
+ final ScheduledExecutorService executor, final long keepaliveDelaySeconds,
+ final long requestTimeoutMillis) {
this.id = id;
this.salFacade = salFacade;
this.executor = requireNonNull(executor);
timeoutNanos = TimeUnit.MILLISECONDS.toNanos(requestTimeoutMillis);
}
- public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
- final ScheduledExecutorService executor) {
+ public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade,
+ final ScheduledExecutorService executor) {
this(id, salFacade, executor, DEFAULT_DELAY, DEFAULT_TRANSACTION_TIMEOUT_MILLI);
}
}
/**
- * Cancel current keepalive and also reset current deviceRpc.
+ * Cancel current keepalive and free it.
*/
private synchronized void stopKeepalives() {
- keepaliveTask.disableKeepalive();
- currentDeviceRpc = null;
+ final var localTask = task;
+ if (localTask != null) {
+ localTask.disableKeepalive();
+ task = null;
+ }
+ }
+
+ private void disableKeepalive() {
+ final var localTask = task;
+ if (localTask != null) {
+ localTask.disableKeepalive();
+ }
+ }
+
+ private void enableKeepalive() {
+ final var localTask = task;
+ if (localTask != null) {
+ localTask.enableKeepalive();
+ }
}
void reconnect() {
}
@Override
- public void onDeviceConnected(final MountPointContext remoteSchemaContext,
- final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
- onDeviceConnected(remoteSchemaContext, netconfSessionPreferences, deviceRpc, null);
- }
+ public void onDeviceConnected(final NetconfDeviceSchema deviceSchema,
+ final NetconfSessionPreferences sessionPreferences, final RemoteDeviceServices services) {
+ final var devRpc = services.rpcs();
+ task = new KeepaliveTask(devRpc);
+
+ final Rpcs keepaliveRpcs;
+ if (devRpc instanceof Rpcs.Normalized normalized) {
+ keepaliveRpcs = new NormalizedKeepaliveRpcs(normalized);
+ } else if (devRpc instanceof Rpcs.Schemaless schemaless) {
+ keepaliveRpcs = new SchemalessKeepaliveRpcs(schemaless);
+ } else {
+ throw new IllegalStateException("Unhandled " + devRpc);
+ }
- @Override
- public void onDeviceConnected(final MountPointContext remoteSchemaContext,
- final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc,
- final DOMActionService deviceAction) {
- this.currentDeviceRpc = requireNonNull(deviceRpc);
- salFacade.onDeviceConnected(remoteSchemaContext, netconfSessionPreferences,
- new KeepaliveDOMRpcService(deviceRpc), deviceAction);
-
- LOG.debug("{}: Netconf session initiated, starting keepalives", id);
- LOG.trace("{}: Scheduling keepalives every {}s", id, keepaliveDelaySeconds);
- keepaliveTask.enableKeepalive();
+ salFacade.onDeviceConnected(deviceSchema, sessionPreferences, new RemoteDeviceServices(keepaliveRpcs,
+ // FIXME: wrap with keepalive
+ services.actions()));
+
+ // We have performed a callback, which might have termined keepalives
+ final var localTask = task;
+ if (localTask != null) {
+ LOG.debug("{}: Netconf session initiated, starting keepalives", id);
+ LOG.trace("{}: Scheduling keepalives every {}s", id, keepaliveDelaySeconds);
+ localTask.enableKeepalive();
+ }
}
@Override
@Override
public void onNotification(final DOMNotification domNotification) {
- keepaliveTask.recordActivity();
+ final var localTask = task;
+ if (localTask != null) {
+ localTask.recordActivity();
+ }
salFacade.onNotification(domNotification);
}
salFacade.close();
}
- // Keepalive RPC static resources
- private static final ContainerNode KEEPALIVE_PAYLOAD = NetconfMessageTransformUtil.wrap(NETCONF_GET_CONFIG_NODEID,
- getSourceNode(NETCONF_RUNNING_QNAME), NetconfMessageTransformUtil.EMPTY_FILTER);
+ private <T> @NonNull ListenableFuture<T> scheduleTimeout(final ListenableFuture<T> invokeFuture) {
+ final var timeout = new RequestTimeoutTask<>(invokeFuture);
+ final var timeoutFuture = executor.schedule(timeout, timeoutNanos, TimeUnit.NANOSECONDS);
+ invokeFuture.addListener(() -> timeoutFuture.cancel(false), MoreExecutors.directExecutor());
+ return timeout.userFuture;
+ }
/**
* Invoke keepalive RPC and check the response. In case of any received response the keepalive
* is considered inactive/failed.
*/
private final class KeepaliveTask implements Runnable, FutureCallback<DOMRpcResult> {
- private volatile long lastActivity;
+ // Keepalive RPC static resources
+ static final @NonNull ContainerNode KEEPALIVE_PAYLOAD = NetconfMessageTransformUtil.wrap(
+ NETCONF_GET_CONFIG_NODEID, getSourceNode(NETCONF_RUNNING_NODEID), NetconfMessageTransformUtil.EMPTY_FILTER);
+
+ private final Rpcs devRpc;
+
@GuardedBy("this")
- private boolean suppressed;
+ private boolean suppressed = false;
- KeepaliveTask() {
- suppressed = false;
+ private volatile long lastActivity;
+
+ KeepaliveTask(final Rpcs devRpc) {
+ this.devRpc = requireNonNull(devRpc);
}
@Override
private synchronized void sendKeepalive(final long now) {
if (suppressed) {
+ LOG.debug("{}: Skipping keepalive while disabled", id);
// suppressed -> unscheduled
suppressed = false;
return;
}
- final DOMRpcService deviceRpc = currentDeviceRpc;
- if (deviceRpc == null) {
- // deviceRpc is null, which means we hit the reconnect window and attempted to send keepalive while
- // we were reconnecting. Next keepalive will be scheduled after reconnect so no action necessary here.
- LOG.debug("{}: Skipping keepalive while reconnecting", id);
- return;
- }
-
LOG.trace("{}: Invoking keepalive RPC", id);
- final ListenableFuture<? extends DOMRpcResult> deviceFuture =
- currentDeviceRpc.invokeRpc(NETCONF_GET_CONFIG_QNAME, KEEPALIVE_PAYLOAD);
-
+ final var deviceFuture = devRpc.invokeNetconf(NETCONF_GET_CONFIG_QNAME, KEEPALIVE_PAYLOAD);
lastActivity = now;
Futures.addCallback(deviceFuture, this, MoreExecutors.directExecutor());
}
+ // FIXME: re-examine this suppression
@SuppressFBWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE",
justification = "Unrecognised NullableDecl")
@Override
return;
}
- if (result.getResult() != null) {
+ if (result.value() != null) {
reschedule();
} else {
- final Collection<?> errors = result.getErrors();
+ final var errors = result.errors();
if (!errors.isEmpty()) {
LOG.warn("{}: Keepalive RPC failed with error: {}", id, errors);
reschedule();
* Request timeout task is called once the requestTimeoutMillis is reached. At that moment, if the request is not
* yet finished, we cancel it.
*/
- private final class RequestTimeoutTask implements FutureCallback<DOMRpcResult>, Runnable {
- private final @NonNull SettableFuture<DOMRpcResult> userFuture = SettableFuture.create();
- private final @NonNull ListenableFuture<? extends DOMRpcResult> deviceFuture;
+ private final class RequestTimeoutTask<V> implements FutureCallback<V>, Runnable {
+ private final @NonNull SettableFuture<V> userFuture = SettableFuture.create();
+ private final @NonNull ListenableFuture<? extends V> rpcResultFuture;
- RequestTimeoutTask(final ListenableFuture<? extends DOMRpcResult> rpcResultFuture) {
- this.deviceFuture = requireNonNull(rpcResultFuture);
- Futures.addCallback(deviceFuture, this, MoreExecutors.directExecutor());
+ RequestTimeoutTask(final ListenableFuture<V> rpcResultFuture) {
+ this.rpcResultFuture = requireNonNull(rpcResultFuture);
+ Futures.addCallback(rpcResultFuture, this, MoreExecutors.directExecutor());
}
@Override
public void run() {
- deviceFuture.cancel(true);
+ rpcResultFuture.cancel(true);
userFuture.cancel(false);
- keepaliveTask.enableKeepalive();
+ enableKeepalive();
}
@Override
- public void onSuccess(final DOMRpcResult result) {
+ public void onSuccess(final V result) {
// No matter what response we got,
// rpc-reply or rpc-error, we got it from device so the netconf session is OK.
userFuture.set(result);
- keepaliveTask.enableKeepalive();
+ enableKeepalive();
}
@Override
}
/**
- * DOMRpcService proxy that attaches reset-keepalive-task and schedule
- * request-timeout-task to each RPC invocation.
+ * Proxy for {@link Rpcs} which attaches a reset-keepalive-task and schedule request-timeout-task to each RPC
+ * invocation. Version for {@link Rpcs.Normalized}.
*/
- public final class KeepaliveDOMRpcService implements DOMRpcService {
- private final @NonNull DOMRpcService deviceRpc;
+ private final class NormalizedKeepaliveRpcs implements Rpcs.Normalized {
+ private final Rpcs.Normalized delegate;
- KeepaliveDOMRpcService(final DOMRpcService deviceRpc) {
- this.deviceRpc = requireNonNull(deviceRpc);
+ NormalizedKeepaliveRpcs(final Rpcs.Normalized delegate) {
+ this.delegate = requireNonNull(delegate);
}
- public @NonNull DOMRpcService getDeviceRpc() {
- return deviceRpc;
+ @Override
+ public ListenableFuture<? extends DOMRpcResult> invokeRpc(final QName type, final ContainerNode input) {
+ // FIXME: what happens if we disable keepalive and then invokeRpc() throws?
+ disableKeepalive();
+ return scheduleTimeout(delegate.invokeRpc(type, input));
}
@Override
- public ListenableFuture<? extends DOMRpcResult> invokeRpc(final QName type, final NormalizedNode<?, ?> input) {
- keepaliveTask.disableKeepalive();
- final ListenableFuture<? extends DOMRpcResult> deviceFuture = deviceRpc.invokeRpc(type, input);
+ public <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(
+ final T rpcListener) {
+ // There is no real communication with the device (yet), hence no recordActivity() or anything
+ return delegate.registerRpcListener(rpcListener);
+ }
+ }
+
+ /**
+ * Proxy for {@link Rpcs} which attaches a reset-keepalive-task and schedule request-timeout-task to each RPC
+ * invocation. Version for {@link Rpcs.Schemaless}.
+ */
+ private final class SchemalessKeepaliveRpcs implements Rpcs.Schemaless {
+ private final Rpcs.Schemaless delegate;
- final RequestTimeoutTask timeout = new RequestTimeoutTask(deviceFuture);
- final ScheduledFuture<?> timeoutFuture = executor.schedule(timeout, timeoutNanos, TimeUnit.NANOSECONDS);
- deviceFuture.addListener(() -> timeoutFuture.cancel(false), MoreExecutors.directExecutor());
+ SchemalessKeepaliveRpcs(final Rpcs.Schemaless delegate) {
+ this.delegate = requireNonNull(delegate);
+ }
- return timeout.userFuture;
+ @Override
+ public ListenableFuture<? extends DOMRpcResult> invokeNetconf(final QName type, final ContainerNode input) {
+ // FIXME: what happens if we disable keepalive and then invokeRpc() throws?
+ disableKeepalive();
+ return scheduleTimeout(delegate.invokeNetconf(type, input));
}
@Override
- public <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(final T rpcListener) {
- // There is no real communication with the device (yet), hence recordActivity() or anything
- return deviceRpc.registerRpcListener(rpcListener);
+ public ListenableFuture<? extends DOMSource> invokeRpc(final QName type, final DOMSource input) {
+ // FIXME: what happens if we disable keepalive and then invokeRpc() throws?
+ disableKeepalive();
+ return scheduleTimeout(delegate.invokeRpc(type, input));
}
}
}