X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=netconf%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fsal%2Fconnect%2Fnetconf%2Fsal%2FKeepaliveSalFacade.java;h=3b255ed009125becf3eb1abd571df92f2c0e3690;hb=32198feec954f869a760c69bf5a4ccf6f116c7bd;hp=c450ac68fecd83c40d8eff6c306e30d02e6b35c1;hpb=c0488b3e5a8d1bbe2894eda5eb1254f6ced347a4;p=netconf.git diff --git a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/KeepaliveSalFacade.java b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/KeepaliveSalFacade.java index c450ac68fe..3b255ed009 100644 --- a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/KeepaliveSalFacade.java +++ b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/KeepaliveSalFacade.java @@ -8,24 +8,28 @@ package org.opendaylight.netconf.sal.connect.netconf.sal; import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps.getSourceNode; -import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_QNAME; +import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_NODEID; +import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_PATH; import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_RUNNING_QNAME; -import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.toPath; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import org.opendaylight.controller.md.sal.dom.api.DOMNotification; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.mdsal.dom.api.DOMActionService; +import org.opendaylight.mdsal.dom.api.DOMNotification; +import org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener; +import org.opendaylight.mdsal.dom.api.DOMRpcResult; +import org.opendaylight.mdsal.dom.api.DOMRpcService; import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler; import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator; import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences; @@ -64,9 +68,11 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler currentKeepalive; private volatile DOMRpcService currentDeviceRpc; + private final AtomicBoolean lastKeepAliveSucceeded = new AtomicBoolean(false); public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade, - final ScheduledExecutorService executor, final long keepaliveDelaySeconds, final long defaultRequestTimeoutMillis) { + final ScheduledExecutorService executor, final long keepaliveDelaySeconds, + final long defaultRequestTimeoutMillis) { this.id = id; this.salFacade = salFacade; this.executor = executor; @@ -81,7 +87,7 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler * Then schedule next keepalive. */ - private void resetKeepalive() { + void resetKeepalive() { LOG.trace("{}: Resetting netconf keepalive timer", id); - if(currentKeepalive != null) { + if (currentKeepalive != null) { currentKeepalive.cancel(false); } - scheduleKeepalive(); + scheduleKeepalives(); } /** - * Cancel current keepalive and also reset current deviceRpc + * Cancel current keepalive and also reset current deviceRpc. */ private void stopKeepalives() { - if(currentKeepalive != null) { + if (currentKeepalive != null) { currentKeepalive.cancel(false); } currentDeviceRpc = null; } - private void reconnect() { + void reconnect() { Preconditions.checkState(listener != null, "%s: Unable to reconnect, session listener is missing", id); stopKeepalives(); LOG.info("{}: Reconnecting inactive netconf session", id); @@ -121,19 +128,32 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler { - private final ScheduledFuture previousKeepalive; - - public Keepalive(final ScheduledFuture previousKeepalive) { - this.previousKeepalive = previousKeepalive; - } - @Override public void run() { LOG.trace("{}: Invoking keepalive RPC", id); try { - if(previousKeepalive != null && !previousKeepalive.isDone()) { + final boolean lastJobSucceeded = lastKeepAliveSucceeded.getAndSet(false); + if (!lastJobSucceeded) { onFailure(new IllegalStateException("Previous keepalive timed out")); } else { - Futures.addCallback(currentDeviceRpc.invokeRpc(PATH, KEEPALIVE_PAYLOAD), this); + Futures.addCallback(currentDeviceRpc.invokeRpc(NETCONF_GET_CONFIG_PATH, KEEPALIVE_PAYLOAD), this, + MoreExecutors.directExecutor()); } - } catch (NullPointerException e) { + } catch (final NullPointerException e) { LOG.debug("{}: Skipping keepalive while reconnecting", id); // Empty catch block intentional // Do nothing. The currentDeviceRpc was null and it means we hit the reconnect window and @@ -198,43 +213,108 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler { @Override public void onSuccess(@Nullable final DOMRpcResult result) { - // No matter what response we got, rpc-reply or rpc-error, we got it from device so the netconf session is OK + // No matter what response we got, + // rpc-reply or rpc-error, we got it from device so the netconf session is OK. resetKeepalive(); } @Override - public void onFailure(@Nonnull final Throwable t) { - // User/Application RPC failed (The RPC did not reach the remote device or .. TODO what other reasons could cause this ?) + public void onFailure(@Nonnull final Throwable throwable) { + // User/Application RPC failed (The RPC did not reach the remote device or .. + // TODO what other reasons could cause this ?) // There is no point in keeping this session. Reconnect. - LOG.warn("{}: Rpc failure detected. Reconnecting netconf session", id, t); + LOG.warn("{}: Rpc failure detected. Reconnecting netconf session", id, throwable); reconnect(); } } + private final class ResponseWaitingScheduler { + + private ScheduledFuture schedule; + + public void initScheduler(final Runnable runnable) { + if (currentKeepalive != null) { + currentKeepalive.cancel(true); + } else { + LOG.trace("Keepalive does not exist."); + } + scheduleKeepalives(); + //Listening on the result should be done before the keepalive rpc will be send + final long delay = (keepaliveDelaySeconds * 1000) - 500; + schedule = executor.schedule(runnable, delay, TimeUnit.MILLISECONDS); + } + + public void stopScheduler() { + if (schedule != null) { + schedule.cancel(true); + } else { + LOG.trace("Scheduler does not exist."); + } + } + } + + private static final class ResponseWaiting implements Runnable { + + private final FluentFuture rpcResultFuture; + private final ResponseWaitingScheduler responseWaitingScheduler; + + ResponseWaiting(final ResponseWaitingScheduler responseWaitingScheduler, + final FluentFuture rpcResultFuture) { + this.responseWaitingScheduler = responseWaitingScheduler; + this.rpcResultFuture = rpcResultFuture; + } + + public void start() { + LOG.trace("Start to waiting for result."); + responseWaitingScheduler.initScheduler(this); + } + + public void stop() { + LOG.info("Stop to waiting for result."); + responseWaitingScheduler.stopScheduler(); + } + + @Override + public void run() { + if (!rpcResultFuture.isCancelled() && !rpcResultFuture.isDone()) { + LOG.trace("Waiting for result"); + responseWaitingScheduler.initScheduler(this); + } else { + LOG.trace("Result has been cancelled or done."); + } + } + } + /* * Request timeout task is called once the defaultRequestTimeoutMillis is * reached. At this moment, if the request is not yet finished, we cancel @@ -242,10 +322,12 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler rpcResultFuture; + private final FluentFuture rpcResultFuture; + private final ResponseWaiting responseWaiting; - public RequestTimeoutTask(final CheckedFuture rpcResultFuture) { + RequestTimeoutTask(final FluentFuture rpcResultFuture, final ResponseWaiting responseWaiting) { this.rpcResultFuture = rpcResultFuture; + this.responseWaiting = responseWaiting; } @Override @@ -253,6 +335,9 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler invokeRpc(@Nonnull final SchemaPath type, final NormalizedNode input) { - final CheckedFuture domRpcResultDOMRpcExceptionCheckedFuture = deviceRpc.invokeRpc(type, input); - Futures.addCallback(domRpcResultDOMRpcExceptionCheckedFuture, resetKeepaliveTask); - - final RequestTimeoutTask timeoutTask = new RequestTimeoutTask(domRpcResultDOMRpcExceptionCheckedFuture); + public @NonNull FluentFuture invokeRpc(@Nonnull final SchemaPath type, + final NormalizedNode input) { + final FluentFuture rpcResultFuture = deviceRpc.invokeRpc(type, input); + final ResponseWaiting responseWaiting = new ResponseWaiting(responseWaitingScheduler, rpcResultFuture); + responseWaiting.start(); + rpcResultFuture.addCallback(resetKeepaliveTask, MoreExecutors.directExecutor()); + + final RequestTimeoutTask timeoutTask = new RequestTimeoutTask(rpcResultFuture, responseWaiting); executor.schedule(timeoutTask, defaultRequestTimeoutMillis, TimeUnit.MILLISECONDS); - return domRpcResultDOMRpcExceptionCheckedFuture; + return rpcResultFuture; } @Override - public ListenerRegistration registerRpcListener(@Nonnull final T listener) { + public ListenerRegistration registerRpcListener( + @Nonnull final T listener) { // There is no real communication with the device (yet), no reset here return deviceRpc.registerRpcListener(listener); }