X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fsal%2Fconnect%2Fnetconf%2Fsal%2FKeepaliveSalFacade.java;h=825cd380996b90a61ad7cb9e01e9b87b6b78ff29;hb=a201b000f7d777bd7b53748c3f13487fbb398599;hp=dc2b5dff427f3b40c12ebd5624b1a33151cabff3;hpb=63abf8472cfb70f4e2729585dc9ff95b41d068f8;p=netconf.git diff --git a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/KeepaliveSalFacade.java b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/KeepaliveSalFacade.java index dc2b5dff42..825cd38099 100644 --- a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/KeepaliveSalFacade.java +++ b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/KeepaliveSalFacade.java @@ -7,25 +7,25 @@ */ package org.opendaylight.netconf.sal.connect.netconf.sal; +import static com.google.common.base.Preconditions.checkState; import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps.getSourceNode; -import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_QNAME; +import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_NODEID; +import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_PATH; import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_RUNNING_QNAME; -import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.toPath; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import org.opendaylight.controller.md.sal.dom.api.DOMNotification; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import java.util.concurrent.atomic.AtomicBoolean; +import org.opendaylight.mdsal.dom.api.DOMActionService; +import org.opendaylight.mdsal.dom.api.DOMNotification; +import org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener; +import org.opendaylight.mdsal.dom.api.DOMRpcResult; +import org.opendaylight.mdsal.dom.api.DOMRpcService; import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler; import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator; import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences; @@ -51,32 +51,39 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler salFacade; private final ScheduledExecutorService executor; private final long keepaliveDelaySeconds; private final ResetKeepalive resetKeepaliveTask; + private final long defaultRequestTimeoutMillis; private volatile NetconfDeviceCommunicator listener; private volatile ScheduledFuture currentKeepalive; private volatile DOMRpcService currentDeviceRpc; + private final AtomicBoolean lastKeepAliveSucceeded = new AtomicBoolean(false); public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade, - final ScheduledExecutorService executor, final long keepaliveDelaySeconds) { + final ScheduledExecutorService executor, final long keepaliveDelaySeconds, + final long defaultRequestTimeoutMillis) { this.id = id; this.salFacade = salFacade; this.executor = executor; this.keepaliveDelaySeconds = keepaliveDelaySeconds; + this.defaultRequestTimeoutMillis = defaultRequestTimeoutMillis; this.resetKeepaliveTask = new ResetKeepalive(); } public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade, final ScheduledExecutorService executor) { - this(id, salFacade, executor, DEFAULT_DELAY); + this(id, salFacade, executor, DEFAULT_DELAY, DEFAULT_TRANSACTION_TIMEOUT_MILLI); } /** - * Set the netconf session listener whenever ready + * Set the netconf session listener whenever ready. * * @param listener netconf session listener */ @@ -88,47 +95,61 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler * Then schedule next keepalive. */ - private void resetKeepalive() { + void resetKeepalive() { LOG.trace("{}: Resetting netconf keepalive timer", id); - if(currentKeepalive != null) { + if (currentKeepalive != null) { currentKeepalive.cancel(false); } - scheduleKeepalive(); + scheduleKeepalives(); } /** - * Cancel current keepalive and also reset current deviceRpc + * Cancel current keepalive and also reset current deviceRpc. */ private void stopKeepalives() { - if(currentKeepalive != null) { + if (currentKeepalive != null) { currentKeepalive.cancel(false); } currentDeviceRpc = null; } - private void reconnect() { - Preconditions.checkState(listener != null, "%s: Unable to reconnect, session listener is missing", id); + void reconnect() { + checkState(listener != null, "%s: Unable to reconnect, session listener is missing", id); stopKeepalives(); LOG.info("{}: Reconnecting inactive netconf session", id); listener.disconnect(); } @Override - public void onDeviceConnected(final SchemaContext remoteSchemaContext, final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) { + public void onDeviceConnected(final SchemaContext remoteSchemaContext, + final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) { + onDeviceConnected(remoteSchemaContext, netconfSessionPreferences, deviceRpc, null); + } + + @Override + public void onDeviceConnected(final SchemaContext remoteSchemaContext, + final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc, + final DOMActionService deviceAction) { this.currentDeviceRpc = deviceRpc; - final DOMRpcService deviceRpc1 = new KeepaliveDOMRpcService(deviceRpc, resetKeepaliveTask); - salFacade.onDeviceConnected(remoteSchemaContext, netconfSessionPreferences, deviceRpc1); + final DOMRpcService deviceRpc1 = + new KeepaliveDOMRpcService(deviceRpc, resetKeepaliveTask, defaultRequestTimeoutMillis, executor, + new ResponseWaitingScheduler()); + + salFacade.onDeviceConnected(remoteSchemaContext, netconfSessionPreferences, deviceRpc1, deviceAction); LOG.debug("{}: Netconf session initiated, starting keepalives", id); - scheduleKeepalive(); + scheduleKeepalives(); } - private void scheduleKeepalive() { - Preconditions.checkState(currentDeviceRpc != null); - LOG.trace("{}: Scheduling next keepalive in {} {}", id, keepaliveDelaySeconds, TimeUnit.SECONDS); - currentKeepalive = executor.schedule(new Keepalive(currentKeepalive), keepaliveDelaySeconds, TimeUnit.SECONDS); + private void scheduleKeepalives() { + lastKeepAliveSucceeded.set(true); + checkState(currentDeviceRpc != null); + LOG.trace("{}: Scheduling keepalives every {} {}", id, keepaliveDelaySeconds, TimeUnit.SECONDS); + currentKeepalive = executor.scheduleWithFixedDelay(new Keepalive(), + keepaliveDelaySeconds, keepaliveDelaySeconds, TimeUnit.SECONDS); } @Override @@ -156,9 +177,8 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler { - private final ScheduledFuture previousKeepalive; - - public Keepalive(final ScheduledFuture previousKeepalive) { - this.previousKeepalive = previousKeepalive; - } - @Override public void run() { LOG.trace("{}: Invoking keepalive RPC", id); try { - if(previousKeepalive != null && !previousKeepalive.isDone()) { + final boolean lastJobSucceeded = lastKeepAliveSucceeded.getAndSet(false); + if (!lastJobSucceeded) { onFailure(new IllegalStateException("Previous keepalive timed out")); } else { - Futures.addCallback(currentDeviceRpc.invokeRpc(PATH, KEEPALIVE_PAYLOAD), this); + currentDeviceRpc.invokeRpc(NETCONF_GET_CONFIG_PATH, KEEPALIVE_PAYLOAD).addCallback(this, + MoreExecutors.directExecutor()); } - } catch (NullPointerException e) { + } catch (final NullPointerException e) { LOG.debug("{}: Skipping keepalive while reconnecting", id); // Empty catch block intentional // Do nothing. The currentDeviceRpc was null and it means we hit the reconnect window and @@ -193,66 +209,181 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler { + private class ResetKeepalive implements FutureCallback { @Override - public void onSuccess(@Nullable final DOMRpcResult result) { - // No matter what response we got, rpc-reply or rpc-error, we got it from device so the netconf session is OK + public void onSuccess(final DOMRpcResult result) { + // No matter what response we got, + // rpc-reply or rpc-error, we got it from device so the netconf session is OK. resetKeepalive(); } @Override - public void onFailure(@Nonnull final Throwable t) { - // User/Application RPC failed (The RPC did not reach the remote device. + public void onFailure(final Throwable throwable) { + // User/Application RPC failed (The RPC did not reach the remote device or .. + // TODO what other reasons could cause this ?) // There is no point in keeping this session. Reconnect. - LOG.warn("{}: Rpc failure detected. Reconnecting netconf session", id, t); + LOG.warn("{}: Rpc failure detected. Reconnecting netconf session", id, throwable); reconnect(); } } + private final class ResponseWaitingScheduler { + + private ScheduledFuture schedule; + + public void initScheduler(final Runnable runnable) { + if (currentKeepalive != null) { + currentKeepalive.cancel(true); + } else { + LOG.trace("Keepalive does not exist."); + } + scheduleKeepalives(); + //Listening on the result should be done before the keepalive rpc will be send + final long delay = (keepaliveDelaySeconds * 1000) - 500; + schedule = executor.schedule(runnable, delay, TimeUnit.MILLISECONDS); + } + + public void stopScheduler() { + if (schedule != null) { + schedule.cancel(true); + } else { + LOG.trace("Scheduler does not exist."); + } + } + } + + private static final class ResponseWaiting implements Runnable { + + private final FluentFuture rpcResultFuture; + private final ResponseWaitingScheduler responseWaitingScheduler; + + ResponseWaiting(final ResponseWaitingScheduler responseWaitingScheduler, + final FluentFuture rpcResultFuture) { + this.responseWaitingScheduler = responseWaitingScheduler; + this.rpcResultFuture = rpcResultFuture; + } + + public void start() { + LOG.trace("Start to waiting for result."); + responseWaitingScheduler.initScheduler(this); + } + + public void stop() { + LOG.info("Stop to waiting for result."); + responseWaitingScheduler.stopScheduler(); + } + + @Override + public void run() { + if (!rpcResultFuture.isCancelled() && !rpcResultFuture.isDone()) { + LOG.trace("Waiting for result"); + responseWaitingScheduler.initScheduler(this); + } else { + LOG.trace("Result has been cancelled or done."); + } + } + } + + /* + * Request timeout task is called once the defaultRequestTimeoutMillis is + * reached. At this moment, if the request is not yet finished, we cancel + * it. + */ + private static final class RequestTimeoutTask implements Runnable { + + private final FluentFuture rpcResultFuture; + private final ResponseWaiting responseWaiting; + + RequestTimeoutTask(final FluentFuture rpcResultFuture, final ResponseWaiting responseWaiting) { + this.rpcResultFuture = rpcResultFuture; + this.responseWaiting = responseWaiting; + } + + @Override + public void run() { + if (!rpcResultFuture.isDone()) { + rpcResultFuture.cancel(true); + } + if (responseWaiting != null) { + responseWaiting.stop(); + } + } + } + /** - * DOMRpcService proxy that attaches reset-keepalive-task to each RPC invocation. + * DOMRpcService proxy that attaches reset-keepalive-task and schedule + * request-timeout-task to each RPC invocation. */ - private static final class KeepaliveDOMRpcService implements DOMRpcService { + public static final class KeepaliveDOMRpcService implements DOMRpcService { private final DOMRpcService deviceRpc; - private ResetKeepalive resetKeepaliveTask; - - public KeepaliveDOMRpcService(final DOMRpcService deviceRpc, final ResetKeepalive resetKeepaliveTask) { + private final ResetKeepalive resetKeepaliveTask; + private final long defaultRequestTimeoutMillis; + private final ScheduledExecutorService executor; + private final ResponseWaitingScheduler responseWaitingScheduler; + + KeepaliveDOMRpcService(final DOMRpcService deviceRpc, final ResetKeepalive resetKeepaliveTask, + final long defaultRequestTimeoutMillis, final ScheduledExecutorService executor, + final ResponseWaitingScheduler responseWaitingScheduler) { this.deviceRpc = deviceRpc; this.resetKeepaliveTask = resetKeepaliveTask; + this.defaultRequestTimeoutMillis = defaultRequestTimeoutMillis; + this.executor = executor; + this.responseWaitingScheduler = responseWaitingScheduler; + } + + public DOMRpcService getDeviceRpc() { + return deviceRpc; } - @Nonnull @Override - public CheckedFuture invokeRpc(@Nonnull final SchemaPath type, final NormalizedNode input) { - final CheckedFuture domRpcResultDOMRpcExceptionCheckedFuture = deviceRpc.invokeRpc(type, input); - Futures.addCallback(domRpcResultDOMRpcExceptionCheckedFuture, resetKeepaliveTask); - return domRpcResultDOMRpcExceptionCheckedFuture; + public FluentFuture invokeRpc(final SchemaPath type, final NormalizedNode input) { + final FluentFuture rpcResultFuture = deviceRpc.invokeRpc(type, input); + final ResponseWaiting responseWaiting = new ResponseWaiting(responseWaitingScheduler, rpcResultFuture); + responseWaiting.start(); + rpcResultFuture.addCallback(resetKeepaliveTask, MoreExecutors.directExecutor()); + + final RequestTimeoutTask timeoutTask = new RequestTimeoutTask(rpcResultFuture, responseWaiting); + executor.schedule(timeoutTask, defaultRequestTimeoutMillis, TimeUnit.MILLISECONDS); + + return rpcResultFuture; } @Override - public ListenerRegistration registerRpcListener(@Nonnull final T listener) { + public ListenerRegistration registerRpcListener(final T listener) { // There is no real communication with the device (yet), no reset here return deviceRpc.registerRpcListener(listener); }