*/
package org.opendaylight.netconf.sal.connect.netconf.sal;
+import static com.google.common.base.Preconditions.checkState;
import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps.getSourceNode;
-import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_QNAME;
+import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_NODEID;
+import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_PATH;
import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_RUNNING_QNAME;
-import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.toPath;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.opendaylight.mdsal.dom.api.DOMActionService;
+import org.opendaylight.mdsal.dom.api.DOMNotification;
+import org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener;
+import org.opendaylight.mdsal.dom.api.DOMRpcResult;
+import org.opendaylight.mdsal.dom.api.DOMRpcService;
import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
// 2 minutes keepalive delay by default
private static final long DEFAULT_DELAY = TimeUnit.MINUTES.toSeconds(2);
+ // 1 minute transaction timeout by default
+ private static final long DEFAULT_TRANSACTION_TIMEOUT_MILLI = TimeUnit.MILLISECONDS.toMillis(60000);
+
private final RemoteDeviceId id;
private final RemoteDeviceHandler<NetconfSessionPreferences> salFacade;
private final ScheduledExecutorService executor;
private final long keepaliveDelaySeconds;
private final ResetKeepalive resetKeepaliveTask;
+ private final long defaultRequestTimeoutMillis;
private volatile NetconfDeviceCommunicator listener;
private volatile ScheduledFuture<?> currentKeepalive;
private volatile DOMRpcService currentDeviceRpc;
+ private final AtomicBoolean lastKeepAliveSucceeded = new AtomicBoolean(false);
public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
- final ScheduledExecutorService executor, final long keepaliveDelaySeconds) {
+ final ScheduledExecutorService executor, final long keepaliveDelaySeconds,
+ final long defaultRequestTimeoutMillis) {
this.id = id;
this.salFacade = salFacade;
this.executor = executor;
this.keepaliveDelaySeconds = keepaliveDelaySeconds;
+ this.defaultRequestTimeoutMillis = defaultRequestTimeoutMillis;
this.resetKeepaliveTask = new ResetKeepalive();
}
public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
final ScheduledExecutorService executor) {
- this(id, salFacade, executor, DEFAULT_DELAY);
+ this(id, salFacade, executor, DEFAULT_DELAY, DEFAULT_TRANSACTION_TIMEOUT_MILLI);
}
/**
- * Set the netconf session listener whenever ready
+ * Set the netconf session listener whenever ready.
*
* @param listener netconf session listener
*/
* Just cancel current keepalive task.
* If its already started, let it finish ... not such a big deal.
*
+ * <p>
* Then schedule next keepalive.
*/
- private void resetKeepalive() {
+ void resetKeepalive() {
LOG.trace("{}: Resetting netconf keepalive timer", id);
- if(currentKeepalive != null) {
+ if (currentKeepalive != null) {
currentKeepalive.cancel(false);
}
- scheduleKeepalive();
+ scheduleKeepalives();
}
/**
- * Cancel current keepalive and also reset current deviceRpc
+ * Cancel current keepalive and also reset current deviceRpc.
*/
private void stopKeepalives() {
- if(currentKeepalive != null) {
+ if (currentKeepalive != null) {
currentKeepalive.cancel(false);
}
currentDeviceRpc = null;
}
- private void reconnect() {
- Preconditions.checkState(listener != null, "%s: Unable to reconnect, session listener is missing", id);
+ void reconnect() {
+ checkState(listener != null, "%s: Unable to reconnect, session listener is missing", id);
stopKeepalives();
LOG.info("{}: Reconnecting inactive netconf session", id);
listener.disconnect();
}
@Override
- public void onDeviceConnected(final SchemaContext remoteSchemaContext, final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
+ public void onDeviceConnected(final SchemaContext remoteSchemaContext,
+ final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
+ onDeviceConnected(remoteSchemaContext, netconfSessionPreferences, deviceRpc, null);
+ }
+
+ @Override
+ public void onDeviceConnected(final SchemaContext remoteSchemaContext,
+ final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc,
+ final DOMActionService deviceAction) {
this.currentDeviceRpc = deviceRpc;
- final DOMRpcService deviceRpc1 = new KeepaliveDOMRpcService(deviceRpc, resetKeepaliveTask);
- salFacade.onDeviceConnected(remoteSchemaContext, netconfSessionPreferences, deviceRpc1);
+ final DOMRpcService deviceRpc1 =
+ new KeepaliveDOMRpcService(deviceRpc, resetKeepaliveTask, defaultRequestTimeoutMillis, executor,
+ new ResponseWaitingScheduler());
+
+ salFacade.onDeviceConnected(remoteSchemaContext, netconfSessionPreferences, deviceRpc1, deviceAction);
LOG.debug("{}: Netconf session initiated, starting keepalives", id);
- scheduleKeepalive();
+ scheduleKeepalives();
}
- private void scheduleKeepalive() {
- Preconditions.checkState(currentDeviceRpc != null);
- LOG.trace("{}: Scheduling next keepalive in {} {}", id, keepaliveDelaySeconds, TimeUnit.SECONDS);
- currentKeepalive = executor.schedule(new Keepalive(currentKeepalive), keepaliveDelaySeconds, TimeUnit.SECONDS);
+ private void scheduleKeepalives() {
+ lastKeepAliveSucceeded.set(true);
+ checkState(currentDeviceRpc != null);
+ LOG.trace("{}: Scheduling keepalives every {} {}", id, keepaliveDelaySeconds, TimeUnit.SECONDS);
+ currentKeepalive = executor.scheduleWithFixedDelay(new Keepalive(),
+ keepaliveDelaySeconds, keepaliveDelaySeconds, TimeUnit.SECONDS);
}
@Override
}
// Keepalive RPC static resources
- private static final SchemaPath PATH = toPath(NETCONF_GET_CONFIG_QNAME);
- private static final ContainerNode KEEPALIVE_PAYLOAD =
- NetconfMessageTransformUtil.wrap(NETCONF_GET_CONFIG_QNAME, getSourceNode(NETCONF_RUNNING_QNAME), NetconfMessageTransformUtil.EMPTY_FILTER);
+ private static final ContainerNode KEEPALIVE_PAYLOAD = NetconfMessageTransformUtil.wrap(NETCONF_GET_CONFIG_NODEID,
+ getSourceNode(NETCONF_RUNNING_QNAME), NetconfMessageTransformUtil.EMPTY_FILTER);
/**
* Invoke keepalive RPC and check the response. In case of any received response the keepalive
*/
private class Keepalive implements Runnable, FutureCallback<DOMRpcResult> {
- private final ScheduledFuture<?> previousKeepalive;
-
- public Keepalive(final ScheduledFuture<?> previousKeepalive) {
- this.previousKeepalive = previousKeepalive;
- }
-
@Override
public void run() {
LOG.trace("{}: Invoking keepalive RPC", id);
try {
- if(previousKeepalive != null && !previousKeepalive.isDone()) {
+ final boolean lastJobSucceeded = lastKeepAliveSucceeded.getAndSet(false);
+ if (!lastJobSucceeded) {
onFailure(new IllegalStateException("Previous keepalive timed out"));
} else {
- Futures.addCallback(currentDeviceRpc.invokeRpc(PATH, KEEPALIVE_PAYLOAD), this);
+ currentDeviceRpc.invokeRpc(NETCONF_GET_CONFIG_PATH, KEEPALIVE_PAYLOAD).addCallback(this,
+ MoreExecutors.directExecutor());
}
- } catch (NullPointerException e) {
+ } catch (final NullPointerException e) {
LOG.debug("{}: Skipping keepalive while reconnecting", id);
// Empty catch block intentional
// Do nothing. The currentDeviceRpc was null and it means we hit the reconnect window and
}
}
+ @SuppressFBWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE",
+ justification = "Unrecognised NullableDecl")
@Override
public void onSuccess(final DOMRpcResult result) {
- if (result != null && result.getResult() != null) {
- LOG.debug("{}: Keepalive RPC successful with response: {}", id, result.getResult());
- scheduleKeepalive();
+ // No matter what response we got, rpc-reply or rpc-error,
+ // we got it from device so the netconf session is OK
+ if (result == null) {
+ LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", id);
+ reconnect();
+ return;
+ }
+
+ if (result.getResult() != null) {
+ lastKeepAliveSucceeded.set(true);
+ } else if (result.getErrors() != null) {
+ LOG.warn("{}: Keepalive RPC failed with error: {}", id, result.getErrors());
+ lastKeepAliveSucceeded.set(true);
} else {
- LOG.warn("{} Keepalive RPC returned null with response: {}. Reconnecting netconf session", id, result);
+ LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", id);
reconnect();
}
}
@Override
- public void onFailure(@Nonnull final Throwable t) {
- LOG.warn("{}: Keepalive RPC failed. Reconnecting netconf session.", id, t);
+ public void onFailure(final Throwable throwable) {
+ LOG.warn("{}: Keepalive RPC failed. Reconnecting netconf session.", id, throwable);
reconnect();
}
}
/**
- * Reset keepalive after each RPC response received
+ * Reset keepalive after each RPC response received.
*/
- private class ResetKeepalive implements com.google.common.util.concurrent.FutureCallback<DOMRpcResult> {
+ private class ResetKeepalive implements FutureCallback<DOMRpcResult> {
@Override
- public void onSuccess(@Nullable final DOMRpcResult result) {
- // No matter what response we got, rpc-reply or rpc-error, we got it from device so the netconf session is OK
+ public void onSuccess(final DOMRpcResult result) {
+ // No matter what response we got,
+ // rpc-reply or rpc-error, we got it from device so the netconf session is OK.
resetKeepalive();
}
@Override
- public void onFailure(@Nonnull final Throwable t) {
- // User/Application RPC failed (The RPC did not reach the remote device.
+ public void onFailure(final Throwable throwable) {
+ // User/Application RPC failed (The RPC did not reach the remote device or ..
+ // TODO what other reasons could cause this ?)
// There is no point in keeping this session. Reconnect.
- LOG.warn("{}: Rpc failure detected. Reconnecting netconf session", id, t);
+ LOG.warn("{}: Rpc failure detected. Reconnecting netconf session", id, throwable);
reconnect();
}
}
+ private final class ResponseWaitingScheduler {
+
+ private ScheduledFuture<?> schedule;
+
+ public void initScheduler(final Runnable runnable) {
+ if (currentKeepalive != null) {
+ currentKeepalive.cancel(true);
+ } else {
+ LOG.trace("Keepalive does not exist.");
+ }
+ scheduleKeepalives();
+ //Listening on the result should be done before the keepalive rpc will be send
+ final long delay = (keepaliveDelaySeconds * 1000) - 500;
+ schedule = executor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
+ }
+
+ public void stopScheduler() {
+ if (schedule != null) {
+ schedule.cancel(true);
+ } else {
+ LOG.trace("Scheduler does not exist.");
+ }
+ }
+ }
+
+ private static final class ResponseWaiting implements Runnable {
+
+ private final FluentFuture<DOMRpcResult> rpcResultFuture;
+ private final ResponseWaitingScheduler responseWaitingScheduler;
+
+ ResponseWaiting(final ResponseWaitingScheduler responseWaitingScheduler,
+ final FluentFuture<DOMRpcResult> rpcResultFuture) {
+ this.responseWaitingScheduler = responseWaitingScheduler;
+ this.rpcResultFuture = rpcResultFuture;
+ }
+
+ public void start() {
+ LOG.trace("Start to waiting for result.");
+ responseWaitingScheduler.initScheduler(this);
+ }
+
+ public void stop() {
+ LOG.info("Stop to waiting for result.");
+ responseWaitingScheduler.stopScheduler();
+ }
+
+ @Override
+ public void run() {
+ if (!rpcResultFuture.isCancelled() && !rpcResultFuture.isDone()) {
+ LOG.trace("Waiting for result");
+ responseWaitingScheduler.initScheduler(this);
+ } else {
+ LOG.trace("Result has been cancelled or done.");
+ }
+ }
+ }
+
+ /*
+ * Request timeout task is called once the defaultRequestTimeoutMillis is
+ * reached. At this moment, if the request is not yet finished, we cancel
+ * it.
+ */
+ private static final class RequestTimeoutTask implements Runnable {
+
+ private final FluentFuture<DOMRpcResult> rpcResultFuture;
+ private final ResponseWaiting responseWaiting;
+
+ RequestTimeoutTask(final FluentFuture<DOMRpcResult> rpcResultFuture, final ResponseWaiting responseWaiting) {
+ this.rpcResultFuture = rpcResultFuture;
+ this.responseWaiting = responseWaiting;
+ }
+
+ @Override
+ public void run() {
+ if (!rpcResultFuture.isDone()) {
+ rpcResultFuture.cancel(true);
+ }
+ if (responseWaiting != null) {
+ responseWaiting.stop();
+ }
+ }
+ }
+
/**
- * DOMRpcService proxy that attaches reset-keepalive-task to each RPC invocation.
+ * DOMRpcService proxy that attaches reset-keepalive-task and schedule
+ * request-timeout-task to each RPC invocation.
*/
- private static final class KeepaliveDOMRpcService implements DOMRpcService {
+ public static final class KeepaliveDOMRpcService implements DOMRpcService {
private final DOMRpcService deviceRpc;
- private ResetKeepalive resetKeepaliveTask;
-
- public KeepaliveDOMRpcService(final DOMRpcService deviceRpc, final ResetKeepalive resetKeepaliveTask) {
+ private final ResetKeepalive resetKeepaliveTask;
+ private final long defaultRequestTimeoutMillis;
+ private final ScheduledExecutorService executor;
+ private final ResponseWaitingScheduler responseWaitingScheduler;
+
+ KeepaliveDOMRpcService(final DOMRpcService deviceRpc, final ResetKeepalive resetKeepaliveTask,
+ final long defaultRequestTimeoutMillis, final ScheduledExecutorService executor,
+ final ResponseWaitingScheduler responseWaitingScheduler) {
this.deviceRpc = deviceRpc;
this.resetKeepaliveTask = resetKeepaliveTask;
+ this.defaultRequestTimeoutMillis = defaultRequestTimeoutMillis;
+ this.executor = executor;
+ this.responseWaitingScheduler = responseWaitingScheduler;
+ }
+
+ public DOMRpcService getDeviceRpc() {
+ return deviceRpc;
}
- @Nonnull
@Override
- public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(@Nonnull final SchemaPath type, final NormalizedNode<?, ?> input) {
- final CheckedFuture<DOMRpcResult, DOMRpcException> domRpcResultDOMRpcExceptionCheckedFuture = deviceRpc.invokeRpc(type, input);
- Futures.addCallback(domRpcResultDOMRpcExceptionCheckedFuture, resetKeepaliveTask);
- return domRpcResultDOMRpcExceptionCheckedFuture;
+ public FluentFuture<DOMRpcResult> invokeRpc(final SchemaPath type, final NormalizedNode<?, ?> input) {
+ final FluentFuture<DOMRpcResult> rpcResultFuture = deviceRpc.invokeRpc(type, input);
+ final ResponseWaiting responseWaiting = new ResponseWaiting(responseWaitingScheduler, rpcResultFuture);
+ responseWaiting.start();
+ rpcResultFuture.addCallback(resetKeepaliveTask, MoreExecutors.directExecutor());
+
+ final RequestTimeoutTask timeoutTask = new RequestTimeoutTask(rpcResultFuture, responseWaiting);
+ executor.schedule(timeoutTask, defaultRequestTimeoutMillis, TimeUnit.MILLISECONDS);
+
+ return rpcResultFuture;
}
@Override
- public <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(@Nonnull final T listener) {
+ public <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(final T listener) {
// There is no real communication with the device (yet), no reset here
return deviceRpc.registerRpcListener(listener);
}