Bump upstreams
[netconf.git] / netconf / sal-netconf-connector / src / main / java / org / opendaylight / netconf / sal / connect / netconf / sal / AbstractNetconfDataTreeService.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.sal.connect.netconf.sal;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.Collection;
19 import java.util.List;
20 import java.util.Optional;
21 import org.eclipse.jdt.annotation.NonNull;
22 import org.eclipse.jdt.annotation.Nullable;
23 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
24 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
25 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
26 import org.opendaylight.netconf.api.ModifyAction;
27 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
28 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceServices.Rpcs;
29 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
30 import org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps;
31 import org.opendaylight.netconf.sal.connect.netconf.util.NetconfRpcFutureCallback;
32 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
33 import org.opendaylight.yangtools.rfc8528.data.api.MountPointContext;
34 import org.opendaylight.yangtools.yang.common.ErrorSeverity;
35 import org.opendaylight.yangtools.yang.common.RpcError;
36 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
37 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
38 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 public abstract class AbstractNetconfDataTreeService implements NetconfDataTreeService {
43     private static final class Candidate extends AbstractNetconfDataTreeService {
44         Candidate(final RemoteDeviceId id, final NetconfBaseOps netconfOps, final boolean rollbackSupport) {
45             super(id, netconfOps, rollbackSupport);
46         }
47
48         /**
49          * This has to be non blocking since it is called from a callback on commit and it is netty threadpool that is
50          * really sensitive to blocking calls.
51          */
52         @Override
53         public ListenableFuture<? extends DOMRpcResult> discardChanges() {
54             return netconfOps.discardChanges(new NetconfRpcFutureCallback("Discard candidate", id));
55         }
56
57         @Override
58         ListenableFuture<? extends DOMRpcResult> lockSingle() {
59             return netconfOps.lockCandidate(new NetconfRpcFutureCallback("Lock candidate", id));
60         }
61
62         @Override
63         List<ListenableFuture<? extends DOMRpcResult>> unlockImpl() {
64             return List.of(netconfOps.unlockCandidate(new NetconfRpcFutureCallback("Unlock candidate", id)));
65         }
66
67         @Override
68         ListenableFuture<? extends DOMRpcResult> editConfig(final DataContainerChild editStructure,
69                 final ModifyAction defaultOperation) {
70             final NetconfRpcFutureCallback callback = new NetconfRpcFutureCallback("Edit candidate", id);
71             return defaultOperation == null ? netconfOps.editConfigCandidate(callback, editStructure, rollbackSupport)
72                 : netconfOps.editConfigCandidate(callback, editStructure, defaultOperation, rollbackSupport);
73         }
74     }
75
76     private static final class Running extends AbstractNetconfDataTreeService {
77         Running(final RemoteDeviceId id, final NetconfBaseOps netconfOps, final boolean rollbackSupport) {
78             super(id, netconfOps, rollbackSupport);
79         }
80
81         @Override
82         public ListenableFuture<DOMRpcResult> discardChanges() {
83             // Changes cannot be discarded from running
84             return RPC_SUCCESS;
85         }
86
87         @Override
88         public ListenableFuture<DOMRpcResult> commit() {
89             // No candidate, hence we commit immediately
90             return RPC_SUCCESS;
91         }
92
93         @Override
94         ListenableFuture<? extends DOMRpcResult> lockSingle() {
95             return netconfOps.lockRunning(new NetconfRpcFutureCallback("Lock running", id));
96         }
97
98         @Override
99         List<ListenableFuture<? extends DOMRpcResult>> unlockImpl() {
100             return List.of(netconfOps.unlockRunning(new NetconfRpcFutureCallback("Unlock running", id)));
101         }
102
103         @Override
104         ListenableFuture<? extends DOMRpcResult> editConfig(final DataContainerChild editStructure,
105                 final ModifyAction defaultOperation) {
106             final NetconfRpcFutureCallback callback = new NetconfRpcFutureCallback("Edit running", id);
107             return defaultOperation == null ? netconfOps.editConfigRunning(callback, editStructure, rollbackSupport)
108                 : netconfOps.editConfigRunning(callback, editStructure, defaultOperation, rollbackSupport);
109         }
110     }
111
112     private static final class CandidateWithRunning extends AbstractNetconfDataTreeService {
113         private final Candidate candidate;
114         private final Running running;
115
116         CandidateWithRunning(final RemoteDeviceId id, final NetconfBaseOps netconfOps,
117                 final boolean rollbackSupport) {
118             super(id, netconfOps, rollbackSupport);
119             candidate = new Candidate(id, netconfOps, rollbackSupport);
120             running = new Running(id, netconfOps, rollbackSupport);
121         }
122
123         @Override
124         public ListenableFuture<? extends DOMRpcResult> discardChanges() {
125             return candidate.discardChanges();
126         }
127
128         @Override
129         ListenableFuture<DOMRpcResult> lockSingle() {
130             throw new UnsupportedOperationException();
131         }
132
133         @Override
134         List<ListenableFuture<? extends DOMRpcResult>> lockImpl() {
135             return List.of(candidate.lockSingle(), running.lockSingle());
136         }
137
138         @Override
139         List<ListenableFuture<? extends DOMRpcResult>> unlockImpl() {
140             return List.of(running.unlock(), candidate.unlock());
141         }
142
143         @Override
144         ListenableFuture<? extends DOMRpcResult> editConfig(final DataContainerChild editStructure,
145                 final ModifyAction defaultOperation) {
146             return candidate.editConfig(editStructure, defaultOperation);
147         }
148     }
149
150     private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfDataTreeService.class);
151     private static final ListenableFuture<DOMRpcResult> RPC_SUCCESS =
152         Futures.immediateFuture(new DefaultDOMRpcResult());
153
154     final @NonNull RemoteDeviceId id;
155     final NetconfBaseOps netconfOps;
156     final boolean rollbackSupport;
157
158     // FIXME: what do we do with locks acquired before this got flipped?
159     private volatile boolean isLockAllowed = true;
160
161     AbstractNetconfDataTreeService(final RemoteDeviceId id, final NetconfBaseOps netconfOps,
162             final boolean rollbackSupport) {
163         this.id = requireNonNull(id);
164         this.netconfOps = requireNonNull(netconfOps);
165         this.rollbackSupport = rollbackSupport;
166     }
167
168     public static @NonNull AbstractNetconfDataTreeService of(final RemoteDeviceId id,
169             final MountPointContext mountContext, final Rpcs rpcs,
170             final NetconfSessionPreferences sessionPreferences) {
171         final var netconfOps = new NetconfBaseOps(rpcs, mountContext);
172         final boolean rollbackSupport = sessionPreferences.isRollbackSupported();
173
174         // Examine preferences and decide which implementation to use
175         if (sessionPreferences.isCandidateSupported()) {
176             return sessionPreferences.isRunningWritable()
177                 ? new CandidateWithRunning(id, netconfOps, rollbackSupport)
178                     : new Candidate(id, netconfOps, rollbackSupport);
179         } else if (sessionPreferences.isRunningWritable()) {
180             return new Running(id, netconfOps, rollbackSupport);
181         } else {
182             throw new IllegalArgumentException("Device " + id.getName() + " has advertised neither :writable-running "
183                 + "nor :candidate capability. Failed to establish session, as at least one of these must be "
184                 + "advertised.");
185         }
186     }
187
188     @Override
189     public synchronized ListenableFuture<DOMRpcResult> lock() {
190         if (!isLockAllowed) {
191             LOG.trace("Lock is not allowed by device configuration, ignoring lock results: {}", id);
192             return RPC_SUCCESS;
193         }
194
195         final ListenableFuture<DOMRpcResult> result = mergeFutures(lockImpl());
196         Futures.addCallback(result, new FutureCallback<>() {
197             @Override
198             public void onSuccess(final DOMRpcResult result) {
199                 final var errors = result.errors();
200                 if (errors.isEmpty()) {
201                     LOG.debug("{}: Lock successful.", id);
202                     return;
203                 }
204                 if (allWarnings(errors)) {
205                     LOG.info("{}: Lock successful with warnings {}", errors, id);
206                     return;
207                 }
208
209                 LOG.warn("{}: Lock failed with errors {}", id, errors);
210             }
211
212             @Override
213             public void onFailure(final Throwable throwable) {
214                 LOG.warn("{}: Lock failed.", id, throwable);
215             }
216         }, MoreExecutors.directExecutor());
217
218         return result;
219     }
220
221     List<ListenableFuture<? extends DOMRpcResult>> lockImpl() {
222         return List.of(lockSingle());
223     }
224
225     abstract ListenableFuture<? extends DOMRpcResult> lockSingle();
226
227     @Override
228     public synchronized ListenableFuture<DOMRpcResult> unlock() {
229         // FIXME: deal with lock with lifecycle?
230         if (!isLockAllowed) {
231             LOG.trace("Unlock is not allowed: {}", id);
232             return RPC_SUCCESS;
233         }
234
235         final ListenableFuture<DOMRpcResult> result = mergeFutures(unlockImpl());
236         Futures.addCallback(result, new FutureCallback<>() {
237             @Override
238             public void onSuccess(final DOMRpcResult result) {
239                 final var errors = result.getErrors();
240                 if (errors.isEmpty()) {
241                     LOG.debug("{}: Unlock successful.", id);
242                     return;
243                 }
244                 if (allWarnings(errors)) {
245                     LOG.info("{}: Unlock successful with warnings {}", errors, id);
246                     return;
247                 }
248
249                 LOG.error("{}: Unlock failed with errors {}", id, errors);
250             }
251
252             @Override
253             public void onFailure(final Throwable throwable) {
254                 LOG.error("{}: Unlock failed.", id, throwable);
255             }
256         }, MoreExecutors.directExecutor());
257         return result;
258     }
259
260     abstract List<ListenableFuture<? extends DOMRpcResult>> unlockImpl();
261
262     @Override
263     public ListenableFuture<Optional<NormalizedNode>> get(final YangInstanceIdentifier path) {
264         return netconfOps.getData(new NetconfRpcFutureCallback("Data read", id), Optional.ofNullable(path));
265     }
266
267     @Override
268     public ListenableFuture<Optional<NormalizedNode>> get(final YangInstanceIdentifier path,
269             final List<YangInstanceIdentifier> fields) {
270         return netconfOps.getData(new NetconfRpcFutureCallback("Data read", id), Optional.ofNullable(path), fields);
271     }
272
273     @Override
274     public ListenableFuture<Optional<NormalizedNode>> getConfig(final YangInstanceIdentifier path) {
275         return netconfOps.getConfigRunningData(new NetconfRpcFutureCallback("Data read", id),
276             Optional.ofNullable(path));
277     }
278
279     @Override
280     public ListenableFuture<Optional<NormalizedNode>> getConfig(final YangInstanceIdentifier path,
281             final List<YangInstanceIdentifier> fields) {
282         return netconfOps.getConfigRunningData(new NetconfRpcFutureCallback("Data read", id),
283             Optional.ofNullable(path), fields);
284     }
285
286     @Override
287     public synchronized ListenableFuture<? extends DOMRpcResult> merge(final LogicalDatastoreType store,
288             final YangInstanceIdentifier path, final NormalizedNode data,
289             final Optional<ModifyAction> defaultOperation) {
290         checkEditable(store);
291         return editConfig(
292             netconfOps.createEditConfigStructure(Optional.ofNullable(data), Optional.of(ModifyAction.MERGE), path),
293             defaultOperation.orElse(null));
294     }
295
296     @Override
297     public synchronized ListenableFuture<? extends DOMRpcResult> replace(final LogicalDatastoreType store,
298             final YangInstanceIdentifier path, final NormalizedNode data,
299             final Optional<ModifyAction> defaultOperation) {
300         checkEditable(store);
301         return editConfig(
302             netconfOps.createEditConfigStructure(Optional.ofNullable(data), Optional.of(ModifyAction.REPLACE), path),
303             defaultOperation.orElse(null));
304     }
305
306     @Override
307     public synchronized ListenableFuture<? extends DOMRpcResult> create(final LogicalDatastoreType store,
308             final YangInstanceIdentifier path, final NormalizedNode data,
309             final Optional<ModifyAction> defaultOperation) {
310         checkEditable(store);
311         return editConfig(
312             netconfOps.createEditConfigStructure(Optional.ofNullable(data), Optional.of(ModifyAction.CREATE), path),
313             defaultOperation.orElse(null));
314     }
315
316     @Override
317     public synchronized ListenableFuture<? extends DOMRpcResult> delete(final LogicalDatastoreType store,
318             final YangInstanceIdentifier path) {
319         return editConfig(netconfOps.createEditConfigStructure(Optional.empty(),
320                 Optional.of(ModifyAction.DELETE), path), null);
321     }
322
323     @Override
324     public synchronized ListenableFuture<? extends DOMRpcResult> remove(final LogicalDatastoreType store,
325             final YangInstanceIdentifier path) {
326         return editConfig(netconfOps.createEditConfigStructure(Optional.empty(),
327                 Optional.of(ModifyAction.REMOVE), path), null);
328     }
329
330     @Override
331     public synchronized ListenableFuture<? extends DOMRpcResult> commit() {
332         return netconfOps.commit(new NetconfRpcFutureCallback("Commit", id));
333     }
334
335     @Override
336     public final Object getDeviceId() {
337         return id;
338     }
339
340     final void setLockAllowed(final boolean isLockAllowedOrig) {
341         isLockAllowed = isLockAllowedOrig;
342     }
343
344     abstract ListenableFuture<? extends DOMRpcResult> editConfig(DataContainerChild editStructure,
345         @Nullable ModifyAction defaultOperation);
346
347     private static void checkEditable(final LogicalDatastoreType store) {
348         checkArgument(store == LogicalDatastoreType.CONFIGURATION, "Can only edit configuration data, not %s", store);
349     }
350
351     // Transform list of futures related to RPC operation into a single Future
352     private static ListenableFuture<DOMRpcResult> mergeFutures(
353             final List<ListenableFuture<? extends DOMRpcResult>> futures) {
354         return Futures.whenAllComplete(futures).call(() -> {
355             if (futures.size() == 1) {
356                 // Fast path
357                 return Futures.getDone(futures.get(0));
358             }
359
360             final var builder = ImmutableList.<RpcError>builder();
361             for (ListenableFuture<? extends DOMRpcResult> future : futures) {
362                 builder.addAll(Futures.getDone(future).getErrors());
363             }
364             return new DefaultDOMRpcResult(null, builder.build());
365         }, MoreExecutors.directExecutor());
366     }
367
368     private static boolean allWarnings(final Collection<? extends @NonNull RpcError> errors) {
369         return errors.stream().allMatch(error -> error.getSeverity() == ErrorSeverity.WARNING);
370     }
371 }