RestconfTransaction always operates on a single datastore
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / transactions / NetconfRestconfTransaction.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.rests.transactions;
9
10 import static java.util.Objects.requireNonNull;
11 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION;
12
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.util.concurrent.FluentFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.SettableFuture;
20 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.Collections;
24 import java.util.List;
25 import java.util.Optional;
26 import java.util.StringJoiner;
27 import java.util.function.Supplier;
28 import java.util.stream.Collectors;
29 import org.eclipse.jdt.annotation.NonNull;
30 import org.opendaylight.mdsal.common.api.CommitInfo;
31 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
32 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
33 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
34 import org.opendaylight.netconf.api.DocumentedException;
35 import org.opendaylight.netconf.api.NetconfDocumentedException;
36 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
37 import org.opendaylight.yangtools.yang.common.RpcError;
38 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
39 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
40 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
41 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
42 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodeContainer;
43 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
44 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 final class NetconfRestconfTransaction extends RestconfTransaction {
49
50     private static final Logger LOG = LoggerFactory.getLogger(NetconfRestconfTransaction.class);
51
52     private final NetconfDataTreeService netconfService;
53     private final List<ListenableFuture<? extends DOMRpcResult>> resultsFutures =
54         Collections.synchronizedList(new ArrayList<>());
55     private volatile boolean isLocked = false;
56
57     NetconfRestconfTransaction(final NetconfDataTreeService netconfService) {
58         this.netconfService = requireNonNull(netconfService);
59         final ListenableFuture<? extends DOMRpcResult> lockResult = netconfService.lock();
60         Futures.addCallback(lockResult, lockOperationCallback, MoreExecutors.directExecutor());
61         resultsFutures.add(lockResult);
62     }
63
64     @Override
65     public void cancel() {
66         resultsFutures.clear();
67         executeWithLogging(netconfService::discardChanges);
68         executeWithLogging(netconfService::unlock);
69     }
70
71     @Override
72     public void delete(final YangInstanceIdentifier path) {
73         enqueueOperation(() -> netconfService.delete(CONFIGURATION, path));
74     }
75
76     @Override
77     public void remove(final YangInstanceIdentifier path) {
78         enqueueOperation(() -> netconfService.remove(CONFIGURATION, path));
79     }
80
81     @Override
82     public void merge(final YangInstanceIdentifier path, final NormalizedNode data) {
83         enqueueOperation(() -> netconfService.merge(CONFIGURATION, path, data, Optional.empty()));
84     }
85
86     @Override
87     public void create(final YangInstanceIdentifier path, final NormalizedNode data,
88             final SchemaContext schemaContext) {
89         if (data instanceof MapNode || data instanceof LeafSetNode) {
90             final NormalizedNode emptySubTree = ImmutableNodes.fromInstanceId(schemaContext, path);
91             merge(YangInstanceIdentifier.create(emptySubTree.getIdentifier()), emptySubTree);
92
93             for (final NormalizedNode child : ((NormalizedNodeContainer<?>) data).body()) {
94                 final YangInstanceIdentifier childPath = path.node(child.getIdentifier());
95                 enqueueOperation(() -> netconfService.create(CONFIGURATION, childPath, child, Optional.empty()));
96             }
97         } else {
98             enqueueOperation(() -> netconfService.create(CONFIGURATION, path, data, Optional.empty()));
99         }
100     }
101
102     @Override
103     public void replace(final YangInstanceIdentifier path, final NormalizedNode data,
104             final SchemaContext schemaContext) {
105         if (data instanceof MapNode || data instanceof LeafSetNode) {
106             final NormalizedNode emptySubTree = ImmutableNodes.fromInstanceId(schemaContext, path);
107             merge(YangInstanceIdentifier.create(emptySubTree.getIdentifier()), emptySubTree);
108
109             for (final NormalizedNode child : ((NormalizedNodeContainer<?>) data).body()) {
110                 final YangInstanceIdentifier childPath = path.node(child.getIdentifier());
111                 enqueueOperation(() -> netconfService.replace(CONFIGURATION, childPath, child, Optional.empty()));
112             }
113         } else {
114             enqueueOperation(() -> netconfService.replace(CONFIGURATION, path, data, Optional.empty()));
115         }
116     }
117
118     @Override
119     public FluentFuture<? extends @NonNull CommitInfo> commit() {
120         final SettableFuture<CommitInfo> commitResult = SettableFuture.create();
121
122         // First complete all resultsFutures and merge them ...
123         final ListenableFuture<DOMRpcResult> resultErrors = mergeFutures(resultsFutures);
124
125         // ... then evaluate if there are any problems
126         Futures.addCallback(resultErrors, new FutureCallback<>() {
127             @Override
128             public void onSuccess(final DOMRpcResult result) {
129                 final Collection<? extends RpcError> errors = result.getErrors();
130                 if (!allWarnings(errors)) {
131                     Futures.whenAllComplete(discardAndUnlock()).run(
132                         () -> commitResult.setException(toCommitFailedException(errors)),
133                         MoreExecutors.directExecutor());
134                     return;
135                 }
136
137                 // ... no problems so far, initiate commit
138                 Futures.addCallback(netconfService.commit(), new FutureCallback<DOMRpcResult>() {
139                     @Override
140                     public void onSuccess(final DOMRpcResult rpcResult) {
141                         final Collection<? extends RpcError> errors = result.getErrors();
142                         if (errors.isEmpty()) {
143                             Futures.whenAllComplete(netconfService.unlock()).run(
144                                 () -> commitResult.set(CommitInfo.empty()),
145                                 MoreExecutors.directExecutor());
146                         } else if (allWarnings(errors)) {
147                             LOG.info("Commit successful with warnings {}", errors);
148                             Futures.whenAllComplete(netconfService.unlock()).run(
149                                 () -> commitResult.set(CommitInfo.empty()),
150                                 MoreExecutors.directExecutor());
151                         } else {
152                             Futures.whenAllComplete(discardAndUnlock()).run(
153                                 () -> commitResult.setException(toCommitFailedException(errors)),
154                                 MoreExecutors.directExecutor());
155                         }
156                     }
157
158                     @Override
159                     public void onFailure(final Throwable throwable) {
160                         Futures.whenAllComplete(discardAndUnlock()).run(
161                             () -> commitResult.setException(throwable),
162                             MoreExecutors.directExecutor());
163                     }
164                 }, MoreExecutors.directExecutor());
165             }
166
167             @Override
168             public void onFailure(final Throwable throwable) {
169                 Futures.whenAllComplete(discardAndUnlock()).run(() -> commitResult.setException(throwable),
170                     MoreExecutors.directExecutor());
171             }
172         }, MoreExecutors.directExecutor());
173
174         return FluentFuture.from(commitResult);
175     }
176
177     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
178         justification = "https://github.com/spotbugs/spotbugs/issues/811")
179     private List<ListenableFuture<?>> discardAndUnlock() {
180         // execute discard & unlock operations only if lock operation was completed successfully
181         if (isLocked) {
182             return List.of(netconfService.discardChanges(), netconfService.unlock());
183         } else {
184             return Collections.emptyList();
185         }
186     }
187
188     private final FutureCallback<DOMRpcResult> lockOperationCallback = new FutureCallback<>() {
189         @Override
190         public void onSuccess(final DOMRpcResult rpcResult) {
191             if (rpcResult != null && allWarnings(rpcResult.getErrors())) {
192                 isLocked = true;
193             }
194         }
195
196         @Override
197         public void onFailure(final Throwable throwable) {
198             // do nothing
199         }
200     };
201
202     private void enqueueOperation(final Supplier<ListenableFuture<? extends DOMRpcResult>> operation) {
203         final ListenableFuture<? extends DOMRpcResult> operationFuture;
204         synchronized (resultsFutures) {
205             // if we only have result for the lock operation ...
206             if (resultsFutures.size() == 1) {
207                 operationFuture = Futures.transformAsync(resultsFutures.get(0),
208                     result -> {
209                         // ... then add new operation to the chain if lock was successful
210                         if (result != null && (result.getErrors().isEmpty() || allWarnings(result.getErrors()))) {
211                             return operation.get();
212                         } else {
213                             return Futures.immediateFailedFuture(new NetconfDocumentedException("Lock operation failed",
214                                 DocumentedException.ErrorType.APPLICATION, DocumentedException.ErrorTag.LOCK_DENIED,
215                                 DocumentedException.ErrorSeverity.ERROR));
216                         }
217                     },
218                     MoreExecutors.directExecutor());
219             } else {
220                 // ... otherwise just add operation to the execution chain
221                 operationFuture = Futures.transformAsync(resultsFutures.get(resultsFutures.size() - 1),
222                     future -> operation.get(),
223                     MoreExecutors.directExecutor());
224             }
225             // ... finally save operation related future to the list
226             resultsFutures.add(operationFuture);
227         }
228     }
229
230     // Transform list of futures related to RPC operation into a single Future
231     private static ListenableFuture<DOMRpcResult> mergeFutures(
232         final List<ListenableFuture<? extends DOMRpcResult>> futures) {
233         return Futures.whenAllComplete(futures).call(() -> {
234             if (futures.size() == 1) {
235                 // Fast path
236                 return Futures.getDone(futures.get(0));
237             }
238
239             final var builder = ImmutableList.<RpcError>builder();
240             for (ListenableFuture<? extends DOMRpcResult> future : futures) {
241                 builder.addAll(Futures.getDone(future).getErrors());
242             }
243             return new DefaultDOMRpcResult(null, builder.build());
244         }, MoreExecutors.directExecutor());
245     }
246
247     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
248             justification = "https://github.com/spotbugs/spotbugs/issues/811")
249     private static TransactionCommitFailedException toCommitFailedException(
250             final Collection<? extends RpcError> errors) {
251         DocumentedException.ErrorType errType = DocumentedException.ErrorType.APPLICATION;
252         DocumentedException.ErrorSeverity errSeverity = DocumentedException.ErrorSeverity.ERROR;
253         StringJoiner msgBuilder = new StringJoiner(" ");
254         String errorTag = "operation-failed";
255         for (final RpcError error : errors) {
256             switch (error.getErrorType()) {
257                 case RPC:
258                     errType = DocumentedException.ErrorType.RPC;
259                     break;
260                 case PROTOCOL:
261                     errType = DocumentedException.ErrorType.PROTOCOL;
262                     break;
263                 case TRANSPORT:
264                     errType = DocumentedException.ErrorType.TRANSPORT;
265                     break;
266                 case APPLICATION:
267                 default:
268                     errType = DocumentedException.ErrorType.APPLICATION;
269                     break;
270             }
271             switch (error.getSeverity()) {
272                 case WARNING:
273                     errSeverity = DocumentedException.ErrorSeverity.WARNING;
274                     break;
275                 case ERROR:
276                 default:
277                     errSeverity = DocumentedException.ErrorSeverity.ERROR;
278                     break;
279             }
280             msgBuilder.add(error.getMessage());
281             msgBuilder.add(error.getInfo());
282             errorTag = error.getTag();
283         }
284
285         return new TransactionCommitFailedException("Netconf transaction commit failed",
286             new NetconfDocumentedException("RPC during tx failed. " + msgBuilder.toString(), errType,
287                 DocumentedException.ErrorTag.from(errorTag), errSeverity));
288     }
289
290     private static void executeWithLogging(final Supplier<ListenableFuture<? extends DOMRpcResult>> operation) {
291         final ListenableFuture<? extends DOMRpcResult> operationResult = operation.get();
292         Futures.addCallback(operationResult, new FutureCallback<DOMRpcResult>() {
293             @Override
294             public void onSuccess(final DOMRpcResult rpcResult) {
295                 if (rpcResult != null && !rpcResult.getErrors().isEmpty()) {
296                     LOG.error("Errors occurred during processing of the RPC operation: {}",
297                         rpcResult.getErrors().stream().map(Object::toString).collect(Collectors.joining(",")));
298                 }
299             }
300
301             @Override
302             public void onFailure(final Throwable throwable) {
303                 LOG.error("Error processing operation", throwable);
304             }
305         }, MoreExecutors.directExecutor());
306     }
307
308     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
309         justification = "https://github.com/spotbugs/spotbugs/issues/811")
310     private static boolean allWarnings(final Collection<? extends @NonNull RpcError> errors) {
311         return errors.stream().allMatch(error -> error.getSeverity() == RpcError.ErrorSeverity.WARNING);
312     }
313 }