package org.opendaylight.netconf.sal.connect.netconf.sal;
import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps.getSourceNode;
-import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_QNAME;
+import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_NODEID;
+import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_PATH;
import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_RUNNING_QNAME;
-import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.toPath;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.dom.api.DOMActionService;
+import org.opendaylight.mdsal.dom.api.DOMNotification;
+import org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener;
+import org.opendaylight.mdsal.dom.api.DOMRpcResult;
+import org.opendaylight.mdsal.dom.api.DOMRpcService;
import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
private volatile NetconfDeviceCommunicator listener;
private volatile ScheduledFuture<?> currentKeepalive;
private volatile DOMRpcService currentDeviceRpc;
+ private final AtomicBoolean lastKeepAliveSucceeded = new AtomicBoolean(false);
public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
final ScheduledExecutorService executor, final long keepaliveDelaySeconds,
if (currentKeepalive != null) {
currentKeepalive.cancel(false);
}
- scheduleKeepalive();
+ scheduleKeepalives();
}
/**
@Override
public void onDeviceConnected(final SchemaContext remoteSchemaContext,
final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
+ onDeviceConnected(remoteSchemaContext, netconfSessionPreferences, deviceRpc, null);
+ }
+
+ @Override
+ public void onDeviceConnected(final SchemaContext remoteSchemaContext,
+ final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc,
+ final DOMActionService deviceAction) {
this.currentDeviceRpc = deviceRpc;
final DOMRpcService deviceRpc1 =
new KeepaliveDOMRpcService(deviceRpc, resetKeepaliveTask, defaultRequestTimeoutMillis, executor);
- salFacade.onDeviceConnected(remoteSchemaContext, netconfSessionPreferences, deviceRpc1);
+
+ salFacade.onDeviceConnected(remoteSchemaContext, netconfSessionPreferences, deviceRpc1, deviceAction);
LOG.debug("{}: Netconf session initiated, starting keepalives", id);
- scheduleKeepalive();
+ scheduleKeepalives();
}
- private void scheduleKeepalive() {
+ private void scheduleKeepalives() {
+ lastKeepAliveSucceeded.set(true);
Preconditions.checkState(currentDeviceRpc != null);
- LOG.trace("{}: Scheduling next keepalive in {} {}", id, keepaliveDelaySeconds, TimeUnit.SECONDS);
- currentKeepalive = executor.schedule(new Keepalive(currentKeepalive), keepaliveDelaySeconds, TimeUnit.SECONDS);
+ LOG.trace("{}: Scheduling keepalives every {} {}", id, keepaliveDelaySeconds, TimeUnit.SECONDS);
+ currentKeepalive = executor.scheduleWithFixedDelay(new Keepalive(),
+ keepaliveDelaySeconds, keepaliveDelaySeconds, TimeUnit.SECONDS);
}
@Override
}
// Keepalive RPC static resources
- private static final SchemaPath PATH = toPath(NETCONF_GET_CONFIG_QNAME);
- private static final ContainerNode KEEPALIVE_PAYLOAD = NetconfMessageTransformUtil.wrap(NETCONF_GET_CONFIG_QNAME,
+ private static final ContainerNode KEEPALIVE_PAYLOAD = NetconfMessageTransformUtil.wrap(NETCONF_GET_CONFIG_NODEID,
getSourceNode(NETCONF_RUNNING_QNAME), NetconfMessageTransformUtil.EMPTY_FILTER);
/**
*/
private class Keepalive implements Runnable, FutureCallback<DOMRpcResult> {
- private final ScheduledFuture<?> previousKeepalive;
-
- Keepalive(final ScheduledFuture<?> previousKeepalive) {
- this.previousKeepalive = previousKeepalive;
- }
-
@Override
public void run() {
LOG.trace("{}: Invoking keepalive RPC", id);
try {
- if (previousKeepalive != null && !previousKeepalive.isDone()) {
+ boolean lastJobSucceeded = lastKeepAliveSucceeded.getAndSet(false);
+ if (!lastJobSucceeded) {
onFailure(new IllegalStateException("Previous keepalive timed out"));
} else {
- Futures.addCallback(currentDeviceRpc.invokeRpc(PATH, KEEPALIVE_PAYLOAD), this);
+ Futures.addCallback(currentDeviceRpc.invokeRpc(NETCONF_GET_CONFIG_PATH, KEEPALIVE_PAYLOAD), this,
+ MoreExecutors.directExecutor());
}
} catch (NullPointerException e) {
LOG.debug("{}: Skipping keepalive while reconnecting", id);
}
}
+ @SuppressFBWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE",
+ justification = "Unrecognised NullableDecl")
@Override
public void onSuccess(final DOMRpcResult result) {
// No matter what response we got, rpc-reply or rpc-error,
// we got it from device so the netconf session is OK
if (result != null && result.getResult() != null) {
- LOG.debug("{}: Keepalive RPC successful with response: {}", id, result.getResult());
- scheduleKeepalive();
- } else if (result != null && result.getErrors() != null) {
+ lastKeepAliveSucceeded.set(true);
+ } else if (result != null && result.getErrors() != null) {
LOG.warn("{}: Keepalive RPC failed with error: {}", id, result.getErrors());
- scheduleKeepalive();
+ lastKeepAliveSucceeded.set(true);
} else {
- LOG.warn("{} Keepalive RPC returned null with response: {}. Reconnecting netconf session", id, result);
+ LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", id);
reconnect();
}
}
*/
private static final class RequestTimeoutTask implements Runnable {
- private final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResultFuture;
+ private final FluentFuture<DOMRpcResult> rpcResultFuture;
- RequestTimeoutTask(final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResultFuture) {
+ RequestTimeoutTask(final FluentFuture<DOMRpcResult> rpcResultFuture) {
this.rpcResultFuture = rpcResultFuture;
}
public static final class KeepaliveDOMRpcService implements DOMRpcService {
private final DOMRpcService deviceRpc;
- private ResetKeepalive resetKeepaliveTask;
+ private final ResetKeepalive resetKeepaliveTask;
private final long defaultRequestTimeoutMillis;
private final ScheduledExecutorService executor;
@Nonnull
@Override
- public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(@Nonnull final SchemaPath type,
+ public @NonNull FluentFuture<DOMRpcResult> invokeRpc(@Nonnull final SchemaPath type,
final NormalizedNode<?, ?> input) {
- final CheckedFuture<DOMRpcResult, DOMRpcException> domRpcResultDOMRpcExceptionCheckedFuture =
+ final FluentFuture<DOMRpcResult> domRpcResultDOMRpcExceptionCheckedFuture =
deviceRpc.invokeRpc(type, input);
- Futures.addCallback(domRpcResultDOMRpcExceptionCheckedFuture, resetKeepaliveTask);
+ Futures.addCallback(domRpcResultDOMRpcExceptionCheckedFuture, resetKeepaliveTask,
+ MoreExecutors.directExecutor());
final RequestTimeoutTask timeoutTask = new RequestTimeoutTask(domRpcResultDOMRpcExceptionCheckedFuture);
executor.schedule(timeoutTask, defaultRequestTimeoutMillis, TimeUnit.MILLISECONDS);