Bump upstreams
[netconf.git] / plugins / netconf-client-mdsal / src / main / java / org / opendaylight / netconf / client / mdsal / spi / KeepaliveSalFacade.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.client.mdsal.spi;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12 import static org.opendaylight.netconf.client.mdsal.impl.NetconfBaseOps.getSourceNode;
13 import static org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_NODEID;
14 import static org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil.NETCONF_RUNNING_NODEID;
15
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.MoreExecutors;
20 import com.google.common.util.concurrent.SettableFuture;
21 import io.netty.util.Timeout;
22 import io.netty.util.TimerTask;
23 import java.util.concurrent.CancellationException;
24 import java.util.concurrent.TimeUnit;
25 import javax.xml.transform.dom.DOMSource;
26 import org.checkerframework.checker.lock.qual.GuardedBy;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.opendaylight.mdsal.dom.api.DOMNotification;
29 import org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener;
30 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
31 import org.opendaylight.mdsal.dom.api.DOMRpcService;
32 import org.opendaylight.netconf.client.mdsal.NetconfDeviceCommunicator;
33 import org.opendaylight.netconf.client.mdsal.NetconfDeviceSchema;
34 import org.opendaylight.netconf.client.mdsal.api.NetconfSessionPreferences;
35 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceHandler;
36 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
37 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
38 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Rpcs;
39 import org.opendaylight.netconf.client.mdsal.api.SchemalessRpcService;
40 import org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil;
41 import org.opendaylight.netconf.common.NetconfTimer;
42 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.GetConfig;
43 import org.opendaylight.yangtools.concepts.Registration;
44 import org.opendaylight.yangtools.yang.common.QName;
45 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 /**
50  * SalFacade proxy that invokes keepalive RPCs to prevent session shutdown from remote device
51  * and to detect incorrect session drops (netconf session is inactive, but TCP/SSH connection is still present).
52  * The keepalive RPC is a get-config with empty filter.
53  */
54 public final class KeepaliveSalFacade implements RemoteDeviceHandler {
55     private static final Logger LOG = LoggerFactory.getLogger(KeepaliveSalFacade.class);
56
57     // 2 minutes keepalive delay by default
58     private static final long DEFAULT_DELAY = TimeUnit.MINUTES.toSeconds(2);
59
60     // 1 minute transaction timeout by default
61     private static final long DEFAULT_TRANSACTION_TIMEOUT_MILLI = TimeUnit.MILLISECONDS.toMillis(60000);
62
63     private final RemoteDeviceHandler deviceHandler;
64     private final RemoteDeviceId deviceId;
65     private final NetconfTimer timer;
66     private final long keepaliveDelaySeconds;
67     private final long timeoutNanos;
68     private final long delayNanos;
69
70     private volatile NetconfDeviceCommunicator listener;
71     private volatile KeepaliveTask task;
72
73     public KeepaliveSalFacade(final RemoteDeviceId deviceId, final RemoteDeviceHandler deviceHandler,
74             final NetconfTimer timer, final long keepaliveDelaySeconds, final long requestTimeoutMillis) {
75         this.deviceId = requireNonNull(deviceId);
76         this.deviceHandler = requireNonNull(deviceHandler);
77         this.timer = requireNonNull(timer);
78         this.keepaliveDelaySeconds = keepaliveDelaySeconds;
79         delayNanos = TimeUnit.SECONDS.toNanos(keepaliveDelaySeconds);
80         timeoutNanos = TimeUnit.MILLISECONDS.toNanos(requestTimeoutMillis);
81     }
82
83     public KeepaliveSalFacade(final RemoteDeviceId deviceId, final RemoteDeviceHandler deviceHandler,
84             final NetconfTimer timer) {
85         this(deviceId, deviceHandler, timer, DEFAULT_DELAY, DEFAULT_TRANSACTION_TIMEOUT_MILLI);
86     }
87
88     /**
89      * Set the netconf session listener whenever ready.
90      *
91      * @param listener netconf session listener
92      */
93     public void setListener(final NetconfDeviceCommunicator listener) {
94         this.listener = listener;
95     }
96
97     /**
98      * Cancel current keepalive and free it.
99      */
100     private synchronized void stopKeepalives() {
101         final var localTask = task;
102         if (localTask != null) {
103             localTask.disableKeepalive();
104             task = null;
105         }
106     }
107
108     private void disableKeepalive() {
109         final var localTask = task;
110         if (localTask != null) {
111             localTask.disableKeepalive();
112         }
113     }
114
115     private void enableKeepalive() {
116         final var localTask = task;
117         if (localTask != null) {
118             localTask.enableKeepalive();
119         }
120     }
121
122     private void disconnect() {
123         checkState(listener != null, "%s: Unable to reconnect, session listener is missing", deviceId);
124         stopKeepalives();
125         LOG.info("{}: Reconnecting inactive netconf session", deviceId);
126         listener.disconnect();
127     }
128
129     @Override
130     public void onDeviceConnected(final NetconfDeviceSchema deviceSchema,
131             final NetconfSessionPreferences sessionPreferences, final RemoteDeviceServices services) {
132         final var devRpc = services.rpcs();
133         task = new KeepaliveTask(devRpc);
134
135         final Rpcs keepaliveRpcs;
136         if (devRpc instanceof Rpcs.Normalized normalized) {
137             keepaliveRpcs = new NormalizedKeepaliveRpcs(normalized);
138         } else if (devRpc instanceof Rpcs.Schemaless schemaless) {
139             keepaliveRpcs = new SchemalessKeepaliveRpcs(schemaless);
140         } else {
141             throw new IllegalStateException("Unhandled " + devRpc);
142         }
143
144         deviceHandler.onDeviceConnected(deviceSchema, sessionPreferences, new RemoteDeviceServices(keepaliveRpcs,
145             // FIXME: wrap with keepalive
146             services.actions()));
147
148         // We have performed a callback, which might have termined keepalives
149         final var localTask = task;
150         if (localTask != null) {
151             LOG.debug("{}: Netconf session initiated, starting keepalives", deviceId);
152             LOG.trace("{}: Scheduling keepalives every {}s", deviceId, keepaliveDelaySeconds);
153             localTask.enableKeepalive();
154         }
155     }
156
157     @Override
158     public void onDeviceDisconnected() {
159         stopKeepalives();
160         deviceHandler.onDeviceDisconnected();
161     }
162
163     @Override
164     public void onDeviceFailed(final Throwable throwable) {
165         stopKeepalives();
166         deviceHandler.onDeviceFailed(throwable);
167     }
168
169     @Override
170     public void onNotification(final DOMNotification domNotification) {
171         final var localTask = task;
172         if (localTask != null) {
173             localTask.recordActivity();
174         }
175         deviceHandler.onNotification(domNotification);
176     }
177
178     @Override
179     public void close() {
180         stopKeepalives();
181         deviceHandler.close();
182     }
183
184     private <T> @NonNull ListenableFuture<T> scheduleTimeout(final ListenableFuture<T> invokeFuture) {
185         final var timeout = new RequestTimeoutTask<>(invokeFuture);
186         scheduleTimeout(invokeFuture, timeout);
187         return timeout.userFuture;
188     }
189
190     private void scheduleTimeout(final ListenableFuture<?> future, final TimeoutTask timeoutTask) {
191         final var timeout = timer.newTimeout(timeoutTask, timeoutNanos, TimeUnit.NANOSECONDS);
192         future.addListener(() -> timeout.cancel(), MoreExecutors.directExecutor());
193     }
194
195     /**
196      * Invoke keepalive RPC and check the response. In case of any received response the keepalive
197      * is considered successful and schedules next keepalive with a fixed delay. If the response is unsuccessful (no
198      * response received, or the rcp could not even be sent) immediate reconnect is triggered as netconf session
199      * is considered inactive/failed.
200      */
201     private final class KeepaliveTask implements TimerTask, FutureCallback<DOMRpcResult> {
202         // Keepalive RPC static resources
203         static final @NonNull ContainerNode KEEPALIVE_PAYLOAD = NetconfMessageTransformUtil.wrap(
204             NETCONF_GET_CONFIG_NODEID, getSourceNode(NETCONF_RUNNING_NODEID), NetconfMessageTransformUtil.EMPTY_FILTER);
205
206         private final Rpcs devRpc;
207
208         @GuardedBy("this")
209         private boolean suppressed = false;
210
211         private volatile long lastActivity;
212
213         KeepaliveTask(final Rpcs devRpc) {
214             this.devRpc = requireNonNull(devRpc);
215         }
216
217         @Override
218         public void run(final Timeout timeout) {
219             final long local = lastActivity;
220             final long now = System.nanoTime();
221             final long inFutureNanos = local + delayNanos - now;
222             if (inFutureNanos > 0) {
223                 reschedule(inFutureNanos);
224             } else {
225                 sendKeepalive(now);
226             }
227         }
228
229         void recordActivity() {
230             lastActivity = System.nanoTime();
231         }
232
233         synchronized void disableKeepalive() {
234             // unsuppressed -> suppressed
235             suppressed = true;
236         }
237
238         synchronized void enableKeepalive() {
239             recordActivity();
240             if (suppressed) {
241                 // suppressed -> unsuppressed
242                 suppressed = false;
243             } else {
244                 // unscheduled -> unsuppressed
245                 reschedule();
246             }
247         }
248
249         private synchronized void sendKeepalive(final long now) {
250             if (suppressed) {
251                 LOG.debug("{}: Skipping keepalive while disabled", deviceId);
252                 // suppressed -> unscheduled
253                 suppressed = false;
254                 return;
255             }
256
257             LOG.trace("{}: Invoking keepalive RPC", deviceId);
258             final var deviceFuture = devRpc.invokeNetconf(GetConfig.QNAME, KEEPALIVE_PAYLOAD);
259             lastActivity = now;
260
261             scheduleTimeout(deviceFuture, new TimeoutTask(deviceFuture));
262             Futures.addCallback(deviceFuture, this, MoreExecutors.directExecutor());
263         }
264
265         @Override
266         public void onSuccess(final DOMRpcResult result) {
267             // No matter what response we got, rpc-reply or rpc-error,
268             // we got it from device so the netconf session is OK
269             if (result == null) {
270                 LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", deviceId);
271                 disconnect();
272                 return;
273             }
274
275             if (result.value() != null) {
276                 reschedule();
277             } else {
278                 final var errors = result.errors();
279                 if (!errors.isEmpty()) {
280                     LOG.warn("{}: Keepalive RPC failed with error: {}", deviceId, errors);
281                     reschedule();
282                 } else {
283                     LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", deviceId);
284                     disconnect();
285                 }
286             }
287         }
288
289         @Override
290         public void onFailure(final Throwable throwable) {
291             if (throwable instanceof CancellationException) {
292                 LOG.warn("{}: Keepalive RPC timed out. Reconnecting netconf session.", deviceId);
293             } else {
294                 LOG.warn("{}: Keepalive RPC failed. Reconnecting netconf session.", deviceId, throwable);
295             }
296             disconnect();
297         }
298
299         private void reschedule() {
300             reschedule(delayNanos);
301         }
302
303         private void reschedule(final long delay) {
304             timer.newTimeout(this, delay, TimeUnit.NANOSECONDS);
305         }
306     }
307
308     private static class TimeoutTask implements TimerTask {
309         private final ListenableFuture<?> future;
310
311         TimeoutTask(final ListenableFuture<?> future) {
312             this.future = requireNonNull(future);
313         }
314
315         @Override
316         public final void run(final Timeout timeout) {
317             future.cancel(true);
318         }
319     }
320
321     /*
322      * Request timeout task is called once the requestTimeoutMillis is reached. At that moment, if the request is not
323      * yet finished, we cancel it.
324      */
325     private final class RequestTimeoutTask<V> extends TimeoutTask implements FutureCallback<V> {
326         private final @NonNull SettableFuture<V> userFuture = SettableFuture.create();
327
328         RequestTimeoutTask(final ListenableFuture<V> rpcResultFuture) {
329             super(rpcResultFuture);
330             // Note: this will also wire run() to onFailure()
331             Futures.addCallback(rpcResultFuture, this, MoreExecutors.directExecutor());
332         }
333
334         @Override
335         public void onSuccess(final V result) {
336             // No matter what response we got,
337             // rpc-reply or rpc-error, we got it from device so the netconf session is OK.
338             userFuture.set(result);
339             enableKeepalive();
340         }
341
342         @Override
343         public void onFailure(final Throwable throwable) {
344             // User/Application RPC failed (The RPC did not reach the remote device or it has timeed out)
345             if (throwable instanceof CancellationException) {
346                 LOG.warn("{}: RPC timed out. Reconnecting netconf session", deviceId);
347             } else {
348                 LOG.warn("{}: RPC failed. Reconnecting netconf session", deviceId, throwable);
349             }
350             userFuture.setException(throwable);
351             // There is no point in keeping this session. Reconnect.
352             disconnect();
353         }
354     }
355
356     /**
357      * Proxy for {@link Rpcs} which attaches a reset-keepalive-task and schedule request-timeout-task to each RPC
358      * invocation. Version for {@link Rpcs.Normalized}.
359      */
360     private final class NormalizedKeepaliveRpcs implements Rpcs.Normalized {
361         private final @NonNull KeepaliveDOMRpcService domRpcService;
362         private final Rpcs.Normalized delegate;
363
364         NormalizedKeepaliveRpcs(final Rpcs.Normalized delegate) {
365             this.delegate = requireNonNull(delegate);
366             domRpcService = new KeepaliveDOMRpcService(delegate.domRpcService());
367         }
368
369         @Override
370         public ListenableFuture<? extends DOMRpcResult> invokeNetconf(final QName type, final ContainerNode input) {
371             // FIXME: what happens if we disable keepalive and then invokeRpc() throws?
372             disableKeepalive();
373             return scheduleTimeout(delegate.invokeNetconf(type, input));
374         }
375
376         @Override
377         public DOMRpcService domRpcService() {
378             return domRpcService;
379         }
380     }
381
382     private final class KeepaliveDOMRpcService implements DOMRpcService {
383         private final @NonNull DOMRpcService delegate;
384
385         KeepaliveDOMRpcService(final DOMRpcService delegate) {
386             this.delegate = requireNonNull(delegate);
387         }
388
389         @Override
390         public ListenableFuture<? extends DOMRpcResult> invokeRpc(final QName type, final ContainerNode input) {
391             // FIXME: what happens if we disable keepalive and then invokeRpc() throws?
392             disableKeepalive();
393             return scheduleTimeout(delegate.invokeRpc(type, input));
394         }
395
396         @Override
397         public Registration registerRpcListener(final DOMRpcAvailabilityListener rpcListener) {
398             // There is no real communication with the device (yet), hence no recordActivity() or anything
399             return delegate.registerRpcListener(rpcListener);
400         }
401     }
402
403     /**
404      * Proxy for {@link Rpcs} which attaches a reset-keepalive-task and schedule request-timeout-task to each RPC
405      * invocation. Version for {@link Rpcs.Schemaless}.
406      */
407     private final class SchemalessKeepaliveRpcs implements Rpcs.Schemaless {
408         private final @NonNull KeepaliveSchemalessRpcService schemalessRpcService;
409         private final Rpcs.Schemaless delegate;
410
411         SchemalessKeepaliveRpcs(final Rpcs.Schemaless delegate) {
412             this.delegate = requireNonNull(delegate);
413             schemalessRpcService = new KeepaliveSchemalessRpcService(delegate.schemalessRpcService());
414         }
415
416         @Override
417         public ListenableFuture<? extends DOMRpcResult> invokeNetconf(final QName type, final ContainerNode input) {
418             // FIXME: what happens if we disable keepalive and then invokeRpc() throws?
419             disableKeepalive();
420             return scheduleTimeout(delegate.invokeNetconf(type, input));
421         }
422
423         @Override
424         public SchemalessRpcService schemalessRpcService() {
425             return schemalessRpcService;
426         }
427     }
428
429     private final class KeepaliveSchemalessRpcService implements SchemalessRpcService {
430         private final SchemalessRpcService delegate;
431
432         KeepaliveSchemalessRpcService(final SchemalessRpcService delegate) {
433             this.delegate = requireNonNull(delegate);
434         }
435
436         @Override
437         public ListenableFuture<? extends DOMSource> invokeRpc(final QName type, final DOMSource payload) {
438             // FIXME: what happens if we disable keepalive and then invokeRpc() throws?
439             disableKeepalive();
440             return scheduleTimeout(delegate.invokeRpc(type, payload));
441         }
442     }
443 }