b3945a4536c5dcd46e561758f60c8ef6ffa1c5a5
[netconf.git] / plugins / netconf-client-mdsal / src / main / java / org / opendaylight / netconf / client / mdsal / spi / KeepaliveSalFacade.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.client.mdsal.spi;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12 import static org.opendaylight.netconf.client.mdsal.impl.NetconfBaseOps.getSourceNode;
13 import static org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_NODEID;
14 import static org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_QNAME;
15 import static org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil.NETCONF_RUNNING_NODEID;
16
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.MoreExecutors;
21 import com.google.common.util.concurrent.SettableFuture;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.TimeUnit;
24 import javax.xml.transform.dom.DOMSource;
25 import org.checkerframework.checker.lock.qual.GuardedBy;
26 import org.eclipse.jdt.annotation.NonNull;
27 import org.opendaylight.mdsal.dom.api.DOMNotification;
28 import org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener;
29 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
30 import org.opendaylight.netconf.client.mdsal.NetconfDeviceCommunicator;
31 import org.opendaylight.netconf.client.mdsal.NetconfDeviceSchema;
32 import org.opendaylight.netconf.client.mdsal.api.NetconfSessionPreferences;
33 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceHandler;
34 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
35 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
36 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Rpcs;
37 import org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil;
38 import org.opendaylight.yangtools.concepts.ListenerRegistration;
39 import org.opendaylight.yangtools.yang.common.QName;
40 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 /**
45  * SalFacade proxy that invokes keepalive RPCs to prevent session shutdown from remote device
46  * and to detect incorrect session drops (netconf session is inactive, but TCP/SSH connection is still present).
47  * The keepalive RPC is a get-config with empty filter.
48  */
49 public final class KeepaliveSalFacade implements RemoteDeviceHandler {
50     private static final Logger LOG = LoggerFactory.getLogger(KeepaliveSalFacade.class);
51
52     // 2 minutes keepalive delay by default
53     private static final long DEFAULT_DELAY = TimeUnit.MINUTES.toSeconds(2);
54
55     // 1 minute transaction timeout by default
56     private static final long DEFAULT_TRANSACTION_TIMEOUT_MILLI = TimeUnit.MILLISECONDS.toMillis(60000);
57
58     private final RemoteDeviceHandler salFacade;
59     private final ScheduledExecutorService executor;
60
61     private final long keepaliveDelaySeconds;
62     private final long timeoutNanos;
63     private final long delayNanos;
64
65     private final RemoteDeviceId id;
66
67     private volatile NetconfDeviceCommunicator listener;
68     private volatile KeepaliveTask task;
69
70     public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade,
71             final ScheduledExecutorService executor, final long keepaliveDelaySeconds,
72             final long requestTimeoutMillis) {
73         this.id = id;
74         this.salFacade = salFacade;
75         this.executor = requireNonNull(executor);
76         this.keepaliveDelaySeconds = keepaliveDelaySeconds;
77         delayNanos = TimeUnit.SECONDS.toNanos(keepaliveDelaySeconds);
78         timeoutNanos = TimeUnit.MILLISECONDS.toNanos(requestTimeoutMillis);
79     }
80
81     public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade,
82             final ScheduledExecutorService executor) {
83         this(id, salFacade, executor, DEFAULT_DELAY, DEFAULT_TRANSACTION_TIMEOUT_MILLI);
84     }
85
86     /**
87      * Set the netconf session listener whenever ready.
88      *
89      * @param listener netconf session listener
90      */
91     public void setListener(final NetconfDeviceCommunicator listener) {
92         this.listener = listener;
93     }
94
95     /**
96      * Cancel current keepalive and free it.
97      */
98     private synchronized void stopKeepalives() {
99         final var localTask = task;
100         if (localTask != null) {
101             localTask.disableKeepalive();
102             task = null;
103         }
104     }
105
106     private void disableKeepalive() {
107         final var localTask = task;
108         if (localTask != null) {
109             localTask.disableKeepalive();
110         }
111     }
112
113     private void enableKeepalive() {
114         final var localTask = task;
115         if (localTask != null) {
116             localTask.enableKeepalive();
117         }
118     }
119
120     void reconnect() {
121         checkState(listener != null, "%s: Unable to reconnect, session listener is missing", id);
122         stopKeepalives();
123         LOG.info("{}: Reconnecting inactive netconf session", id);
124         listener.disconnect();
125     }
126
127     @Override
128     public void onDeviceConnected(final NetconfDeviceSchema deviceSchema,
129             final NetconfSessionPreferences sessionPreferences, final RemoteDeviceServices services) {
130         final var devRpc = services.rpcs();
131         task = new KeepaliveTask(devRpc);
132
133         final Rpcs keepaliveRpcs;
134         if (devRpc instanceof Rpcs.Normalized normalized) {
135             keepaliveRpcs = new NormalizedKeepaliveRpcs(normalized);
136         } else if (devRpc instanceof Rpcs.Schemaless schemaless) {
137             keepaliveRpcs = new SchemalessKeepaliveRpcs(schemaless);
138         } else {
139             throw new IllegalStateException("Unhandled " + devRpc);
140         }
141
142         salFacade.onDeviceConnected(deviceSchema, sessionPreferences, new RemoteDeviceServices(keepaliveRpcs,
143             // FIXME: wrap with keepalive
144             services.actions()));
145
146         // We have performed a callback, which might have termined keepalives
147         final var localTask = task;
148         if (localTask != null) {
149             LOG.debug("{}: Netconf session initiated, starting keepalives", id);
150             LOG.trace("{}: Scheduling keepalives every {}s", id, keepaliveDelaySeconds);
151             localTask.enableKeepalive();
152         }
153     }
154
155     @Override
156     public void onDeviceDisconnected() {
157         stopKeepalives();
158         salFacade.onDeviceDisconnected();
159     }
160
161     @Override
162     public void onDeviceFailed(final Throwable throwable) {
163         stopKeepalives();
164         salFacade.onDeviceFailed(throwable);
165     }
166
167     @Override
168     public void onNotification(final DOMNotification domNotification) {
169         final var localTask = task;
170         if (localTask != null) {
171             localTask.recordActivity();
172         }
173         salFacade.onNotification(domNotification);
174     }
175
176     @Override
177     public void close() {
178         stopKeepalives();
179         salFacade.close();
180     }
181
182     private <T> @NonNull ListenableFuture<T> scheduleTimeout(final ListenableFuture<T> invokeFuture) {
183         final var timeout = new RequestTimeoutTask<>(invokeFuture);
184         final var timeoutFuture = executor.schedule(timeout, timeoutNanos, TimeUnit.NANOSECONDS);
185         invokeFuture.addListener(() -> timeoutFuture.cancel(false), MoreExecutors.directExecutor());
186         return timeout.userFuture;
187     }
188
189     /**
190      * Invoke keepalive RPC and check the response. In case of any received response the keepalive
191      * is considered successful and schedules next keepalive with a fixed delay. If the response is unsuccessful (no
192      * response received, or the rcp could not even be sent) immediate reconnect is triggered as netconf session
193      * is considered inactive/failed.
194      */
195     private final class KeepaliveTask implements Runnable, FutureCallback<DOMRpcResult> {
196         // Keepalive RPC static resources
197         static final @NonNull ContainerNode KEEPALIVE_PAYLOAD = NetconfMessageTransformUtil.wrap(
198             NETCONF_GET_CONFIG_NODEID, getSourceNode(NETCONF_RUNNING_NODEID), NetconfMessageTransformUtil.EMPTY_FILTER);
199
200         private final Rpcs devRpc;
201
202         @GuardedBy("this")
203         private boolean suppressed = false;
204
205         private volatile long lastActivity;
206
207         KeepaliveTask(final Rpcs devRpc) {
208             this.devRpc = requireNonNull(devRpc);
209         }
210
211         @Override
212         public void run() {
213             final long local = lastActivity;
214             final long now = System.nanoTime();
215             final long inFutureNanos = local + delayNanos - now;
216             if (inFutureNanos > 0) {
217                 reschedule(inFutureNanos);
218             } else {
219                 sendKeepalive(now);
220             }
221         }
222
223         void recordActivity() {
224             lastActivity = System.nanoTime();
225         }
226
227         synchronized void disableKeepalive() {
228             // unsuppressed -> suppressed
229             suppressed = true;
230         }
231
232         synchronized void enableKeepalive() {
233             recordActivity();
234             if (suppressed) {
235                 // suppressed -> unsuppressed
236                 suppressed = false;
237             } else {
238                 // unscheduled -> unsuppressed
239                 reschedule();
240             }
241         }
242
243         private synchronized void sendKeepalive(final long now) {
244             if (suppressed) {
245                 LOG.debug("{}: Skipping keepalive while disabled", id);
246                 // suppressed -> unscheduled
247                 suppressed = false;
248                 return;
249             }
250
251             LOG.trace("{}: Invoking keepalive RPC", id);
252             final var deviceFuture = devRpc.invokeNetconf(NETCONF_GET_CONFIG_QNAME, KEEPALIVE_PAYLOAD);
253             lastActivity = now;
254             Futures.addCallback(deviceFuture, this, MoreExecutors.directExecutor());
255         }
256
257         @Override
258         public void onSuccess(final DOMRpcResult result) {
259             // No matter what response we got, rpc-reply or rpc-error,
260             // we got it from device so the netconf session is OK
261             if (result == null) {
262                 LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", id);
263                 reconnect();
264                 return;
265             }
266
267             if (result.value() != null) {
268                 reschedule();
269             } else {
270                 final var errors = result.errors();
271                 if (!errors.isEmpty()) {
272                     LOG.warn("{}: Keepalive RPC failed with error: {}", id, errors);
273                     reschedule();
274                 } else {
275                     LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", id);
276                     reconnect();
277                 }
278             }
279         }
280
281         @Override
282         public void onFailure(final Throwable throwable) {
283             LOG.warn("{}: Keepalive RPC failed. Reconnecting netconf session.", id, throwable);
284             reconnect();
285         }
286
287         private void reschedule() {
288             reschedule(delayNanos);
289         }
290
291         private void reschedule(final long delay) {
292             executor.schedule(this, delay, TimeUnit.NANOSECONDS);
293         }
294     }
295
296     /*
297      * Request timeout task is called once the requestTimeoutMillis is reached. At that moment, if the request is not
298      * yet finished, we cancel it.
299      */
300     private final class RequestTimeoutTask<V> implements FutureCallback<V>, Runnable {
301         private final @NonNull SettableFuture<V> userFuture = SettableFuture.create();
302         private final @NonNull ListenableFuture<? extends V> rpcResultFuture;
303
304         RequestTimeoutTask(final ListenableFuture<V> rpcResultFuture) {
305             this.rpcResultFuture = requireNonNull(rpcResultFuture);
306             Futures.addCallback(rpcResultFuture, this, MoreExecutors.directExecutor());
307         }
308
309         @Override
310         public void run() {
311             // Note: this will loop to onFailure()
312             rpcResultFuture.cancel(true);
313         }
314
315         @Override
316         public void onSuccess(final V result) {
317             // No matter what response we got,
318             // rpc-reply or rpc-error, we got it from device so the netconf session is OK.
319             userFuture.set(result);
320             enableKeepalive();
321         }
322
323         @Override
324         public void onFailure(final Throwable throwable) {
325             // User/Application RPC failed (The RPC did not reach the remote device or ...)
326             // FIXME: what other reasons could cause this ?)
327             LOG.warn("{}: Rpc failure detected. Reconnecting netconf session", id, throwable);
328             userFuture.setException(throwable);
329             // There is no point in keeping this session. Reconnect.
330             reconnect();
331         }
332     }
333
334     /**
335      * Proxy for {@link Rpcs} which attaches a reset-keepalive-task and schedule request-timeout-task to each RPC
336      * invocation. Version for {@link Rpcs.Normalized}.
337      */
338     private final class NormalizedKeepaliveRpcs implements Rpcs.Normalized {
339         private final Rpcs.Normalized delegate;
340
341         NormalizedKeepaliveRpcs(final Rpcs.Normalized delegate) {
342             this.delegate = requireNonNull(delegate);
343         }
344
345         @Override
346         public ListenableFuture<? extends DOMRpcResult> invokeRpc(final QName type, final ContainerNode input) {
347             // FIXME: what happens if we disable keepalive and then invokeRpc() throws?
348             disableKeepalive();
349             return scheduleTimeout(delegate.invokeRpc(type, input));
350         }
351
352         @Override
353         public <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(
354             final T rpcListener) {
355             // There is no real communication with the device (yet), hence no recordActivity() or anything
356             return delegate.registerRpcListener(rpcListener);
357         }
358     }
359
360     /**
361      * Proxy for {@link Rpcs} which attaches a reset-keepalive-task and schedule request-timeout-task to each RPC
362      * invocation. Version for {@link Rpcs.Schemaless}.
363      */
364     private final class SchemalessKeepaliveRpcs implements Rpcs.Schemaless {
365         private final Rpcs.Schemaless delegate;
366
367         SchemalessKeepaliveRpcs(final Rpcs.Schemaless delegate) {
368             this.delegate = requireNonNull(delegate);
369         }
370
371         @Override
372         public ListenableFuture<? extends DOMRpcResult> invokeNetconf(final QName type, final ContainerNode input) {
373             // FIXME: what happens if we disable keepalive and then invokeRpc() throws?
374             disableKeepalive();
375             return scheduleTimeout(delegate.invokeNetconf(type, input));
376         }
377
378         @Override
379         public ListenableFuture<? extends DOMSource> invokeRpc(final QName type, final DOMSource input) {
380             // FIXME: what happens if we disable keepalive and then invokeRpc() throws?
381             disableKeepalive();
382             return scheduleTimeout(delegate.invokeRpc(type, input));
383         }
384     }
385 }