2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.client.mdsal.spi;
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12 import static org.opendaylight.netconf.client.mdsal.impl.NetconfBaseOps.getSourceNode;
13 import static org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_NODEID;
14 import static org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil.NETCONF_RUNNING_NODEID;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.MoreExecutors;
20 import com.google.common.util.concurrent.SettableFuture;
21 import io.netty.util.Timeout;
22 import io.netty.util.TimerTask;
23 import java.util.concurrent.CancellationException;
24 import java.util.concurrent.TimeUnit;
25 import javax.xml.transform.dom.DOMSource;
26 import org.checkerframework.checker.lock.qual.GuardedBy;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.opendaylight.mdsal.dom.api.DOMNotification;
29 import org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener;
30 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
31 import org.opendaylight.mdsal.dom.api.DOMRpcService;
32 import org.opendaylight.netconf.client.mdsal.NetconfDeviceCommunicator;
33 import org.opendaylight.netconf.client.mdsal.NetconfDeviceSchema;
34 import org.opendaylight.netconf.client.mdsal.api.NetconfSessionPreferences;
35 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceHandler;
36 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
37 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
38 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Rpcs;
39 import org.opendaylight.netconf.client.mdsal.api.SchemalessRpcService;
40 import org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil;
41 import org.opendaylight.netconf.common.NetconfTimer;
42 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.GetConfig;
43 import org.opendaylight.yangtools.concepts.Registration;
44 import org.opendaylight.yangtools.yang.common.QName;
45 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
50 * SalFacade proxy that invokes keepalive RPCs to prevent session shutdown from remote device
51 * and to detect incorrect session drops (netconf session is inactive, but TCP/SSH connection is still present).
52 * The keepalive RPC is a get-config with empty filter.
54 public final class KeepaliveSalFacade implements RemoteDeviceHandler {
55 private static final Logger LOG = LoggerFactory.getLogger(KeepaliveSalFacade.class);
57 // 2 minutes keepalive delay by default
58 private static final long DEFAULT_DELAY = TimeUnit.MINUTES.toSeconds(2);
60 // 1 minute transaction timeout by default
61 private static final long DEFAULT_TRANSACTION_TIMEOUT_MILLI = TimeUnit.MILLISECONDS.toMillis(60000);
63 private final RemoteDeviceHandler deviceHandler;
64 private final RemoteDeviceId deviceId;
65 private final NetconfTimer timer;
66 private final long keepaliveDelaySeconds;
67 private final long timeoutNanos;
68 private final long delayNanos;
70 private volatile NetconfDeviceCommunicator listener;
71 private volatile KeepaliveTask task;
73 public KeepaliveSalFacade(final RemoteDeviceId deviceId, final RemoteDeviceHandler deviceHandler,
74 final NetconfTimer timer, final long keepaliveDelaySeconds, final long requestTimeoutMillis) {
75 this.deviceId = requireNonNull(deviceId);
76 this.deviceHandler = requireNonNull(deviceHandler);
77 this.timer = requireNonNull(timer);
78 this.keepaliveDelaySeconds = keepaliveDelaySeconds;
79 delayNanos = TimeUnit.SECONDS.toNanos(keepaliveDelaySeconds);
80 timeoutNanos = TimeUnit.MILLISECONDS.toNanos(requestTimeoutMillis);
83 public KeepaliveSalFacade(final RemoteDeviceId deviceId, final RemoteDeviceHandler deviceHandler,
84 final NetconfTimer timer) {
85 this(deviceId, deviceHandler, timer, DEFAULT_DELAY, DEFAULT_TRANSACTION_TIMEOUT_MILLI);
89 * Set the netconf session listener whenever ready.
91 * @param listener netconf session listener
93 public void setListener(final NetconfDeviceCommunicator listener) {
94 this.listener = listener;
98 * Cancel current keepalive and free it.
100 private synchronized void stopKeepalives() {
101 final var localTask = task;
102 if (localTask != null) {
103 localTask.disableKeepalive();
108 private void disableKeepalive() {
109 final var localTask = task;
110 if (localTask != null) {
111 localTask.disableKeepalive();
115 private void enableKeepalive() {
116 final var localTask = task;
117 if (localTask != null) {
118 localTask.enableKeepalive();
122 private void disconnect() {
123 checkState(listener != null, "%s: Unable to reconnect, session listener is missing", deviceId);
125 LOG.info("{}: Reconnecting inactive netconf session", deviceId);
126 listener.disconnect();
130 public void onDeviceConnected(final NetconfDeviceSchema deviceSchema,
131 final NetconfSessionPreferences sessionPreferences, final RemoteDeviceServices services) {
132 final var devRpc = services.rpcs();
133 task = new KeepaliveTask(devRpc);
135 final Rpcs keepaliveRpcs;
136 if (devRpc instanceof Rpcs.Normalized normalized) {
137 keepaliveRpcs = new NormalizedKeepaliveRpcs(normalized);
138 } else if (devRpc instanceof Rpcs.Schemaless schemaless) {
139 keepaliveRpcs = new SchemalessKeepaliveRpcs(schemaless);
141 throw new IllegalStateException("Unhandled " + devRpc);
144 deviceHandler.onDeviceConnected(deviceSchema, sessionPreferences, new RemoteDeviceServices(keepaliveRpcs,
145 // FIXME: wrap with keepalive
146 services.actions()));
148 // We have performed a callback, which might have termined keepalives
149 final var localTask = task;
150 if (localTask != null) {
151 LOG.debug("{}: Netconf session initiated, starting keepalives", deviceId);
152 LOG.trace("{}: Scheduling keepalives every {}s", deviceId, keepaliveDelaySeconds);
153 localTask.enableKeepalive();
158 public void onDeviceDisconnected() {
160 deviceHandler.onDeviceDisconnected();
164 public void onDeviceFailed(final Throwable throwable) {
166 deviceHandler.onDeviceFailed(throwable);
170 public void onNotification(final DOMNotification domNotification) {
171 final var localTask = task;
172 if (localTask != null) {
173 localTask.recordActivity();
175 deviceHandler.onNotification(domNotification);
179 public void close() {
181 deviceHandler.close();
184 private <T> @NonNull ListenableFuture<T> scheduleTimeout(final ListenableFuture<T> invokeFuture) {
185 final var timeout = new RequestTimeoutTask<>(invokeFuture);
186 scheduleTimeout(invokeFuture, timeout);
187 return timeout.userFuture;
190 private void scheduleTimeout(final ListenableFuture<?> future, final TimeoutTask timeoutTask) {
191 final var timeout = timer.newTimeout(timeoutTask, timeoutNanos, TimeUnit.NANOSECONDS);
192 future.addListener(() -> timeout.cancel(), MoreExecutors.directExecutor());
196 * Invoke keepalive RPC and check the response. In case of any received response the keepalive
197 * is considered successful and schedules next keepalive with a fixed delay. If the response is unsuccessful (no
198 * response received, or the rcp could not even be sent) immediate reconnect is triggered as netconf session
199 * is considered inactive/failed.
201 private final class KeepaliveTask implements TimerTask, FutureCallback<DOMRpcResult> {
202 // Keepalive RPC static resources
203 static final @NonNull ContainerNode KEEPALIVE_PAYLOAD = NetconfMessageTransformUtil.wrap(
204 NETCONF_GET_CONFIG_NODEID, getSourceNode(NETCONF_RUNNING_NODEID), NetconfMessageTransformUtil.EMPTY_FILTER);
206 private final Rpcs devRpc;
209 private boolean suppressed = false;
211 private volatile long lastActivity;
213 KeepaliveTask(final Rpcs devRpc) {
214 this.devRpc = requireNonNull(devRpc);
218 public void run(final Timeout timeout) {
219 final long local = lastActivity;
220 final long now = System.nanoTime();
221 final long inFutureNanos = local + delayNanos - now;
222 if (inFutureNanos > 0) {
223 reschedule(inFutureNanos);
229 void recordActivity() {
230 lastActivity = System.nanoTime();
233 synchronized void disableKeepalive() {
234 // unsuppressed -> suppressed
238 synchronized void enableKeepalive() {
241 // suppressed -> unsuppressed
244 // unscheduled -> unsuppressed
249 private synchronized void sendKeepalive(final long now) {
251 LOG.debug("{}: Skipping keepalive while disabled", deviceId);
252 // suppressed -> unscheduled
257 LOG.trace("{}: Invoking keepalive RPC", deviceId);
258 final var deviceFuture = devRpc.invokeNetconf(GetConfig.QNAME, KEEPALIVE_PAYLOAD);
261 scheduleTimeout(deviceFuture, new TimeoutTask(deviceFuture));
262 Futures.addCallback(deviceFuture, this, MoreExecutors.directExecutor());
266 public void onSuccess(final DOMRpcResult result) {
267 // No matter what response we got, rpc-reply or rpc-error,
268 // we got it from device so the netconf session is OK
269 if (result == null) {
270 LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", deviceId);
275 if (result.value() != null) {
278 final var errors = result.errors();
279 if (!errors.isEmpty()) {
280 LOG.warn("{}: Keepalive RPC failed with error: {}", deviceId, errors);
283 LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", deviceId);
290 public void onFailure(final Throwable throwable) {
291 if (throwable instanceof CancellationException) {
292 LOG.warn("{}: Keepalive RPC timed out. Reconnecting netconf session.", deviceId);
294 LOG.warn("{}: Keepalive RPC failed. Reconnecting netconf session.", deviceId, throwable);
299 private void reschedule() {
300 reschedule(delayNanos);
303 private void reschedule(final long delay) {
304 timer.newTimeout(this, delay, TimeUnit.NANOSECONDS);
308 private static class TimeoutTask implements TimerTask {
309 private final ListenableFuture<?> future;
311 TimeoutTask(final ListenableFuture<?> future) {
312 this.future = requireNonNull(future);
316 public final void run(final Timeout timeout) {
322 * Request timeout task is called once the requestTimeoutMillis is reached. At that moment, if the request is not
323 * yet finished, we cancel it.
325 private final class RequestTimeoutTask<V> extends TimeoutTask implements FutureCallback<V> {
326 private final @NonNull SettableFuture<V> userFuture = SettableFuture.create();
328 RequestTimeoutTask(final ListenableFuture<V> rpcResultFuture) {
329 super(rpcResultFuture);
330 // Note: this will also wire run() to onFailure()
331 Futures.addCallback(rpcResultFuture, this, MoreExecutors.directExecutor());
335 public void onSuccess(final V result) {
336 // No matter what response we got,
337 // rpc-reply or rpc-error, we got it from device so the netconf session is OK.
338 userFuture.set(result);
343 public void onFailure(final Throwable throwable) {
344 // User/Application RPC failed (The RPC did not reach the remote device or it has timeed out)
345 if (throwable instanceof CancellationException) {
346 LOG.warn("{}: RPC timed out. Reconnecting netconf session", deviceId);
348 LOG.warn("{}: RPC failed. Reconnecting netconf session", deviceId, throwable);
350 userFuture.setException(throwable);
351 // There is no point in keeping this session. Reconnect.
357 * Proxy for {@link Rpcs} which attaches a reset-keepalive-task and schedule request-timeout-task to each RPC
358 * invocation. Version for {@link Rpcs.Normalized}.
360 private final class NormalizedKeepaliveRpcs implements Rpcs.Normalized {
361 private final @NonNull KeepaliveDOMRpcService domRpcService;
362 private final Rpcs.Normalized delegate;
364 NormalizedKeepaliveRpcs(final Rpcs.Normalized delegate) {
365 this.delegate = requireNonNull(delegate);
366 domRpcService = new KeepaliveDOMRpcService(delegate.domRpcService());
370 public ListenableFuture<? extends DOMRpcResult> invokeNetconf(final QName type, final ContainerNode input) {
371 // FIXME: what happens if we disable keepalive and then invokeRpc() throws?
373 return scheduleTimeout(delegate.invokeNetconf(type, input));
377 public DOMRpcService domRpcService() {
378 return domRpcService;
382 private final class KeepaliveDOMRpcService implements DOMRpcService {
383 private final @NonNull DOMRpcService delegate;
385 KeepaliveDOMRpcService(final DOMRpcService delegate) {
386 this.delegate = requireNonNull(delegate);
390 public ListenableFuture<? extends DOMRpcResult> invokeRpc(final QName type, final ContainerNode input) {
391 // FIXME: what happens if we disable keepalive and then invokeRpc() throws?
393 return scheduleTimeout(delegate.invokeRpc(type, input));
397 public Registration registerRpcListener(final DOMRpcAvailabilityListener rpcListener) {
398 // There is no real communication with the device (yet), hence no recordActivity() or anything
399 return delegate.registerRpcListener(rpcListener);
404 * Proxy for {@link Rpcs} which attaches a reset-keepalive-task and schedule request-timeout-task to each RPC
405 * invocation. Version for {@link Rpcs.Schemaless}.
407 private final class SchemalessKeepaliveRpcs implements Rpcs.Schemaless {
408 private final @NonNull KeepaliveSchemalessRpcService schemalessRpcService;
409 private final Rpcs.Schemaless delegate;
411 SchemalessKeepaliveRpcs(final Rpcs.Schemaless delegate) {
412 this.delegate = requireNonNull(delegate);
413 schemalessRpcService = new KeepaliveSchemalessRpcService(delegate.schemalessRpcService());
417 public ListenableFuture<? extends DOMRpcResult> invokeNetconf(final QName type, final ContainerNode input) {
418 // FIXME: what happens if we disable keepalive and then invokeRpc() throws?
420 return scheduleTimeout(delegate.invokeNetconf(type, input));
424 public SchemalessRpcService schemalessRpcService() {
425 return schemalessRpcService;
429 private final class KeepaliveSchemalessRpcService implements SchemalessRpcService {
430 private final SchemalessRpcService delegate;
432 KeepaliveSchemalessRpcService(final SchemalessRpcService delegate) {
433 this.delegate = requireNonNull(delegate);
437 public ListenableFuture<? extends DOMSource> invokeRpc(final QName type, final DOMSource payload) {
438 // FIXME: what happens if we disable keepalive and then invokeRpc() throws?
440 return scheduleTimeout(delegate.invokeRpc(type, payload));