Bump upstreams to SNAPSHOTs
[netconf.git] / netconf / sal-netconf-connector / src / main / java / org / opendaylight / netconf / sal / connect / netconf / sal / AbstractNetconfDataTreeService.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.sal.connect.netconf.sal;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.Collection;
19 import java.util.List;
20 import java.util.Optional;
21 import org.eclipse.jdt.annotation.NonNull;
22 import org.eclipse.jdt.annotation.Nullable;
23 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
24 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
25 import org.opendaylight.mdsal.dom.api.DOMRpcService;
26 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
27 import org.opendaylight.netconf.api.ModifyAction;
28 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
29 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
30 import org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps;
31 import org.opendaylight.netconf.sal.connect.netconf.util.NetconfRpcFutureCallback;
32 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
33 import org.opendaylight.yangtools.rfc8528.data.api.MountPointContext;
34 import org.opendaylight.yangtools.yang.common.ErrorSeverity;
35 import org.opendaylight.yangtools.yang.common.RpcError;
36 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
37 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
38 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 public abstract class AbstractNetconfDataTreeService implements NetconfDataTreeService {
43     private static final class Candidate extends AbstractNetconfDataTreeService {
44         Candidate(final RemoteDeviceId id, final NetconfBaseOps netconfOps, final boolean rollbackSupport) {
45             super(id, netconfOps, rollbackSupport);
46         }
47
48         /**
49          * This has to be non blocking since it is called from a callback on commit and it is netty threadpool that is
50          * really sensitive to blocking calls.
51          */
52         @Override
53         public ListenableFuture<? extends DOMRpcResult> discardChanges() {
54             return netconfOps.discardChanges(new NetconfRpcFutureCallback("Discard candidate", id));
55         }
56
57         @Override
58         ListenableFuture<? extends DOMRpcResult> lockSingle() {
59             return netconfOps.lockCandidate(new NetconfRpcFutureCallback("Lock candidate", id));
60         }
61
62         @Override
63         List<ListenableFuture<? extends DOMRpcResult>> unlockImpl() {
64             return List.of(netconfOps.unlockCandidate(new NetconfRpcFutureCallback("Unlock candidate", id)));
65         }
66
67         @Override
68         ListenableFuture<? extends DOMRpcResult> editConfig(final DataContainerChild editStructure,
69                 final ModifyAction defaultOperation) {
70             final NetconfRpcFutureCallback callback = new NetconfRpcFutureCallback("Edit candidate", id);
71             return defaultOperation == null ? netconfOps.editConfigCandidate(callback, editStructure, rollbackSupport)
72                 : netconfOps.editConfigCandidate(callback, editStructure, defaultOperation, rollbackSupport);
73         }
74     }
75
76     private static final class Running extends AbstractNetconfDataTreeService {
77         Running(final RemoteDeviceId id, final NetconfBaseOps netconfOps, final boolean rollbackSupport) {
78             super(id, netconfOps, rollbackSupport);
79         }
80
81         @Override
82         public ListenableFuture<DOMRpcResult> discardChanges() {
83             // Changes cannot be discarded from running
84             return RPC_SUCCESS;
85         }
86
87         @Override
88         ListenableFuture<? extends DOMRpcResult> lockSingle() {
89             return netconfOps.lockRunning(new NetconfRpcFutureCallback("Lock running", id));
90         }
91
92         @Override
93         List<ListenableFuture<? extends DOMRpcResult>> unlockImpl() {
94             return List.of(netconfOps.unlockRunning(new NetconfRpcFutureCallback("Unlock running", id)));
95         }
96
97         @Override
98         ListenableFuture<? extends DOMRpcResult> editConfig(final DataContainerChild editStructure,
99                 final ModifyAction defaultOperation) {
100             final NetconfRpcFutureCallback callback = new NetconfRpcFutureCallback("Edit running", id);
101             return defaultOperation == null ? netconfOps.editConfigRunning(callback, editStructure, rollbackSupport)
102                 : netconfOps.editConfigRunning(callback, editStructure, defaultOperation, rollbackSupport);
103         }
104     }
105
106     private static final class CandidateWithRunning extends AbstractNetconfDataTreeService {
107         private final Candidate candidate;
108         private final Running running;
109
110         CandidateWithRunning(final RemoteDeviceId id, final NetconfBaseOps netconfOps,
111                 final boolean rollbackSupport) {
112             super(id, netconfOps, rollbackSupport);
113             candidate = new Candidate(id, netconfOps, rollbackSupport);
114             running = new Running(id, netconfOps, rollbackSupport);
115         }
116
117         @Override
118         public ListenableFuture<? extends DOMRpcResult> discardChanges() {
119             return candidate.discardChanges();
120         }
121
122         @Override
123         ListenableFuture<DOMRpcResult> lockSingle() {
124             throw new UnsupportedOperationException();
125         }
126
127         @Override
128         List<ListenableFuture<? extends DOMRpcResult>> lockImpl() {
129             return List.of(candidate.lockSingle(), running.lockSingle());
130         }
131
132         @Override
133         List<ListenableFuture<? extends DOMRpcResult>> unlockImpl() {
134             return List.of(running.unlock(), candidate.unlock());
135         }
136
137         @Override
138         ListenableFuture<? extends DOMRpcResult> editConfig(final DataContainerChild editStructure,
139                 final ModifyAction defaultOperation) {
140             return candidate.editConfig(editStructure, defaultOperation);
141         }
142     }
143
144     private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfDataTreeService.class);
145     private static final ListenableFuture<DOMRpcResult> RPC_SUCCESS =
146         Futures.immediateFuture(new DefaultDOMRpcResult());
147
148     final @NonNull RemoteDeviceId id;
149     final NetconfBaseOps netconfOps;
150     final boolean rollbackSupport;
151
152     // FIXME: what do we do with locks acquired before this got flipped?
153     private volatile boolean isLockAllowed = true;
154
155     AbstractNetconfDataTreeService(final RemoteDeviceId id, final NetconfBaseOps netconfOps,
156             final boolean rollbackSupport) {
157         this.id = requireNonNull(id);
158         this.netconfOps = requireNonNull(netconfOps);
159         this.rollbackSupport = rollbackSupport;
160     }
161
162     public static @NonNull AbstractNetconfDataTreeService of(final RemoteDeviceId id,
163             final MountPointContext mountContext, final DOMRpcService rpc,
164             final NetconfSessionPreferences netconfSessionPreferences) {
165         final NetconfBaseOps netconfOps = new NetconfBaseOps(rpc, mountContext);
166         final boolean rollbackSupport = netconfSessionPreferences.isRollbackSupported();
167
168         // Examine preferences and decide which implementation to use
169         if (netconfSessionPreferences.isCandidateSupported()) {
170             return netconfSessionPreferences.isRunningWritable()
171                 ? new CandidateWithRunning(id, netconfOps, rollbackSupport)
172                     : new Candidate(id, netconfOps, rollbackSupport);
173         } else if (netconfSessionPreferences.isRunningWritable()) {
174             return new Running(id, netconfOps, rollbackSupport);
175         } else {
176             throw new IllegalArgumentException("Device " + id.getName() + " has advertised neither :writable-running "
177                 + "nor :candidate capability. Failed to establish session, as at least one of these must be "
178                 + "advertised.");
179         }
180     }
181
182     @Override
183     public synchronized ListenableFuture<DOMRpcResult> lock() {
184         if (!isLockAllowed) {
185             LOG.trace("Lock is not allowed by device configuration, ignoring lock results: {}", id);
186             return RPC_SUCCESS;
187         }
188
189         final ListenableFuture<DOMRpcResult> result = mergeFutures(lockImpl());
190         Futures.addCallback(result, new FutureCallback<DOMRpcResult>() {
191             @Override
192             public void onSuccess(final DOMRpcResult result) {
193                 final var errors = result.getErrors();
194                 if (errors.isEmpty()) {
195                     LOG.debug("{}: Lock successful.", id);
196                     return;
197                 }
198                 if (allWarnings(errors)) {
199                     LOG.info("{}: Lock successful with warnings {}", errors, id);
200                     return;
201                 }
202
203                 LOG.warn("{}: Lock failed with errors {}", id, errors);
204             }
205
206             @Override
207             public void onFailure(final Throwable throwable) {
208                 LOG.warn("{}: Lock failed.", id, throwable);
209             }
210         }, MoreExecutors.directExecutor());
211
212         return result;
213     }
214
215     List<ListenableFuture<? extends DOMRpcResult>> lockImpl() {
216         return List.of(lockSingle());
217     }
218
219     abstract ListenableFuture<? extends DOMRpcResult> lockSingle();
220
221     @Override
222     public synchronized ListenableFuture<DOMRpcResult> unlock() {
223         // FIXME: deal with lock with lifecycle?
224         if (!isLockAllowed) {
225             LOG.trace("Unlock is not allowed: {}", id);
226             return RPC_SUCCESS;
227         }
228
229         final ListenableFuture<DOMRpcResult> result = mergeFutures(unlockImpl());
230         Futures.addCallback(result, new FutureCallback<DOMRpcResult>() {
231             @Override
232             public void onSuccess(final DOMRpcResult result) {
233                 final var errors = result.getErrors();
234                 if (errors.isEmpty()) {
235                     LOG.debug("{}: Unlock successful.", id);
236                     return;
237                 }
238                 if (allWarnings(errors)) {
239                     LOG.info("{}: Unlock successful with warnings {}", errors, id);
240                     return;
241                 }
242
243                 LOG.error("{}: Unlock failed with errors {}", id, errors);
244             }
245
246             @Override
247             public void onFailure(final Throwable throwable) {
248                 LOG.error("{}: Unlock failed.", id, throwable);
249             }
250         }, MoreExecutors.directExecutor());
251         return result;
252     }
253
254     abstract List<ListenableFuture<? extends DOMRpcResult>> unlockImpl();
255
256     @Override
257     public ListenableFuture<Optional<NormalizedNode>> get(final YangInstanceIdentifier path) {
258         return netconfOps.getData(new NetconfRpcFutureCallback("Data read", id), Optional.ofNullable(path));
259     }
260
261     @Override
262     public ListenableFuture<Optional<NormalizedNode>> get(final YangInstanceIdentifier path,
263             final List<YangInstanceIdentifier> fields) {
264         return netconfOps.getData(new NetconfRpcFutureCallback("Data read", id), Optional.ofNullable(path), fields);
265     }
266
267     @Override
268     public ListenableFuture<Optional<NormalizedNode>> getConfig(final YangInstanceIdentifier path) {
269         return netconfOps.getConfigRunningData(new NetconfRpcFutureCallback("Data read", id),
270             Optional.ofNullable(path));
271     }
272
273     @Override
274     public ListenableFuture<Optional<NormalizedNode>> getConfig(final YangInstanceIdentifier path,
275             final List<YangInstanceIdentifier> fields) {
276         return netconfOps.getConfigRunningData(new NetconfRpcFutureCallback("Data read", id),
277             Optional.ofNullable(path), fields);
278     }
279
280     @Override
281     public synchronized ListenableFuture<? extends DOMRpcResult> merge(final LogicalDatastoreType store,
282             final YangInstanceIdentifier path, final NormalizedNode data,
283             final Optional<ModifyAction> defaultOperation) {
284         checkEditable(store);
285         return editConfig(
286             netconfOps.createEditConfigStructure(Optional.ofNullable(data), Optional.of(ModifyAction.MERGE), path),
287             defaultOperation.orElse(null));
288     }
289
290     @Override
291     public synchronized ListenableFuture<? extends DOMRpcResult> replace(final LogicalDatastoreType store,
292             final YangInstanceIdentifier path, final NormalizedNode data,
293             final Optional<ModifyAction> defaultOperation) {
294         checkEditable(store);
295         return editConfig(
296             netconfOps.createEditConfigStructure(Optional.ofNullable(data), Optional.of(ModifyAction.REPLACE), path),
297             defaultOperation.orElse(null));
298     }
299
300     @Override
301     public synchronized ListenableFuture<? extends DOMRpcResult> create(final LogicalDatastoreType store,
302             final YangInstanceIdentifier path, final NormalizedNode data,
303             final Optional<ModifyAction> defaultOperation) {
304         checkEditable(store);
305         return editConfig(
306             netconfOps.createEditConfigStructure(Optional.ofNullable(data), Optional.of(ModifyAction.CREATE), path),
307             defaultOperation.orElse(null));
308     }
309
310     @Override
311     public synchronized ListenableFuture<? extends DOMRpcResult> delete(final LogicalDatastoreType store,
312             final YangInstanceIdentifier path) {
313         return editConfig(netconfOps.createEditConfigStructure(Optional.empty(),
314                 Optional.of(ModifyAction.DELETE), path), null);
315     }
316
317     @Override
318     public synchronized ListenableFuture<? extends DOMRpcResult> remove(final LogicalDatastoreType store,
319             final YangInstanceIdentifier path) {
320         return editConfig(netconfOps.createEditConfigStructure(Optional.empty(),
321                 Optional.of(ModifyAction.REMOVE), path), null);
322     }
323
324     @Override
325     public synchronized ListenableFuture<? extends DOMRpcResult> commit() {
326         return netconfOps.commit(new NetconfRpcFutureCallback("Commit", id));
327     }
328
329     @Override
330     public final Object getDeviceId() {
331         return id;
332     }
333
334     final void setLockAllowed(final boolean isLockAllowedOrig) {
335         isLockAllowed = isLockAllowedOrig;
336     }
337
338     abstract ListenableFuture<? extends DOMRpcResult> editConfig(DataContainerChild editStructure,
339         @Nullable ModifyAction defaultOperation);
340
341     private static void checkEditable(final LogicalDatastoreType store) {
342         checkArgument(store == LogicalDatastoreType.CONFIGURATION, "Can only edit configuration data, not %s", store);
343     }
344
345     // Transform list of futures related to RPC operation into a single Future
346     private static ListenableFuture<DOMRpcResult> mergeFutures(
347             final List<ListenableFuture<? extends DOMRpcResult>> futures) {
348         return Futures.whenAllComplete(futures).call(() -> {
349             if (futures.size() == 1) {
350                 // Fast path
351                 return Futures.getDone(futures.get(0));
352             }
353
354             final var builder = ImmutableList.<RpcError>builder();
355             for (ListenableFuture<? extends DOMRpcResult> future : futures) {
356                 builder.addAll(Futures.getDone(future).getErrors());
357             }
358             return new DefaultDOMRpcResult(null, builder.build());
359         }, MoreExecutors.directExecutor());
360     }
361
362     private static boolean allWarnings(final Collection<? extends @NonNull RpcError> errors) {
363         return errors.stream().allMatch(error -> error.getSeverity() == ErrorSeverity.WARNING);
364     }
365 }