X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=plugins%2Fnetconf-client-mdsal%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fclient%2Fmdsal%2Fspi%2FKeepaliveSalFacade.java;h=d9551a7b026bed17628ada2fe858f838bc54f942;hb=fdc5756e29c8f0717fd3c178ca201d07338f98c1;hp=a169dec871b1f75e684145ed543a0392abf2e6d1;hpb=5695e7380bed5c04120671eda9ac4eb0e223be14;p=netconf.git diff --git a/plugins/netconf-client-mdsal/src/main/java/org/opendaylight/netconf/client/mdsal/spi/KeepaliveSalFacade.java b/plugins/netconf-client-mdsal/src/main/java/org/opendaylight/netconf/client/mdsal/spi/KeepaliveSalFacade.java index a169dec871..d9551a7b02 100644 --- a/plugins/netconf-client-mdsal/src/main/java/org/opendaylight/netconf/client/mdsal/spi/KeepaliveSalFacade.java +++ b/plugins/netconf-client-mdsal/src/main/java/org/opendaylight/netconf/client/mdsal/spi/KeepaliveSalFacade.java @@ -11,7 +11,6 @@ import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; import static org.opendaylight.netconf.client.mdsal.impl.NetconfBaseOps.getSourceNode; import static org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_NODEID; -import static org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_QNAME; import static org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil.NETCONF_RUNNING_NODEID; import com.google.common.util.concurrent.FutureCallback; @@ -19,7 +18,9 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; -import java.util.concurrent.ScheduledExecutorService; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import java.util.concurrent.CancellationException; import java.util.concurrent.TimeUnit; import javax.xml.transform.dom.DOMSource; import org.checkerframework.checker.lock.qual.GuardedBy; @@ -35,6 +36,8 @@ import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId; import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices; import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Rpcs; import org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil; +import org.opendaylight.netconf.common.NetconfTimer; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.GetConfig; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; @@ -55,32 +58,29 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler { // 1 minute transaction timeout by default private static final long DEFAULT_TRANSACTION_TIMEOUT_MILLI = TimeUnit.MILLISECONDS.toMillis(60000); - private final RemoteDeviceHandler salFacade; - private final ScheduledExecutorService executor; - + private final RemoteDeviceHandler deviceHandler; + private final RemoteDeviceId deviceId; + private final NetconfTimer timer; private final long keepaliveDelaySeconds; private final long timeoutNanos; private final long delayNanos; - private final RemoteDeviceId id; - private volatile NetconfDeviceCommunicator listener; private volatile KeepaliveTask task; - public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade, - final ScheduledExecutorService executor, final long keepaliveDelaySeconds, - final long requestTimeoutMillis) { - this.id = id; - this.salFacade = salFacade; - this.executor = requireNonNull(executor); + public KeepaliveSalFacade(final RemoteDeviceId deviceId, final RemoteDeviceHandler deviceHandler, + final NetconfTimer timer, final long keepaliveDelaySeconds, final long requestTimeoutMillis) { + this.deviceId = requireNonNull(deviceId); + this.deviceHandler = requireNonNull(deviceHandler); + this.timer = requireNonNull(timer); this.keepaliveDelaySeconds = keepaliveDelaySeconds; delayNanos = TimeUnit.SECONDS.toNanos(keepaliveDelaySeconds); timeoutNanos = TimeUnit.MILLISECONDS.toNanos(requestTimeoutMillis); } - public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade, - final ScheduledExecutorService executor) { - this(id, salFacade, executor, DEFAULT_DELAY, DEFAULT_TRANSACTION_TIMEOUT_MILLI); + public KeepaliveSalFacade(final RemoteDeviceId deviceId, final RemoteDeviceHandler deviceHandler, + final NetconfTimer timer) { + this(deviceId, deviceHandler, timer, DEFAULT_DELAY, DEFAULT_TRANSACTION_TIMEOUT_MILLI); } /** @@ -117,10 +117,10 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler { } } - void reconnect() { - checkState(listener != null, "%s: Unable to reconnect, session listener is missing", id); + private void disconnect() { + checkState(listener != null, "%s: Unable to reconnect, session listener is missing", deviceId); stopKeepalives(); - LOG.info("{}: Reconnecting inactive netconf session", id); + LOG.info("{}: Reconnecting inactive netconf session", deviceId); listener.disconnect(); } @@ -139,15 +139,15 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler { throw new IllegalStateException("Unhandled " + devRpc); } - salFacade.onDeviceConnected(deviceSchema, sessionPreferences, new RemoteDeviceServices(keepaliveRpcs, + deviceHandler.onDeviceConnected(deviceSchema, sessionPreferences, new RemoteDeviceServices(keepaliveRpcs, // FIXME: wrap with keepalive services.actions())); // We have performed a callback, which might have termined keepalives final var localTask = task; if (localTask != null) { - LOG.debug("{}: Netconf session initiated, starting keepalives", id); - LOG.trace("{}: Scheduling keepalives every {}s", id, keepaliveDelaySeconds); + LOG.debug("{}: Netconf session initiated, starting keepalives", deviceId); + LOG.trace("{}: Scheduling keepalives every {}s", deviceId, keepaliveDelaySeconds); localTask.enableKeepalive(); } } @@ -155,13 +155,13 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler { @Override public void onDeviceDisconnected() { stopKeepalives(); - salFacade.onDeviceDisconnected(); + deviceHandler.onDeviceDisconnected(); } @Override public void onDeviceFailed(final Throwable throwable) { stopKeepalives(); - salFacade.onDeviceFailed(throwable); + deviceHandler.onDeviceFailed(throwable); } @Override @@ -170,29 +170,33 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler { if (localTask != null) { localTask.recordActivity(); } - salFacade.onNotification(domNotification); + deviceHandler.onNotification(domNotification); } @Override public void close() { stopKeepalives(); - salFacade.close(); + deviceHandler.close(); } private @NonNull ListenableFuture scheduleTimeout(final ListenableFuture invokeFuture) { final var timeout = new RequestTimeoutTask<>(invokeFuture); - final var timeoutFuture = executor.schedule(timeout, timeoutNanos, TimeUnit.NANOSECONDS); - invokeFuture.addListener(() -> timeoutFuture.cancel(false), MoreExecutors.directExecutor()); + scheduleTimeout(invokeFuture, timeout); return timeout.userFuture; } + private void scheduleTimeout(final ListenableFuture future, final TimeoutTask timeoutTask) { + final var timeout = timer.newTimeout(timeoutTask, timeoutNanos, TimeUnit.NANOSECONDS); + future.addListener(() -> timeout.cancel(), MoreExecutors.directExecutor()); + } + /** * Invoke keepalive RPC and check the response. In case of any received response the keepalive * is considered successful and schedules next keepalive with a fixed delay. If the response is unsuccessful (no * response received, or the rcp could not even be sent) immediate reconnect is triggered as netconf session * is considered inactive/failed. */ - private final class KeepaliveTask implements Runnable, FutureCallback { + private final class KeepaliveTask implements TimerTask, FutureCallback { // Keepalive RPC static resources static final @NonNull ContainerNode KEEPALIVE_PAYLOAD = NetconfMessageTransformUtil.wrap( NETCONF_GET_CONFIG_NODEID, getSourceNode(NETCONF_RUNNING_NODEID), NetconfMessageTransformUtil.EMPTY_FILTER); @@ -209,7 +213,7 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler { } @Override - public void run() { + public void run(final Timeout timeout) { final long local = lastActivity; final long now = System.nanoTime(); final long inFutureNanos = local + delayNanos - now; @@ -231,26 +235,28 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler { synchronized void enableKeepalive() { recordActivity(); - if (!suppressed) { - // unscheduled -> unsuppressed - reschedule(); - } else { + if (suppressed) { // suppressed -> unsuppressed suppressed = false; + } else { + // unscheduled -> unsuppressed + reschedule(); } } private synchronized void sendKeepalive(final long now) { if (suppressed) { - LOG.debug("{}: Skipping keepalive while disabled", id); + LOG.debug("{}: Skipping keepalive while disabled", deviceId); // suppressed -> unscheduled suppressed = false; return; } - LOG.trace("{}: Invoking keepalive RPC", id); - final var deviceFuture = devRpc.invokeNetconf(NETCONF_GET_CONFIG_QNAME, KEEPALIVE_PAYLOAD); + LOG.trace("{}: Invoking keepalive RPC", deviceId); + final var deviceFuture = devRpc.invokeNetconf(GetConfig.QNAME, KEEPALIVE_PAYLOAD); lastActivity = now; + + scheduleTimeout(deviceFuture, new TimeoutTask(deviceFuture)); Futures.addCallback(deviceFuture, this, MoreExecutors.directExecutor()); } @@ -259,8 +265,8 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler { // No matter what response we got, rpc-reply or rpc-error, // we got it from device so the netconf session is OK if (result == null) { - LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", id); - reconnect(); + LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", deviceId); + disconnect(); return; } @@ -269,19 +275,23 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler { } else { final var errors = result.errors(); if (!errors.isEmpty()) { - LOG.warn("{}: Keepalive RPC failed with error: {}", id, errors); + LOG.warn("{}: Keepalive RPC failed with error: {}", deviceId, errors); reschedule(); } else { - LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", id); - reconnect(); + LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", deviceId); + disconnect(); } } } @Override public void onFailure(final Throwable throwable) { - LOG.warn("{}: Keepalive RPC failed. Reconnecting netconf session.", id, throwable); - reconnect(); + if (throwable instanceof CancellationException) { + LOG.warn("{}: Keepalive RPC timed out. Reconnecting netconf session.", deviceId); + } else { + LOG.warn("{}: Keepalive RPC failed. Reconnecting netconf session.", deviceId, throwable); + } + disconnect(); } private void reschedule() { @@ -289,7 +299,20 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler { } private void reschedule(final long delay) { - executor.schedule(this, delay, TimeUnit.NANOSECONDS); + timer.newTimeout(this, delay, TimeUnit.NANOSECONDS); + } + } + + private static class TimeoutTask implements TimerTask { + private final ListenableFuture future; + + TimeoutTask(final ListenableFuture future) { + this.future = requireNonNull(future); + } + + @Override + public final void run(final Timeout timeout) { + future.cancel(true); } } @@ -297,22 +320,15 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler { * Request timeout task is called once the requestTimeoutMillis is reached. At that moment, if the request is not * yet finished, we cancel it. */ - private final class RequestTimeoutTask implements FutureCallback, Runnable { + private final class RequestTimeoutTask extends TimeoutTask implements FutureCallback { private final @NonNull SettableFuture userFuture = SettableFuture.create(); - private final @NonNull ListenableFuture rpcResultFuture; RequestTimeoutTask(final ListenableFuture rpcResultFuture) { - this.rpcResultFuture = requireNonNull(rpcResultFuture); + super(rpcResultFuture); + // Note: this will also wire run() to onFailure() Futures.addCallback(rpcResultFuture, this, MoreExecutors.directExecutor()); } - @Override - public void run() { - rpcResultFuture.cancel(true); - userFuture.cancel(false); - enableKeepalive(); - } - @Override public void onSuccess(final V result) { // No matter what response we got, @@ -323,12 +339,15 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler { @Override public void onFailure(final Throwable throwable) { - // User/Application RPC failed (The RPC did not reach the remote device or ...) - // FIXME: what other reasons could cause this ?) - LOG.warn("{}: Rpc failure detected. Reconnecting netconf session", id, throwable); + // User/Application RPC failed (The RPC did not reach the remote device or it has timeed out) + if (throwable instanceof CancellationException) { + LOG.warn("{}: RPC timed out. Reconnecting netconf session", deviceId); + } else { + LOG.warn("{}: RPC failed. Reconnecting netconf session", deviceId, throwable); + } userFuture.setException(throwable); // There is no point in keeping this session. Reconnect. - reconnect(); + disconnect(); } }