14c19b1c26c4a66a0a1c21f5f17c77c2db1564f4
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / transactions / NetconfRestconfTransaction.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.rests.transactions;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.collect.ImmutableList;
13 import com.google.common.util.concurrent.FluentFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import com.google.common.util.concurrent.SettableFuture;
19 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Optional;
25 import java.util.StringJoiner;
26 import java.util.function.Supplier;
27 import java.util.stream.Collectors;
28 import org.eclipse.jdt.annotation.NonNull;
29 import org.opendaylight.mdsal.common.api.CommitInfo;
30 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
31 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
32 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
33 import org.opendaylight.netconf.api.DocumentedException;
34 import org.opendaylight.netconf.api.NetconfDocumentedException;
35 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
36 import org.opendaylight.yangtools.yang.common.RpcError;
37 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
38 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
39 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
40 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
41 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodeContainer;
42 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
43 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 final class NetconfRestconfTransaction extends RestconfTransaction {
48
49     private static final Logger LOG = LoggerFactory.getLogger(NetconfRestconfTransaction.class);
50
51     private final NetconfDataTreeService netconfService;
52     private final List<ListenableFuture<? extends DOMRpcResult>> resultsFutures =
53         Collections.synchronizedList(new ArrayList<>());
54     private volatile boolean isLocked = false;
55
56     NetconfRestconfTransaction(final NetconfDataTreeService netconfService) {
57         this.netconfService = requireNonNull(netconfService);
58         final ListenableFuture<? extends DOMRpcResult> lockResult = netconfService.lock();
59         Futures.addCallback(lockResult, lockOperationCallback, MoreExecutors.directExecutor());
60         resultsFutures.add(lockResult);
61     }
62
63     @Override
64     public void cancel() {
65         resultsFutures.clear();
66         executeWithLogging(netconfService::discardChanges);
67         executeWithLogging(netconfService::unlock);
68     }
69
70     @Override
71     public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
72         enqueueOperation(() -> netconfService.delete(store, path));
73     }
74
75     @Override
76     public void remove(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
77         enqueueOperation(() -> netconfService.remove(store, path));
78     }
79
80     @Override
81     public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path,
82             final NormalizedNode<?, ?> data) {
83         enqueueOperation(() -> netconfService.merge(store, path, data, Optional.empty()));
84     }
85
86     @Override
87     public void create(final LogicalDatastoreType store, final YangInstanceIdentifier path,
88            final NormalizedNode<?, ?> data, final SchemaContext schemaContext) {
89         if (data instanceof MapNode || data instanceof LeafSetNode) {
90             final NormalizedNode<?, ?> emptySubTree = ImmutableNodes.fromInstanceId(schemaContext, path);
91             merge(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.create(emptySubTree.getIdentifier()),
92                 emptySubTree);
93
94             for (final NormalizedNode<?, ?> child : ((NormalizedNodeContainer<?, ?, ?>) data).getValue()) {
95                 final YangInstanceIdentifier childPath = path.node(child.getIdentifier());
96                 enqueueOperation(() -> netconfService.create(store, childPath, child, Optional.empty()));
97             }
98         } else {
99             enqueueOperation(() -> netconfService.create(store, path, data, Optional.empty()));
100         }
101     }
102
103     @Override
104     public void replace(final LogicalDatastoreType store, final YangInstanceIdentifier path,
105             final NormalizedNode<?, ?> data, final SchemaContext schemaContext) {
106         if (data instanceof MapNode || data instanceof LeafSetNode) {
107             final NormalizedNode<?, ?> emptySubTree = ImmutableNodes.fromInstanceId(schemaContext, path);
108             merge(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.create(emptySubTree.getIdentifier()),
109                 emptySubTree);
110
111             for (final NormalizedNode<?, ?> child : ((NormalizedNodeContainer<?, ?, ?>) data).getValue()) {
112                 final YangInstanceIdentifier childPath = path.node(child.getIdentifier());
113                 enqueueOperation(() -> netconfService.replace(store, childPath, child, Optional.empty()));
114             }
115         } else {
116             enqueueOperation(() -> netconfService.replace(store, path, data, Optional.empty()));
117         }
118     }
119
120     @Override
121     public FluentFuture<? extends @NonNull CommitInfo> commit() {
122         final SettableFuture<CommitInfo> commitResult = SettableFuture.create();
123
124         // First complete all resultsFutures and merge them ...
125         final ListenableFuture<DOMRpcResult> resultErrors = mergeFutures(resultsFutures);
126
127         // ... then evaluate if there are any problems
128         Futures.addCallback(resultErrors, new FutureCallback<>() {
129             @Override
130             public void onSuccess(final DOMRpcResult result) {
131                 final Collection<? extends RpcError> errors = result.getErrors();
132                 if (!allWarnings(errors)) {
133                     Futures.whenAllComplete(discardAndUnlock()).run(
134                         () -> commitResult.setException(mapRpcErrorsToNetconfDocException(errors)),
135                         MoreExecutors.directExecutor());
136                     return;
137                 }
138
139                 // ... no problems so far, initiate commit
140                 Futures.addCallback(netconfService.commit(), new FutureCallback<DOMRpcResult>() {
141                     @Override
142                     public void onSuccess(final DOMRpcResult rpcResult) {
143                         final Collection<? extends RpcError> errors = result.getErrors();
144                         if (errors.isEmpty()) {
145                             Futures.whenAllComplete(netconfService.unlock()).run(
146                                 () -> commitResult.set(CommitInfo.empty()),
147                                 MoreExecutors.directExecutor());
148                         } else if (allWarnings(errors)) {
149                             LOG.info("Commit successful with warnings {}", errors);
150                             Futures.whenAllComplete(netconfService.unlock()).run(
151                                 () -> commitResult.set(CommitInfo.empty()),
152                                 MoreExecutors.directExecutor());
153                         } else {
154                             Futures.whenAllComplete(discardAndUnlock()).run(
155                                 () -> commitResult.setException(mapRpcErrorsToNetconfDocException(errors)),
156                                 MoreExecutors.directExecutor());
157                         }
158                     }
159
160                     @Override
161                     public void onFailure(final Throwable throwable) {
162                         Futures.whenAllComplete(discardAndUnlock()).run(
163                             () -> commitResult.setException(throwable),
164                             MoreExecutors.directExecutor());
165                     }
166                 }, MoreExecutors.directExecutor());
167             }
168
169             @Override
170             public void onFailure(final Throwable throwable) {
171                 Futures.whenAllComplete(discardAndUnlock()).run(() -> commitResult.setException(throwable),
172                     MoreExecutors.directExecutor());
173             }
174         }, MoreExecutors.directExecutor());
175
176         return FluentFuture.from(commitResult);
177     }
178
179     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
180         justification = "https://github.com/spotbugs/spotbugs/issues/811")
181     private List<ListenableFuture<?>> discardAndUnlock() {
182         // execute discard & unlock operations only if lock operation was completed successfully
183         if (isLocked) {
184             return List.of(netconfService.discardChanges(), netconfService.unlock());
185         } else {
186             return Collections.emptyList();
187         }
188     }
189
190     private final FutureCallback<DOMRpcResult> lockOperationCallback = new FutureCallback<>() {
191         @Override
192         public void onSuccess(final DOMRpcResult rpcResult) {
193             if (rpcResult != null && allWarnings(rpcResult.getErrors())) {
194                 isLocked = true;
195             }
196         }
197
198         @Override
199         public void onFailure(final Throwable throwable) {
200             // do nothing
201         }
202     };
203
204     private void enqueueOperation(final Supplier<ListenableFuture<? extends DOMRpcResult>> operation) {
205         final ListenableFuture<? extends DOMRpcResult> operationFuture;
206         synchronized (resultsFutures) {
207             // if we only have result for the lock operation ...
208             if (resultsFutures.size() == 1) {
209                 operationFuture = Futures.transformAsync(resultsFutures.get(0),
210                     result -> {
211                         // ... then add new operation to the chain if lock was successful
212                         if (result != null && (result.getErrors().isEmpty() || allWarnings(result.getErrors()))) {
213                             return operation.get();
214                         } else {
215                             return Futures.immediateFailedFuture(new NetconfDocumentedException("Lock operation failed",
216                                 DocumentedException.ErrorType.APPLICATION, DocumentedException.ErrorTag.LOCK_DENIED,
217                                 DocumentedException.ErrorSeverity.ERROR));
218                         }
219                     },
220                     MoreExecutors.directExecutor());
221             } else {
222                 // ... otherwise just add operation to the execution chain
223                 operationFuture = Futures.transformAsync(resultsFutures.get(resultsFutures.size() - 1),
224                     future -> operation.get(),
225                     MoreExecutors.directExecutor());
226             }
227             // ... finally save operation related future to the list
228             resultsFutures.add(operationFuture);
229         }
230     }
231
232     // Transform list of futures related to RPC operation into a single Future
233     private static ListenableFuture<DOMRpcResult> mergeFutures(
234         final List<ListenableFuture<? extends DOMRpcResult>> futures) {
235         return Futures.whenAllComplete(futures).call(() -> {
236             if (futures.size() == 1) {
237                 // Fast path
238                 return Futures.getDone(futures.get(0));
239             }
240
241             final var builder = ImmutableList.<RpcError>builder();
242             for (ListenableFuture<? extends DOMRpcResult> future : futures) {
243                 builder.addAll(Futures.getDone(future).getErrors());
244             }
245             return new DefaultDOMRpcResult(null, builder.build());
246         }, MoreExecutors.directExecutor());
247     }
248
249     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
250         justification = "https://github.com/spotbugs/spotbugs/issues/811")
251     private static NetconfDocumentedException mapRpcErrorsToNetconfDocException(
252         final Collection<? extends RpcError> errors) {
253         DocumentedException.ErrorType errType = DocumentedException.ErrorType.APPLICATION;
254         DocumentedException.ErrorSeverity errSeverity = DocumentedException.ErrorSeverity.ERROR;
255         StringJoiner msgBuilder = new StringJoiner(" ");
256         String errorTag = "operation-failed";
257         for (final RpcError error : errors) {
258             final RpcError.ErrorType errorType = error.getErrorType();
259             switch (errorType) {
260                 case RPC:
261                     errType = DocumentedException.ErrorType.RPC;
262                     break;
263                 case PROTOCOL:
264                     errType = DocumentedException.ErrorType.PROTOCOL;
265                     break;
266                 case TRANSPORT:
267                     errType = DocumentedException.ErrorType.TRANSPORT;
268                     break;
269                 case APPLICATION:
270                 default:
271                     errType = DocumentedException.ErrorType.APPLICATION;
272                     break;
273             }
274             final RpcError.ErrorSeverity severity = error.getSeverity();
275             switch (severity) {
276                 case WARNING:
277                     errSeverity = DocumentedException.ErrorSeverity.WARNING;
278                     break;
279                 case ERROR:
280                 default:
281                     errSeverity = DocumentedException.ErrorSeverity.ERROR;
282                     break;
283             }
284             msgBuilder.add(error.getMessage());
285             msgBuilder.add(error.getInfo());
286             errorTag = error.getTag();
287         }
288         return new NetconfDocumentedException("RPC during tx failed. " + msgBuilder.toString(), errType,
289             DocumentedException.ErrorTag.from(errorTag), errSeverity);
290     }
291
292     private static void executeWithLogging(final Supplier<ListenableFuture<? extends DOMRpcResult>> operation) {
293         final ListenableFuture<? extends DOMRpcResult> operationResult = operation.get();
294         Futures.addCallback(operationResult, new FutureCallback<DOMRpcResult>() {
295             @Override
296             public void onSuccess(final DOMRpcResult rpcResult) {
297                 if (rpcResult != null && !rpcResult.getErrors().isEmpty()) {
298                     LOG.error("Errors occurred during processing of the RPC operation: {}",
299                         rpcResult.getErrors().stream().map(Object::toString).collect(Collectors.joining(",")));
300                 }
301             }
302
303             @Override
304             public void onFailure(final Throwable throwable) {
305                 LOG.error("Error processing operation", throwable);
306             }
307         }, MoreExecutors.directExecutor());
308     }
309
310     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
311         justification = "https://github.com/spotbugs/spotbugs/issues/811")
312     private static boolean allWarnings(final Collection<? extends @NonNull RpcError> errors) {
313         return errors.stream().allMatch(error -> error.getSeverity() == RpcError.ErrorSeverity.WARNING);
314     }
315 }