204681a41cc79df05865511258a8a6c99138653f
[netconf.git] / apps / netconf-topology / src / main / java / org / opendaylight / netconf / topology / spi / NetconfNodeHandler.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.spi;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import io.netty.util.concurrent.EventExecutor;
14 import io.netty.util.concurrent.Future;
15 import java.util.ArrayList;
16 import java.util.List;
17 import java.util.concurrent.CancellationException;
18 import java.util.concurrent.Executor;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.TimeUnit;
21 import org.checkerframework.checker.lock.qual.GuardedBy;
22 import org.checkerframework.checker.lock.qual.Holding;
23 import org.eclipse.jdt.annotation.NonNull;
24 import org.opendaylight.mdsal.dom.api.DOMNotification;
25 import org.opendaylight.netconf.client.NetconfClientDispatcher;
26 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
27 import org.opendaylight.netconf.client.mdsal.LibraryModulesSchemas;
28 import org.opendaylight.netconf.client.mdsal.LibrarySchemaSourceProvider;
29 import org.opendaylight.netconf.client.mdsal.NetconfDevice.SchemaResourcesDTO;
30 import org.opendaylight.netconf.client.mdsal.NetconfDeviceBuilder;
31 import org.opendaylight.netconf.client.mdsal.NetconfDeviceCommunicator;
32 import org.opendaylight.netconf.client.mdsal.NetconfDeviceSchema;
33 import org.opendaylight.netconf.client.mdsal.SchemalessNetconfDevice;
34 import org.opendaylight.netconf.client.mdsal.api.BaseNetconfSchemas;
35 import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory;
36 import org.opendaylight.netconf.client.mdsal.api.NetconfSessionPreferences;
37 import org.opendaylight.netconf.client.mdsal.api.RemoteDevice;
38 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceHandler;
39 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
40 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
41 import org.opendaylight.netconf.client.mdsal.api.SchemaResourceManager;
42 import org.opendaylight.netconf.client.mdsal.spi.KeepaliveSalFacade;
43 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.optional.rev221225.NetconfNodeAugmentedOptional;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
47 import org.opendaylight.yangtools.concepts.AbstractRegistration;
48 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
49 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
50 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 /**
55  * All state associated with a NETCONF topology node. Each node handles its own reconnection.
56  */
57 public final class NetconfNodeHandler extends AbstractRegistration implements RemoteDeviceHandler {
58     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeHandler.class);
59
60     private final @NonNull List<SchemaSourceRegistration<?>> yanglibRegistrations;
61     private final @NonNull NetconfClientDispatcher clientDispatcher;
62     private final @NonNull NetconfClientConfiguration clientConfig;
63     private final @NonNull NetconfDeviceCommunicator communicator;
64     private final @NonNull RemoteDeviceHandler delegate;
65     private final @NonNull EventExecutor eventExecutor;
66     private final @NonNull RemoteDeviceId deviceId;
67
68     private final long maxAttempts;
69     private final int minSleep;
70     private final double sleepFactor;
71
72     @GuardedBy("this")
73     private long attempts;
74     @GuardedBy("this")
75     private long lastSleep;
76     @GuardedBy("this")
77     private Future<?> currentTask;
78
79     public NetconfNodeHandler(final NetconfClientDispatcher clientDispatcher, final EventExecutor eventExecutor,
80             final ScheduledExecutorService keepaliveExecutor, final BaseNetconfSchemas baseSchemas,
81             final SchemaResourceManager schemaManager, final Executor processingExecutor,
82             final NetconfClientConfigurationBuilderFactory builderFactory,
83             final DeviceActionFactory deviceActionFactory, final RemoteDeviceHandler delegate,
84             final RemoteDeviceId deviceId, final NodeId nodeId, final NetconfNode node,
85             final NetconfNodeAugmentedOptional nodeOptional) {
86         this.clientDispatcher = requireNonNull(clientDispatcher);
87         this.eventExecutor = requireNonNull(eventExecutor);
88         this.delegate = requireNonNull(delegate);
89         this.deviceId = requireNonNull(deviceId);
90
91         maxAttempts = node.requireMaxConnectionAttempts().toJava();
92         minSleep = node.requireBetweenAttemptsTimeoutMillis().toJava();
93         sleepFactor = node.requireSleepFactor().doubleValue();
94
95         // Setup reconnection on empty context, if so configured
96         // FIXME: NETCONF-925: implement this
97         if (nodeOptional != null && nodeOptional.getIgnoreMissingSchemaSources().getAllowed()) {
98             LOG.warn("Ignoring missing schema sources is not currently implemented for {}", deviceId);
99         }
100
101         // The facade we are going it present to NetconfDevice
102         RemoteDeviceHandler salFacade;
103         final KeepaliveSalFacade keepAliveFacade;
104         final long keepaliveDelay = node.requireKeepaliveDelay().toJava();
105         if (keepaliveDelay > 0) {
106             LOG.info("Adding keepalive facade, for device {}", nodeId);
107             salFacade = keepAliveFacade = new KeepaliveSalFacade(deviceId, this, keepaliveExecutor, keepaliveDelay,
108                 node.requireDefaultRequestTimeoutMillis().toJava());
109         } else {
110             salFacade = this;
111             keepAliveFacade = null;
112         }
113
114         final RemoteDevice<NetconfDeviceCommunicator> device;
115         if (node.requireSchemaless()) {
116             device = new SchemalessNetconfDevice(baseSchemas, deviceId, salFacade);
117             yanglibRegistrations = List.of();
118         } else {
119             final var resources = schemaManager.getSchemaResources(node.getSchemaCacheDirectory(), nodeId.getValue());
120             device = new NetconfDeviceBuilder()
121                 .setReconnectOnSchemasChange(node.requireReconnectOnChangedSchema())
122                 .setSchemaResourcesDTO(resources)
123                 .setGlobalProcessingExecutor(processingExecutor)
124                 .setId(deviceId)
125                 .setSalFacade(salFacade)
126                 .setDeviceActionFactory(deviceActionFactory)
127                 .setBaseSchemas(baseSchemas)
128                 .build();
129             yanglibRegistrations = registerDeviceSchemaSources(deviceId, node, resources);
130         }
131
132         final int rpcMessageLimit = node.requireConcurrentRpcLimit().toJava();
133         if (rpcMessageLimit < 1) {
134             LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", deviceId);
135         }
136
137         communicator = new NetconfDeviceCommunicator(deviceId, device, rpcMessageLimit,
138             NetconfNodeUtils.extractUserCapabilities(node));
139
140         if (keepAliveFacade != null) {
141             keepAliveFacade.setListener(communicator);
142         }
143
144         clientConfig = builderFactory.createClientConfigurationBuilder(nodeId, node)
145             .withSessionListener(communicator)
146             .build();
147     }
148
149     public synchronized void connect() {
150         attempts = 1;
151         lastSleep = minSleep;
152         lockedConnect();
153     }
154
155     @Holding("this")
156     private void lockedConnect() {
157         currentTask = clientDispatcher.createClient(clientConfig);
158         currentTask.addListener(this::connectComplete);
159     }
160
161     private void connectComplete(final Future<?> future) {
162         final Throwable cause;
163
164         // Locked manipulation of internal state
165         synchronized (this) {
166             // A quick sanity check
167             if (currentTask != future) {
168                 LOG.warn("Ignoring connection completion, expected {} actual {}", future, currentTask);
169                 return;
170             }
171
172             currentTask = null;
173             cause = future.cause();
174             if (cause == null || cause instanceof CancellationException) {
175                 // Success or cancellation, nothing else to do.
176                 // In case of success the rest of the setup is driven by RemoteDeviceHandler callbacks
177                 return;
178             }
179
180             LOG.debug("Connection attempt {} to {} failed", attempts, deviceId, cause);
181         }
182
183         // We are invoking callbacks, do not hold locks
184         onDeviceFailed(cause);
185     }
186
187     @Override
188     protected synchronized void removeRegistration() {
189         if (currentTask != null) {
190             currentTask.cancel(false);
191             currentTask = null;
192         }
193
194         communicator.close();
195         delegate.close();
196         yanglibRegistrations.forEach(SchemaSourceRegistration::close);
197     }
198
199     @Override
200     public void onDeviceConnected(final NetconfDeviceSchema deviceSchema,
201             final NetconfSessionPreferences sessionPreferences, final RemoteDeviceServices services) {
202         synchronized (this) {
203             attempts = 0;
204         }
205         delegate.onDeviceConnected(deviceSchema, sessionPreferences, services);
206     }
207
208     @Override
209     public void onDeviceDisconnected() {
210         delegate.onDeviceDisconnected();
211         scheduleReconnect();
212     }
213
214     @Override
215     public void onDeviceFailed(final Throwable throwable) {
216         LOG.debug("Connection attempt failed", throwable);
217         delegate.onDeviceFailed(throwable);
218         scheduleReconnect();
219     }
220
221     @Override
222     public void onNotification(final DOMNotification domNotification) {
223         delegate.onNotification(domNotification);
224     }
225
226     private synchronized void scheduleReconnect() {
227         if (isClosed()) {
228             return;
229         }
230
231         final long delayMillis;
232
233         // We have exceeded the number of connection attempts
234         if (maxAttempts > 0 && attempts >= maxAttempts) {
235             LOG.info("Failed to connect {} after {} attempts, not attempting", deviceId, attempts);
236             return;
237         }
238
239         // First connection attempt gets initialized to minimum sleep, each subsequent is exponentially backed off
240         // by sleepFactor.
241         if (attempts != 0) {
242             final long nextSleep = (long) (lastSleep * sleepFactor);
243             // check for overflow
244             delayMillis = nextSleep >= 0 ? nextSleep : Long.MAX_VALUE;
245         } else {
246             delayMillis = minSleep;
247         }
248
249         attempts++;
250         lastSleep = delayMillis;
251         LOG.debug("Retrying {} connection attempt {} after {} milliseconds", deviceId, attempts, delayMillis);
252
253         // If we are not sleeping at all, return an already-succeeded future
254         if (delayMillis == 0) {
255             lockedConnect();
256             return;
257         }
258
259         // Schedule a task for the right time. It will also clear the flag.
260         currentTask = eventExecutor.schedule(this::reconnect, delayMillis, TimeUnit.MILLISECONDS);
261     }
262
263     private synchronized void reconnect() {
264         currentTask = null;
265         if (notClosed()) {
266             lockedConnect();
267         }
268     }
269
270     private static List<SchemaSourceRegistration<?>> registerDeviceSchemaSources(final RemoteDeviceId remoteDeviceId,
271             final NetconfNode node, final SchemaResourcesDTO resources) {
272         final var yangLibrary = node.getYangLibrary();
273         if (yangLibrary != null) {
274             final Uri uri = yangLibrary.getYangLibraryUrl();
275             if (uri != null) {
276                 final var registrations = new ArrayList<SchemaSourceRegistration<?>>();
277                 final var yangLibURL = uri.getValue();
278                 final var schemaRegistry = resources.getSchemaRegistry();
279
280                 // pre register yang library sources as fallback schemas to schema registry
281                 final var yangLibUsername = yangLibrary.getUsername();
282                 final var yangLigPassword = yangLibrary.getPassword();
283                 final var schemas = yangLibUsername != null && yangLigPassword != null
284                     ? LibraryModulesSchemas.create(yangLibURL, yangLibUsername, yangLigPassword)
285                         : LibraryModulesSchemas.create(yangLibURL);
286
287                 for (var entry : schemas.getAvailableModels().entrySet()) {
288                     registrations.add(schemaRegistry.registerSchemaSource(new LibrarySchemaSourceProvider(
289                         remoteDeviceId, schemas.getAvailableModels()),
290                         PotentialSchemaSource.create(entry.getKey(), YangTextSchemaSource.class,
291                             PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
292                 }
293                 return List.copyOf(registrations);
294             }
295         }
296
297         return List.of();
298     }
299
300     @VisibleForTesting
301     synchronized long attempts() {
302         return attempts;
303     }
304 }