Handle NetconfDocumentedExceptions correctly
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / transactions / NetconfRestconfTransaction.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.rests.transactions;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.collect.ImmutableList;
13 import com.google.common.util.concurrent.FluentFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import com.google.common.util.concurrent.SettableFuture;
19 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Optional;
25 import java.util.StringJoiner;
26 import java.util.function.Supplier;
27 import java.util.stream.Collectors;
28 import org.eclipse.jdt.annotation.NonNull;
29 import org.opendaylight.mdsal.common.api.CommitInfo;
30 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
31 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
32 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
33 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
34 import org.opendaylight.netconf.api.DocumentedException;
35 import org.opendaylight.netconf.api.NetconfDocumentedException;
36 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
37 import org.opendaylight.yangtools.yang.common.RpcError;
38 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
39 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
40 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
41 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
42 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodeContainer;
43 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
44 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 final class NetconfRestconfTransaction extends RestconfTransaction {
49
50     private static final Logger LOG = LoggerFactory.getLogger(NetconfRestconfTransaction.class);
51
52     private final NetconfDataTreeService netconfService;
53     private final List<ListenableFuture<? extends DOMRpcResult>> resultsFutures =
54         Collections.synchronizedList(new ArrayList<>());
55     private volatile boolean isLocked = false;
56
57     NetconfRestconfTransaction(final NetconfDataTreeService netconfService) {
58         this.netconfService = requireNonNull(netconfService);
59         final ListenableFuture<? extends DOMRpcResult> lockResult = netconfService.lock();
60         Futures.addCallback(lockResult, lockOperationCallback, MoreExecutors.directExecutor());
61         resultsFutures.add(lockResult);
62     }
63
64     @Override
65     public void cancel() {
66         resultsFutures.clear();
67         executeWithLogging(netconfService::discardChanges);
68         executeWithLogging(netconfService::unlock);
69     }
70
71     @Override
72     public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
73         enqueueOperation(() -> netconfService.delete(store, path));
74     }
75
76     @Override
77     public void remove(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
78         enqueueOperation(() -> netconfService.remove(store, path));
79     }
80
81     @Override
82     public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path,
83             final NormalizedNode<?, ?> data) {
84         enqueueOperation(() -> netconfService.merge(store, path, data, Optional.empty()));
85     }
86
87     @Override
88     public void create(final LogicalDatastoreType store, final YangInstanceIdentifier path,
89            final NormalizedNode<?, ?> data, final SchemaContext schemaContext) {
90         if (data instanceof MapNode || data instanceof LeafSetNode) {
91             final NormalizedNode<?, ?> emptySubTree = ImmutableNodes.fromInstanceId(schemaContext, path);
92             merge(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.create(emptySubTree.getIdentifier()),
93                 emptySubTree);
94
95             for (final NormalizedNode<?, ?> child : ((NormalizedNodeContainer<?, ?, ?>) data).getValue()) {
96                 final YangInstanceIdentifier childPath = path.node(child.getIdentifier());
97                 enqueueOperation(() -> netconfService.create(store, childPath, child, Optional.empty()));
98             }
99         } else {
100             enqueueOperation(() -> netconfService.create(store, path, data, Optional.empty()));
101         }
102     }
103
104     @Override
105     public void replace(final LogicalDatastoreType store, final YangInstanceIdentifier path,
106             final NormalizedNode<?, ?> data, final SchemaContext schemaContext) {
107         if (data instanceof MapNode || data instanceof LeafSetNode) {
108             final NormalizedNode<?, ?> emptySubTree = ImmutableNodes.fromInstanceId(schemaContext, path);
109             merge(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.create(emptySubTree.getIdentifier()),
110                 emptySubTree);
111
112             for (final NormalizedNode<?, ?> child : ((NormalizedNodeContainer<?, ?, ?>) data).getValue()) {
113                 final YangInstanceIdentifier childPath = path.node(child.getIdentifier());
114                 enqueueOperation(() -> netconfService.replace(store, childPath, child, Optional.empty()));
115             }
116         } else {
117             enqueueOperation(() -> netconfService.replace(store, path, data, Optional.empty()));
118         }
119     }
120
121     @Override
122     public FluentFuture<? extends @NonNull CommitInfo> commit() {
123         final SettableFuture<CommitInfo> commitResult = SettableFuture.create();
124
125         // First complete all resultsFutures and merge them ...
126         final ListenableFuture<DOMRpcResult> resultErrors = mergeFutures(resultsFutures);
127
128         // ... then evaluate if there are any problems
129         Futures.addCallback(resultErrors, new FutureCallback<>() {
130             @Override
131             public void onSuccess(final DOMRpcResult result) {
132                 final Collection<? extends RpcError> errors = result.getErrors();
133                 if (!allWarnings(errors)) {
134                     Futures.whenAllComplete(discardAndUnlock()).run(
135                         () -> commitResult.setException(toCommitFailedException(errors)),
136                         MoreExecutors.directExecutor());
137                     return;
138                 }
139
140                 // ... no problems so far, initiate commit
141                 Futures.addCallback(netconfService.commit(), new FutureCallback<DOMRpcResult>() {
142                     @Override
143                     public void onSuccess(final DOMRpcResult rpcResult) {
144                         final Collection<? extends RpcError> errors = result.getErrors();
145                         if (errors.isEmpty()) {
146                             Futures.whenAllComplete(netconfService.unlock()).run(
147                                 () -> commitResult.set(CommitInfo.empty()),
148                                 MoreExecutors.directExecutor());
149                         } else if (allWarnings(errors)) {
150                             LOG.info("Commit successful with warnings {}", errors);
151                             Futures.whenAllComplete(netconfService.unlock()).run(
152                                 () -> commitResult.set(CommitInfo.empty()),
153                                 MoreExecutors.directExecutor());
154                         } else {
155                             Futures.whenAllComplete(discardAndUnlock()).run(
156                                 () -> commitResult.setException(toCommitFailedException(errors)),
157                                 MoreExecutors.directExecutor());
158                         }
159                     }
160
161                     @Override
162                     public void onFailure(final Throwable throwable) {
163                         Futures.whenAllComplete(discardAndUnlock()).run(
164                             () -> commitResult.setException(throwable),
165                             MoreExecutors.directExecutor());
166                     }
167                 }, MoreExecutors.directExecutor());
168             }
169
170             @Override
171             public void onFailure(final Throwable throwable) {
172                 Futures.whenAllComplete(discardAndUnlock()).run(() -> commitResult.setException(throwable),
173                     MoreExecutors.directExecutor());
174             }
175         }, MoreExecutors.directExecutor());
176
177         return FluentFuture.from(commitResult);
178     }
179
180     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
181         justification = "https://github.com/spotbugs/spotbugs/issues/811")
182     private List<ListenableFuture<?>> discardAndUnlock() {
183         // execute discard & unlock operations only if lock operation was completed successfully
184         if (isLocked) {
185             return List.of(netconfService.discardChanges(), netconfService.unlock());
186         } else {
187             return Collections.emptyList();
188         }
189     }
190
191     private final FutureCallback<DOMRpcResult> lockOperationCallback = new FutureCallback<>() {
192         @Override
193         public void onSuccess(final DOMRpcResult rpcResult) {
194             if (rpcResult != null && allWarnings(rpcResult.getErrors())) {
195                 isLocked = true;
196             }
197         }
198
199         @Override
200         public void onFailure(final Throwable throwable) {
201             // do nothing
202         }
203     };
204
205     private void enqueueOperation(final Supplier<ListenableFuture<? extends DOMRpcResult>> operation) {
206         final ListenableFuture<? extends DOMRpcResult> operationFuture;
207         synchronized (resultsFutures) {
208             // if we only have result for the lock operation ...
209             if (resultsFutures.size() == 1) {
210                 operationFuture = Futures.transformAsync(resultsFutures.get(0),
211                     result -> {
212                         // ... then add new operation to the chain if lock was successful
213                         if (result != null && (result.getErrors().isEmpty() || allWarnings(result.getErrors()))) {
214                             return operation.get();
215                         } else {
216                             return Futures.immediateFailedFuture(new NetconfDocumentedException("Lock operation failed",
217                                 DocumentedException.ErrorType.APPLICATION, DocumentedException.ErrorTag.LOCK_DENIED,
218                                 DocumentedException.ErrorSeverity.ERROR));
219                         }
220                     },
221                     MoreExecutors.directExecutor());
222             } else {
223                 // ... otherwise just add operation to the execution chain
224                 operationFuture = Futures.transformAsync(resultsFutures.get(resultsFutures.size() - 1),
225                     future -> operation.get(),
226                     MoreExecutors.directExecutor());
227             }
228             // ... finally save operation related future to the list
229             resultsFutures.add(operationFuture);
230         }
231     }
232
233     // Transform list of futures related to RPC operation into a single Future
234     private static ListenableFuture<DOMRpcResult> mergeFutures(
235         final List<ListenableFuture<? extends DOMRpcResult>> futures) {
236         return Futures.whenAllComplete(futures).call(() -> {
237             if (futures.size() == 1) {
238                 // Fast path
239                 return Futures.getDone(futures.get(0));
240             }
241
242             final var builder = ImmutableList.<RpcError>builder();
243             for (ListenableFuture<? extends DOMRpcResult> future : futures) {
244                 builder.addAll(Futures.getDone(future).getErrors());
245             }
246             return new DefaultDOMRpcResult(null, builder.build());
247         }, MoreExecutors.directExecutor());
248     }
249
250     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
251             justification = "https://github.com/spotbugs/spotbugs/issues/811")
252     private static TransactionCommitFailedException toCommitFailedException(
253             final Collection<? extends RpcError> errors) {
254         DocumentedException.ErrorType errType = DocumentedException.ErrorType.APPLICATION;
255         DocumentedException.ErrorSeverity errSeverity = DocumentedException.ErrorSeverity.ERROR;
256         StringJoiner msgBuilder = new StringJoiner(" ");
257         String errorTag = "operation-failed";
258         for (final RpcError error : errors) {
259             switch (error.getErrorType()) {
260                 case RPC:
261                     errType = DocumentedException.ErrorType.RPC;
262                     break;
263                 case PROTOCOL:
264                     errType = DocumentedException.ErrorType.PROTOCOL;
265                     break;
266                 case TRANSPORT:
267                     errType = DocumentedException.ErrorType.TRANSPORT;
268                     break;
269                 case APPLICATION:
270                 default:
271                     errType = DocumentedException.ErrorType.APPLICATION;
272                     break;
273             }
274             switch (error.getSeverity()) {
275                 case WARNING:
276                     errSeverity = DocumentedException.ErrorSeverity.WARNING;
277                     break;
278                 case ERROR:
279                 default:
280                     errSeverity = DocumentedException.ErrorSeverity.ERROR;
281                     break;
282             }
283             msgBuilder.add(error.getMessage());
284             msgBuilder.add(error.getInfo());
285             errorTag = error.getTag();
286         }
287
288         return new TransactionCommitFailedException("Netconf transaction commit failed",
289             new NetconfDocumentedException("RPC during tx failed. " + msgBuilder.toString(), errType,
290                 DocumentedException.ErrorTag.from(errorTag), errSeverity));
291     }
292
293     private static void executeWithLogging(final Supplier<ListenableFuture<? extends DOMRpcResult>> operation) {
294         final ListenableFuture<? extends DOMRpcResult> operationResult = operation.get();
295         Futures.addCallback(operationResult, new FutureCallback<DOMRpcResult>() {
296             @Override
297             public void onSuccess(final DOMRpcResult rpcResult) {
298                 if (rpcResult != null && !rpcResult.getErrors().isEmpty()) {
299                     LOG.error("Errors occurred during processing of the RPC operation: {}",
300                         rpcResult.getErrors().stream().map(Object::toString).collect(Collectors.joining(",")));
301                 }
302             }
303
304             @Override
305             public void onFailure(final Throwable throwable) {
306                 LOG.error("Error processing operation", throwable);
307             }
308         }, MoreExecutors.directExecutor());
309     }
310
311     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
312         justification = "https://github.com/spotbugs/spotbugs/issues/811")
313     private static boolean allWarnings(final Collection<? extends @NonNull RpcError> errors) {
314         return errors.stream().allMatch(error -> error.getSeverity() == RpcError.ErrorSeverity.WARNING);
315     }
316 }