X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fsal%2Fconnect%2Fnetconf%2Fsal%2FKeepaliveSalFacade.java;h=3b255ed009125becf3eb1abd571df92f2c0e3690;hb=32198feec954f869a760c69bf5a4ccf6f116c7bd;hp=2e33b57beb86f484d147b43d3982c74a7d397cd4;hpb=50351b4cb37a1998cfaef53ff584aeb767c25f50;p=netconf.git diff --git a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/KeepaliveSalFacade.java b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/KeepaliveSalFacade.java index 2e33b57beb..3b255ed009 100644 --- a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/KeepaliveSalFacade.java +++ b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/KeepaliveSalFacade.java @@ -8,24 +8,28 @@ package org.opendaylight.netconf.sal.connect.netconf.sal; import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps.getSourceNode; -import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_QNAME; +import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_NODEID; +import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_PATH; import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_RUNNING_QNAME; -import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.toPath; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import org.opendaylight.controller.md.sal.dom.api.DOMNotification; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.mdsal.dom.api.DOMActionService; +import org.opendaylight.mdsal.dom.api.DOMNotification; +import org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener; +import org.opendaylight.mdsal.dom.api.DOMRpcResult; +import org.opendaylight.mdsal.dom.api.DOMRpcService; import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler; import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator; import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences; @@ -64,6 +68,7 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler currentKeepalive; private volatile DOMRpcService currentDeviceRpc; + private final AtomicBoolean lastKeepAliveSucceeded = new AtomicBoolean(false); public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade, final ScheduledExecutorService executor, final long keepaliveDelaySeconds, @@ -97,12 +102,12 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler * Then schedule next keepalive. */ - private void resetKeepalive() { + void resetKeepalive() { LOG.trace("{}: Resetting netconf keepalive timer", id); if (currentKeepalive != null) { currentKeepalive.cancel(false); } - scheduleKeepalive(); + scheduleKeepalives(); } /** @@ -115,7 +120,7 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler { - private final ScheduledFuture previousKeepalive; - - Keepalive(final ScheduledFuture previousKeepalive) { - this.previousKeepalive = previousKeepalive; - } - @Override public void run() { LOG.trace("{}: Invoking keepalive RPC", id); try { - if (previousKeepalive != null && !previousKeepalive.isDone()) { + final boolean lastJobSucceeded = lastKeepAliveSucceeded.getAndSet(false); + if (!lastJobSucceeded) { onFailure(new IllegalStateException("Previous keepalive timed out")); } else { - Futures.addCallback(currentDeviceRpc.invokeRpc(PATH, KEEPALIVE_PAYLOAD), this); + Futures.addCallback(currentDeviceRpc.invokeRpc(NETCONF_GET_CONFIG_PATH, KEEPALIVE_PAYLOAD), this, + MoreExecutors.directExecutor()); } - } catch (NullPointerException e) { + } catch (final NullPointerException e) { LOG.debug("{}: Skipping keepalive while reconnecting", id); // Empty catch block intentional // Do nothing. The currentDeviceRpc was null and it means we hit the reconnect window and @@ -202,13 +213,19 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler schedule; + + public void initScheduler(final Runnable runnable) { + if (currentKeepalive != null) { + currentKeepalive.cancel(true); + } else { + LOG.trace("Keepalive does not exist."); + } + scheduleKeepalives(); + //Listening on the result should be done before the keepalive rpc will be send + final long delay = (keepaliveDelaySeconds * 1000) - 500; + schedule = executor.schedule(runnable, delay, TimeUnit.MILLISECONDS); + } + + public void stopScheduler() { + if (schedule != null) { + schedule.cancel(true); + } else { + LOG.trace("Scheduler does not exist."); + } + } + } + + private static final class ResponseWaiting implements Runnable { + + private final FluentFuture rpcResultFuture; + private final ResponseWaitingScheduler responseWaitingScheduler; + + ResponseWaiting(final ResponseWaitingScheduler responseWaitingScheduler, + final FluentFuture rpcResultFuture) { + this.responseWaitingScheduler = responseWaitingScheduler; + this.rpcResultFuture = rpcResultFuture; + } + + public void start() { + LOG.trace("Start to waiting for result."); + responseWaitingScheduler.initScheduler(this); + } + + public void stop() { + LOG.info("Stop to waiting for result."); + responseWaitingScheduler.stopScheduler(); + } + + @Override + public void run() { + if (!rpcResultFuture.isCancelled() && !rpcResultFuture.isDone()) { + LOG.trace("Waiting for result"); + responseWaitingScheduler.initScheduler(this); + } else { + LOG.trace("Result has been cancelled or done."); + } + } + } + /* * Request timeout task is called once the defaultRequestTimeoutMillis is * reached. At this moment, if the request is not yet finished, we cancel @@ -248,10 +322,12 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler rpcResultFuture; + private final FluentFuture rpcResultFuture; + private final ResponseWaiting responseWaiting; - RequestTimeoutTask(final CheckedFuture rpcResultFuture) { + RequestTimeoutTask(final FluentFuture rpcResultFuture, final ResponseWaiting responseWaiting) { this.rpcResultFuture = rpcResultFuture; + this.responseWaiting = responseWaiting; } @Override @@ -259,6 +335,9 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler invokeRpc(@Nonnull final SchemaPath type, + public @NonNull FluentFuture invokeRpc(@Nonnull final SchemaPath type, final NormalizedNode input) { - final CheckedFuture domRpcResultDOMRpcExceptionCheckedFuture = - deviceRpc.invokeRpc(type, input); - Futures.addCallback(domRpcResultDOMRpcExceptionCheckedFuture, resetKeepaliveTask); + final FluentFuture rpcResultFuture = deviceRpc.invokeRpc(type, input); + final ResponseWaiting responseWaiting = new ResponseWaiting(responseWaitingScheduler, rpcResultFuture); + responseWaiting.start(); + rpcResultFuture.addCallback(resetKeepaliveTask, MoreExecutors.directExecutor()); - final RequestTimeoutTask timeoutTask = new RequestTimeoutTask(domRpcResultDOMRpcExceptionCheckedFuture); + final RequestTimeoutTask timeoutTask = new RequestTimeoutTask(rpcResultFuture, responseWaiting); executor.schedule(timeoutTask, defaultRequestTimeoutMillis, TimeUnit.MILLISECONDS); - return domRpcResultDOMRpcExceptionCheckedFuture; + return rpcResultFuture; } @Override