Migrate netconf-topology to new transport
[netconf.git] / apps / netconf-topology / src / main / java / org / opendaylight / netconf / topology / spi / NetconfNodeHandler.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.spi;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import java.util.ArrayList;
17 import java.util.List;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.Executor;
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.concurrent.TimeUnit;
22 import org.checkerframework.checker.lock.qual.GuardedBy;
23 import org.checkerframework.checker.lock.qual.Holding;
24 import org.eclipse.jdt.annotation.NonNull;
25 import org.opendaylight.mdsal.dom.api.DOMNotification;
26 import org.opendaylight.netconf.client.NetconfClientFactory;
27 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
28 import org.opendaylight.netconf.client.mdsal.LibraryModulesSchemas;
29 import org.opendaylight.netconf.client.mdsal.LibrarySchemaSourceProvider;
30 import org.opendaylight.netconf.client.mdsal.NetconfDevice.SchemaResourcesDTO;
31 import org.opendaylight.netconf.client.mdsal.NetconfDeviceBuilder;
32 import org.opendaylight.netconf.client.mdsal.NetconfDeviceCommunicator;
33 import org.opendaylight.netconf.client.mdsal.NetconfDeviceSchema;
34 import org.opendaylight.netconf.client.mdsal.SchemalessNetconfDevice;
35 import org.opendaylight.netconf.client.mdsal.api.BaseNetconfSchemas;
36 import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory;
37 import org.opendaylight.netconf.client.mdsal.api.NetconfSessionPreferences;
38 import org.opendaylight.netconf.client.mdsal.api.RemoteDevice;
39 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceHandler;
40 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
41 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
42 import org.opendaylight.netconf.client.mdsal.api.SchemaResourceManager;
43 import org.opendaylight.netconf.client.mdsal.spi.KeepaliveSalFacade;
44 import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
45 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.optional.rev221225.NetconfNodeAugmentedOptional;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
49 import org.opendaylight.yangtools.concepts.AbstractRegistration;
50 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
51 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
52 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * All state associated with a NETCONF topology node. Each node handles its own reconnection.
58  */
59 public final class NetconfNodeHandler extends AbstractRegistration implements RemoteDeviceHandler {
60     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeHandler.class);
61
62     private final @NonNull List<SchemaSourceRegistration<?>> yanglibRegistrations;
63     private final @NonNull NetconfClientFactory clientFactory;
64     private final @NonNull NetconfClientConfiguration clientConfig;
65     private final @NonNull NetconfDeviceCommunicator communicator;
66     private final @NonNull RemoteDeviceHandler delegate;
67     private final @NonNull ListeningScheduledExecutorService scheduledExecutor;
68     private final @NonNull RemoteDeviceId deviceId;
69
70     private final long maxAttempts;
71     private final int minSleep;
72     private final double sleepFactor;
73
74     @GuardedBy("this")
75     private long attempts;
76     @GuardedBy("this")
77     private long lastSleep;
78     @GuardedBy("this")
79     private ListenableFuture<?> currentTask;
80
81     public NetconfNodeHandler(final NetconfClientFactory clientFactory,
82             final ScheduledExecutorService scheduledExecutor, final BaseNetconfSchemas baseSchemas,
83             final SchemaResourceManager schemaManager, final Executor processingExecutor,
84             final NetconfClientConfigurationBuilderFactory builderFactory,
85             final DeviceActionFactory deviceActionFactory, final RemoteDeviceHandler delegate,
86             final RemoteDeviceId deviceId, final NodeId nodeId, final NetconfNode node,
87             final NetconfNodeAugmentedOptional nodeOptional) {
88         this.clientFactory = requireNonNull(clientFactory);
89         // FIXME: do not wrap this executor
90         this.scheduledExecutor = MoreExecutors.listeningDecorator(scheduledExecutor);
91         this.delegate = requireNonNull(delegate);
92         this.deviceId = requireNonNull(deviceId);
93
94         maxAttempts = node.requireMaxConnectionAttempts().toJava();
95         minSleep = node.requireBetweenAttemptsTimeoutMillis().toJava();
96         sleepFactor = node.requireSleepFactor().doubleValue();
97
98         // Setup reconnection on empty context, if so configured
99         // FIXME: NETCONF-925: implement this
100         if (nodeOptional != null && nodeOptional.getIgnoreMissingSchemaSources().getAllowed()) {
101             LOG.warn("Ignoring missing schema sources is not currently implemented for {}", deviceId);
102         }
103
104         // The facade we are going it present to NetconfDevice
105         RemoteDeviceHandler salFacade;
106         final KeepaliveSalFacade keepAliveFacade;
107         final long keepaliveDelay = node.requireKeepaliveDelay().toJava();
108         if (keepaliveDelay > 0) {
109             LOG.info("Adding keepalive facade, for device {}", nodeId);
110             salFacade = keepAliveFacade = new KeepaliveSalFacade(deviceId, this, scheduledExecutor, keepaliveDelay,
111                 node.requireDefaultRequestTimeoutMillis().toJava());
112         } else {
113             salFacade = this;
114             keepAliveFacade = null;
115         }
116
117         final RemoteDevice<NetconfDeviceCommunicator> device;
118         if (node.requireSchemaless()) {
119             device = new SchemalessNetconfDevice(baseSchemas, deviceId, salFacade);
120             yanglibRegistrations = List.of();
121         } else {
122             final var resources = schemaManager.getSchemaResources(node.getSchemaCacheDirectory(), nodeId.getValue());
123             device = new NetconfDeviceBuilder()
124                 .setReconnectOnSchemasChange(node.requireReconnectOnChangedSchema())
125                 .setSchemaResourcesDTO(resources)
126                 .setGlobalProcessingExecutor(processingExecutor)
127                 .setId(deviceId)
128                 .setSalFacade(salFacade)
129                 .setDeviceActionFactory(deviceActionFactory)
130                 .setBaseSchemas(baseSchemas)
131                 .build();
132             yanglibRegistrations = registerDeviceSchemaSources(deviceId, node, resources);
133         }
134
135         final int rpcMessageLimit = node.requireConcurrentRpcLimit().toJava();
136         if (rpcMessageLimit < 1) {
137             LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", deviceId);
138         }
139
140         communicator = new NetconfDeviceCommunicator(deviceId, device, rpcMessageLimit,
141             NetconfNodeUtils.extractUserCapabilities(node));
142
143         if (keepAliveFacade != null) {
144             keepAliveFacade.setListener(communicator);
145         }
146
147         clientConfig = builderFactory.createClientConfigurationBuilder(nodeId, node)
148             .withSessionListener(communicator)
149             .build();
150     }
151
152     public synchronized void connect() {
153         attempts = 1;
154         lastSleep = minSleep;
155         lockedConnect();
156     }
157
158     @Holding("this")
159     private void lockedConnect() {
160         try {
161             final var clientFuture = clientFactory.createClient(clientConfig);
162             clientFuture.addListener(() -> connectComplete(clientFuture), MoreExecutors.directExecutor());
163             currentTask = clientFuture;
164         } catch (UnsupportedConfigurationException e) {
165             onDeviceFailed(e);
166         }
167     }
168
169     private void connectComplete(final ListenableFuture<?> future) {
170         // Locked manipulation of internal state
171         synchronized (this) {
172             // A quick sanity check
173             if (currentTask != future) {
174                 LOG.warn("Ignoring connection completion, expected {} actual {}", future, currentTask);
175                 return;
176             }
177             currentTask = null;
178             // ListenableFuture provide no detail on error unless you attempt to get() the result
179             // then only the original exception is rethrown wrapped with ExecutionException
180             try {
181                 if (future.isCancelled() || future.isDone() && future.get() != null) {
182                     // Success or cancellation, nothing else to do.
183                     // In case of success the rest of the setup is driven by RemoteDeviceHandler callbacks
184                     return;
185                 }
186             } catch (InterruptedException | ExecutionException e) {
187                 LOG.debug("Connection attempt {} to {} failed", attempts, deviceId, e);
188             }
189         }
190
191         // We are invoking callbacks, do not hold locks
192         reconnectOrFail();
193     }
194
195     @Override
196     protected synchronized void removeRegistration() {
197         if (currentTask != null) {
198             currentTask.cancel(false);
199             currentTask = null;
200         }
201
202         communicator.close();
203         delegate.close();
204         yanglibRegistrations.forEach(SchemaSourceRegistration::close);
205     }
206
207     @Override
208     public void onDeviceConnected(final NetconfDeviceSchema deviceSchema,
209             final NetconfSessionPreferences sessionPreferences, final RemoteDeviceServices services) {
210         synchronized (this) {
211             attempts = 0;
212         }
213         delegate.onDeviceConnected(deviceSchema, sessionPreferences, services);
214     }
215
216     @Override
217     public void onDeviceDisconnected() {
218         delegate.onDeviceDisconnected();
219         reconnectOrFail();
220     }
221
222     @Override
223     public void onDeviceFailed(final Throwable throwable) {
224         // We have not reported onDeviceConnected(), so from the view of delete we are still connecting
225         LOG.debug("Connection attempt failed", throwable);
226         reconnectOrFail();
227     }
228
229     @Override
230     public void onNotification(final DOMNotification domNotification) {
231         delegate.onNotification(domNotification);
232     }
233
234     private void reconnectOrFail() {
235         final var ex = scheduleReconnect();
236         if (ex != null) {
237             delegate.onDeviceFailed(ex);
238         }
239     }
240
241     private synchronized Exception scheduleReconnect() {
242         if (isClosed()) {
243             return null;
244         }
245
246         final long delayMillis;
247
248         // We have exceeded the number of connection attempts
249         if (maxAttempts > 0 && attempts >= maxAttempts) {
250             LOG.info("Failed to connect {} after {} attempts, not attempting", deviceId, attempts);
251             return new ConnectGivenUpException("Given up connecting " + deviceId + " after " + attempts + " attempts");
252         }
253
254         // First connection attempt gets initialized to minimum sleep, each subsequent is exponentially backed off
255         // by sleepFactor.
256         if (attempts != 0) {
257             final long nextSleep = (long) (lastSleep * sleepFactor);
258             // check for overflow
259             delayMillis = nextSleep >= 0 ? nextSleep : Long.MAX_VALUE;
260         } else {
261             delayMillis = minSleep;
262         }
263
264         attempts++;
265         lastSleep = delayMillis;
266         LOG.debug("Retrying {} connection attempt {} after {} milliseconds", deviceId, attempts, delayMillis);
267
268         // Schedule a task for the right time. We always go through the executor to eliminate the special case of
269         // immediate reconnect. While we could check and got to lockedConnect(), it makes for a rare special case.
270         // That special case makes for more code paths to test and introduces additional uncertainty whether
271         // the attempt was executed on this thread or not.
272         currentTask = scheduledExecutor.schedule(this::reconnect, delayMillis, TimeUnit.MILLISECONDS);
273         return null;
274     }
275
276     private synchronized void reconnect() {
277         currentTask = null;
278         if (notClosed()) {
279             lockedConnect();
280         }
281     }
282
283     private static List<SchemaSourceRegistration<?>> registerDeviceSchemaSources(final RemoteDeviceId remoteDeviceId,
284             final NetconfNode node, final SchemaResourcesDTO resources) {
285         final var yangLibrary = node.getYangLibrary();
286         if (yangLibrary != null) {
287             final Uri uri = yangLibrary.getYangLibraryUrl();
288             if (uri != null) {
289                 final var registrations = new ArrayList<SchemaSourceRegistration<?>>();
290                 final var yangLibURL = uri.getValue();
291                 final var schemaRegistry = resources.getSchemaRegistry();
292
293                 // pre register yang library sources as fallback schemas to schema registry
294                 final var yangLibUsername = yangLibrary.getUsername();
295                 final var yangLigPassword = yangLibrary.getPassword();
296                 final var schemas = yangLibUsername != null && yangLigPassword != null
297                     ? LibraryModulesSchemas.create(yangLibURL, yangLibUsername, yangLigPassword)
298                         : LibraryModulesSchemas.create(yangLibURL);
299
300                 for (var entry : schemas.getAvailableModels().entrySet()) {
301                     registrations.add(schemaRegistry.registerSchemaSource(new LibrarySchemaSourceProvider(
302                         remoteDeviceId, schemas.getAvailableModels()),
303                         PotentialSchemaSource.create(entry.getKey(), YangTextSchemaSource.class,
304                             PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
305                 }
306                 return List.copyOf(registrations);
307             }
308         }
309
310         return List.of();
311     }
312
313     @VisibleForTesting
314     synchronized long attempts() {
315         return attempts;
316     }
317 }