Introduce NetconfTimer
[netconf.git] / plugins / netconf-client-mdsal / src / main / java / org / opendaylight / netconf / client / mdsal / spi / KeepaliveSalFacade.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.client.mdsal.spi;
9
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
import static org.opendaylight.netconf.client.mdsal.impl.NetconfBaseOps.getSourceNode;
import static org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_NODEID;
import static org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil.NETCONF_RUNNING_NODEID;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.xml.transform.dom.DOMSource;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.mdsal.dom.api.DOMNotification;
import org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener;
import org.opendaylight.mdsal.dom.api.DOMRpcResult;
import org.opendaylight.netconf.client.mdsal.NetconfDeviceCommunicator;
import org.opendaylight.netconf.client.mdsal.NetconfDeviceSchema;
import org.opendaylight.netconf.client.mdsal.api.NetconfSessionPreferences;
import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceHandler;
import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Rpcs;
import org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil;
import org.opendaylight.netconf.common.NetconfTimer;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.GetConfig;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
46
47 /**
48  * SalFacade proxy that invokes keepalive RPCs to prevent session shutdown from remote device
49  * and to detect incorrect session drops (netconf session is inactive, but TCP/SSH connection is still present).
50  * The keepalive RPC is a get-config with empty filter.
51  */
52 public final class KeepaliveSalFacade implements RemoteDeviceHandler {
53     private static final Logger LOG = LoggerFactory.getLogger(KeepaliveSalFacade.class);
54
55     // 2 minutes keepalive delay by default
56     private static final long DEFAULT_DELAY = TimeUnit.MINUTES.toSeconds(2);
57
58     // 1 minute transaction timeout by default
59     private static final long DEFAULT_TRANSACTION_TIMEOUT_MILLI = TimeUnit.MILLISECONDS.toMillis(60000);
60
61     private final RemoteDeviceHandler deviceHandler;
62     private final RemoteDeviceId deviceId;
63     private final NetconfTimer timer;
64     private final long keepaliveDelaySeconds;
65     private final long timeoutNanos;
66     private final long delayNanos;
67
68     private volatile NetconfDeviceCommunicator listener;
69     private volatile KeepaliveTask task;
70
71     public KeepaliveSalFacade(final RemoteDeviceId deviceId, final RemoteDeviceHandler deviceHandler,
72             final NetconfTimer timer, final long keepaliveDelaySeconds, final long requestTimeoutMillis) {
73         this.deviceId = requireNonNull(deviceId);
74         this.deviceHandler = requireNonNull(deviceHandler);
75         this.timer = requireNonNull(timer);
76         this.keepaliveDelaySeconds = keepaliveDelaySeconds;
77         delayNanos = TimeUnit.SECONDS.toNanos(keepaliveDelaySeconds);
78         timeoutNanos = TimeUnit.MILLISECONDS.toNanos(requestTimeoutMillis);
79     }
80
81     public KeepaliveSalFacade(final RemoteDeviceId deviceId, final RemoteDeviceHandler deviceHandler,
82             final NetconfTimer timer) {
83         this(deviceId, deviceHandler, timer, DEFAULT_DELAY, DEFAULT_TRANSACTION_TIMEOUT_MILLI);
84     }
85
86     /**
87      * Set the netconf session listener whenever ready.
88      *
89      * @param listener netconf session listener
90      */
91     public void setListener(final NetconfDeviceCommunicator listener) {
92         this.listener = listener;
93     }
94
95     /**
96      * Cancel current keepalive and free it.
97      */
98     private synchronized void stopKeepalives() {
99         final var localTask = task;
100         if (localTask != null) {
101             localTask.disableKeepalive();
102             task = null;
103         }
104     }
105
106     private void disableKeepalive() {
107         final var localTask = task;
108         if (localTask != null) {
109             localTask.disableKeepalive();
110         }
111     }
112
113     private void enableKeepalive() {
114         final var localTask = task;
115         if (localTask != null) {
116             localTask.enableKeepalive();
117         }
118     }
119
120     private void disconnect() {
121         checkState(listener != null, "%s: Unable to reconnect, session listener is missing", deviceId);
122         stopKeepalives();
123         LOG.info("{}: Reconnecting inactive netconf session", deviceId);
124         listener.disconnect();
125     }
126
127     @Override
128     public void onDeviceConnected(final NetconfDeviceSchema deviceSchema,
129             final NetconfSessionPreferences sessionPreferences, final RemoteDeviceServices services) {
130         final var devRpc = services.rpcs();
131         task = new KeepaliveTask(devRpc);
132
133         final Rpcs keepaliveRpcs;
134         if (devRpc instanceof Rpcs.Normalized normalized) {
135             keepaliveRpcs = new NormalizedKeepaliveRpcs(normalized);
136         } else if (devRpc instanceof Rpcs.Schemaless schemaless) {
137             keepaliveRpcs = new SchemalessKeepaliveRpcs(schemaless);
138         } else {
139             throw new IllegalStateException("Unhandled " + devRpc);
140         }
141
142         deviceHandler.onDeviceConnected(deviceSchema, sessionPreferences, new RemoteDeviceServices(keepaliveRpcs,
143             // FIXME: wrap with keepalive
144             services.actions()));
145
146         // We have performed a callback, which might have termined keepalives
147         final var localTask = task;
148         if (localTask != null) {
149             LOG.debug("{}: Netconf session initiated, starting keepalives", deviceId);
150             LOG.trace("{}: Scheduling keepalives every {}s", deviceId, keepaliveDelaySeconds);
151             localTask.enableKeepalive();
152         }
153     }
154
155     @Override
156     public void onDeviceDisconnected() {
157         stopKeepalives();
158         deviceHandler.onDeviceDisconnected();
159     }
160
161     @Override
162     public void onDeviceFailed(final Throwable throwable) {
163         stopKeepalives();
164         deviceHandler.onDeviceFailed(throwable);
165     }
166
167     @Override
168     public void onNotification(final DOMNotification domNotification) {
169         final var localTask = task;
170         if (localTask != null) {
171             localTask.recordActivity();
172         }
173         deviceHandler.onNotification(domNotification);
174     }
175
176     @Override
177     public void close() {
178         stopKeepalives();
179         deviceHandler.close();
180     }
181
182     private <T> @NonNull ListenableFuture<T> scheduleTimeout(final ListenableFuture<T> invokeFuture) {
183         final var timeout = new RequestTimeoutTask<>(invokeFuture);
184         scheduleTimeout(invokeFuture, timeout);
185         return timeout.userFuture;
186     }
187
188     private void scheduleTimeout(final ListenableFuture<?> future, final TimeoutTask timeoutTask) {
189         final var timeout = timer.newTimeout(timeoutTask, timeoutNanos, TimeUnit.NANOSECONDS);
190         future.addListener(() -> timeout.cancel(), MoreExecutors.directExecutor());
191     }
192
193     /**
194      * Invoke keepalive RPC and check the response. In case of any received response the keepalive
195      * is considered successful and schedules next keepalive with a fixed delay. If the response is unsuccessful (no
196      * response received, or the rcp could not even be sent) immediate reconnect is triggered as netconf session
197      * is considered inactive/failed.
198      */
199     private final class KeepaliveTask implements TimerTask, FutureCallback<DOMRpcResult> {
200         // Keepalive RPC static resources
201         static final @NonNull ContainerNode KEEPALIVE_PAYLOAD = NetconfMessageTransformUtil.wrap(
202             NETCONF_GET_CONFIG_NODEID, getSourceNode(NETCONF_RUNNING_NODEID), NetconfMessageTransformUtil.EMPTY_FILTER);
203
204         private final Rpcs devRpc;
205
206         @GuardedBy("this")
207         private boolean suppressed = false;
208
209         private volatile long lastActivity;
210
211         KeepaliveTask(final Rpcs devRpc) {
212             this.devRpc = requireNonNull(devRpc);
213         }
214
215         @Override
216         public void run(final Timeout timeout) {
217             final long local = lastActivity;
218             final long now = System.nanoTime();
219             final long inFutureNanos = local + delayNanos - now;
220             if (inFutureNanos > 0) {
221                 reschedule(inFutureNanos);
222             } else {
223                 sendKeepalive(now);
224             }
225         }
226
227         void recordActivity() {
228             lastActivity = System.nanoTime();
229         }
230
231         synchronized void disableKeepalive() {
232             // unsuppressed -> suppressed
233             suppressed = true;
234         }
235
236         synchronized void enableKeepalive() {
237             recordActivity();
238             if (suppressed) {
239                 // suppressed -> unsuppressed
240                 suppressed = false;
241             } else {
242                 // unscheduled -> unsuppressed
243                 reschedule();
244             }
245         }
246
247         private synchronized void sendKeepalive(final long now) {
248             if (suppressed) {
249                 LOG.debug("{}: Skipping keepalive while disabled", deviceId);
250                 // suppressed -> unscheduled
251                 suppressed = false;
252                 return;
253             }
254
255             LOG.trace("{}: Invoking keepalive RPC", deviceId);
256             final var deviceFuture = devRpc.invokeNetconf(GetConfig.QNAME, KEEPALIVE_PAYLOAD);
257             lastActivity = now;
258
259             scheduleTimeout(deviceFuture, new TimeoutTask(deviceFuture));
260             Futures.addCallback(deviceFuture, this, MoreExecutors.directExecutor());
261         }
262
263         @Override
264         public void onSuccess(final DOMRpcResult result) {
265             // No matter what response we got, rpc-reply or rpc-error,
266             // we got it from device so the netconf session is OK
267             if (result == null) {
268                 LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", deviceId);
269                 disconnect();
270                 return;
271             }
272
273             if (result.value() != null) {
274                 reschedule();
275             } else {
276                 final var errors = result.errors();
277                 if (!errors.isEmpty()) {
278                     LOG.warn("{}: Keepalive RPC failed with error: {}", deviceId, errors);
279                     reschedule();
280                 } else {
281                     LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", deviceId);
282                     disconnect();
283                 }
284             }
285         }
286
287         @Override
288         public void onFailure(final Throwable throwable) {
289             if (throwable instanceof CancellationException) {
290                 LOG.warn("{}: Keepalive RPC timed out. Reconnecting netconf session.", deviceId);
291             } else {
292                 LOG.warn("{}: Keepalive RPC failed. Reconnecting netconf session.", deviceId, throwable);
293             }
294             disconnect();
295         }
296
297         private void reschedule() {
298             reschedule(delayNanos);
299         }
300
301         private void reschedule(final long delay) {
302             timer.newTimeout(this, delay, TimeUnit.NANOSECONDS);
303         }
304     }
305
306     private static class TimeoutTask implements TimerTask {
307         private final ListenableFuture<?> future;
308
309         TimeoutTask(final ListenableFuture<?> future) {
310             this.future = requireNonNull(future);
311         }
312
313         @Override
314         public final void run(final Timeout timeout) {
315             future.cancel(true);
316         }
317     }
318
319     /*
320      * Request timeout task is called once the requestTimeoutMillis is reached. At that moment, if the request is not
321      * yet finished, we cancel it.
322      */
323     private final class RequestTimeoutTask<V> extends TimeoutTask implements FutureCallback<V> {
324         private final @NonNull SettableFuture<V> userFuture = SettableFuture.create();
325
326         RequestTimeoutTask(final ListenableFuture<V> rpcResultFuture) {
327             super(rpcResultFuture);
328             // Note: this will also wire run() to onFailure()
329             Futures.addCallback(rpcResultFuture, this, MoreExecutors.directExecutor());
330         }
331
332         @Override
333         public void onSuccess(final V result) {
334             // No matter what response we got,
335             // rpc-reply or rpc-error, we got it from device so the netconf session is OK.
336             userFuture.set(result);
337             enableKeepalive();
338         }
339
340         @Override
341         public void onFailure(final Throwable throwable) {
342             // User/Application RPC failed (The RPC did not reach the remote device or it has timeed out)
343             if (throwable instanceof CancellationException) {
344                 LOG.warn("{}: RPC timed out. Reconnecting netconf session", deviceId);
345             } else {
346                 LOG.warn("{}: RPC failed. Reconnecting netconf session", deviceId, throwable);
347             }
348             userFuture.setException(throwable);
349             // There is no point in keeping this session. Reconnect.
350             disconnect();
351         }
352     }
353
354     /**
355      * Proxy for {@link Rpcs} which attaches a reset-keepalive-task and schedule request-timeout-task to each RPC
356      * invocation. Version for {@link Rpcs.Normalized}.
357      */
358     private final class NormalizedKeepaliveRpcs implements Rpcs.Normalized {
359         private final Rpcs.Normalized delegate;
360
361         NormalizedKeepaliveRpcs(final Rpcs.Normalized delegate) {
362             this.delegate = requireNonNull(delegate);
363         }
364
365         @Override
366         public ListenableFuture<? extends DOMRpcResult> invokeRpc(final QName type, final ContainerNode input) {
367             // FIXME: what happens if we disable keepalive and then invokeRpc() throws?
368             disableKeepalive();
369             return scheduleTimeout(delegate.invokeRpc(type, input));
370         }
371
372         @Override
373         public <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(
374             final T rpcListener) {
375             // There is no real communication with the device (yet), hence no recordActivity() or anything
376             return delegate.registerRpcListener(rpcListener);
377         }
378     }
379
380     /**
381      * Proxy for {@link Rpcs} which attaches a reset-keepalive-task and schedule request-timeout-task to each RPC
382      * invocation. Version for {@link Rpcs.Schemaless}.
383      */
384     private final class SchemalessKeepaliveRpcs implements Rpcs.Schemaless {
385         private final Rpcs.Schemaless delegate;
386
387         SchemalessKeepaliveRpcs(final Rpcs.Schemaless delegate) {
388             this.delegate = requireNonNull(delegate);
389         }
390
391         @Override
392         public ListenableFuture<? extends DOMRpcResult> invokeNetconf(final QName type, final ContainerNode input) {
393             // FIXME: what happens if we disable keepalive and then invokeRpc() throws?
394             disableKeepalive();
395             return scheduleTimeout(delegate.invokeNetconf(type, input));
396         }
397
398         @Override
399         public ListenableFuture<? extends DOMSource> invokeRpc(final QName type, final DOMSource input) {
400             // FIXME: what happens if we disable keepalive and then invokeRpc() throws?
401             disableKeepalive();
402             return scheduleTimeout(delegate.invokeRpc(type, input));
403         }
404     }
405 }