Bump upstreams to SNAPSHOTs
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / transactions / NetconfRestconfTransaction.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.rests.transactions;
9
10 import static java.util.Objects.requireNonNull;
11 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION;
12
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.util.concurrent.FluentFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Optional;
25 import java.util.StringJoiner;
26 import java.util.function.Supplier;
27 import java.util.stream.Collectors;
28 import org.eclipse.jdt.annotation.NonNull;
29 import org.opendaylight.mdsal.common.api.CommitInfo;
30 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
31 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
32 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
33 import org.opendaylight.netconf.api.NetconfDocumentedException;
34 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
35 import org.opendaylight.yangtools.yang.common.ErrorSeverity;
36 import org.opendaylight.yangtools.yang.common.ErrorTag;
37 import org.opendaylight.yangtools.yang.common.ErrorType;
38 import org.opendaylight.yangtools.yang.common.RpcError;
39 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
40 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
41 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
42 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
43 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodeContainer;
44 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
45 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 final class NetconfRestconfTransaction extends RestconfTransaction {
50
51     private static final Logger LOG = LoggerFactory.getLogger(NetconfRestconfTransaction.class);
52
53     private final NetconfDataTreeService netconfService;
54     private final List<ListenableFuture<? extends DOMRpcResult>> resultsFutures =
55         Collections.synchronizedList(new ArrayList<>());
56     private volatile boolean isLocked = false;
57
58     NetconfRestconfTransaction(final NetconfDataTreeService netconfService) {
59         this.netconfService = requireNonNull(netconfService);
60         final ListenableFuture<? extends DOMRpcResult> lockResult = netconfService.lock();
61         Futures.addCallback(lockResult, lockOperationCallback, MoreExecutors.directExecutor());
62         resultsFutures.add(lockResult);
63     }
64
65     @Override
66     public void cancel() {
67         resultsFutures.clear();
68         executeWithLogging(netconfService::discardChanges);
69         executeWithLogging(netconfService::unlock);
70     }
71
72     @Override
73     public void delete(final YangInstanceIdentifier path) {
74         enqueueOperation(() -> netconfService.delete(CONFIGURATION, path));
75     }
76
77     @Override
78     public void remove(final YangInstanceIdentifier path) {
79         enqueueOperation(() -> netconfService.remove(CONFIGURATION, path));
80     }
81
82     @Override
83     public void merge(final YangInstanceIdentifier path, final NormalizedNode data) {
84         enqueueOperation(() -> netconfService.merge(CONFIGURATION, path, data, Optional.empty()));
85     }
86
87     @Override
88     public void create(final YangInstanceIdentifier path, final NormalizedNode data,
89             final EffectiveModelContext schemaContext) {
90         if (data instanceof MapNode || data instanceof LeafSetNode) {
91             final NormalizedNode emptySubTree = ImmutableNodes.fromInstanceId(schemaContext, path);
92             merge(YangInstanceIdentifier.create(emptySubTree.getIdentifier()), emptySubTree);
93
94             for (final NormalizedNode child : ((NormalizedNodeContainer<?>) data).body()) {
95                 final YangInstanceIdentifier childPath = path.node(child.getIdentifier());
96                 enqueueOperation(() -> netconfService.create(CONFIGURATION, childPath, child, Optional.empty()));
97             }
98         } else {
99             enqueueOperation(() -> netconfService.create(CONFIGURATION, path, data, Optional.empty()));
100         }
101     }
102
103     @Override
104     public void replace(final YangInstanceIdentifier path, final NormalizedNode data,
105             final EffectiveModelContext schemaContext) {
106         if (data instanceof MapNode || data instanceof LeafSetNode) {
107             final NormalizedNode emptySubTree = ImmutableNodes.fromInstanceId(schemaContext, path);
108             merge(YangInstanceIdentifier.create(emptySubTree.getIdentifier()), emptySubTree);
109
110             for (final NormalizedNode child : ((NormalizedNodeContainer<?>) data).body()) {
111                 final YangInstanceIdentifier childPath = path.node(child.getIdentifier());
112                 enqueueOperation(() -> netconfService.replace(CONFIGURATION, childPath, child, Optional.empty()));
113             }
114         } else {
115             enqueueOperation(() -> netconfService.replace(CONFIGURATION, path, data, Optional.empty()));
116         }
117     }
118
119     @Override
120     public FluentFuture<? extends @NonNull CommitInfo> commit() {
121         final SettableFuture<CommitInfo> commitResult = SettableFuture.create();
122
123         // First complete all resultsFutures and merge them ...
124         final ListenableFuture<DOMRpcResult> resultErrors = mergeFutures(resultsFutures);
125
126         // ... then evaluate if there are any problems
127         Futures.addCallback(resultErrors, new FutureCallback<>() {
128             @Override
129             public void onSuccess(final DOMRpcResult result) {
130                 final Collection<? extends RpcError> errors = result.getErrors();
131                 if (!allWarnings(errors)) {
132                     Futures.whenAllComplete(discardAndUnlock()).run(
133                         () -> commitResult.setException(toCommitFailedException(errors)),
134                         MoreExecutors.directExecutor());
135                     return;
136                 }
137
138                 // ... no problems so far, initiate commit
139                 Futures.addCallback(netconfService.commit(), new FutureCallback<DOMRpcResult>() {
140                     @Override
141                     public void onSuccess(final DOMRpcResult rpcResult) {
142                         final Collection<? extends RpcError> errors = rpcResult.getErrors();
143                         if (errors.isEmpty()) {
144                             Futures.whenAllComplete(netconfService.unlock()).run(
145                                 () -> commitResult.set(CommitInfo.empty()),
146                                 MoreExecutors.directExecutor());
147                         } else if (allWarnings(errors)) {
148                             LOG.info("Commit successful with warnings {}", errors);
149                             Futures.whenAllComplete(netconfService.unlock()).run(
150                                 () -> commitResult.set(CommitInfo.empty()),
151                                 MoreExecutors.directExecutor());
152                         } else {
153                             Futures.whenAllComplete(discardAndUnlock()).run(
154                                 () -> commitResult.setException(toCommitFailedException(errors)),
155                                 MoreExecutors.directExecutor());
156                         }
157                     }
158
159                     @Override
160                     public void onFailure(final Throwable throwable) {
161                         Futures.whenAllComplete(discardAndUnlock()).run(
162                             () -> commitResult.setException(throwable),
163                             MoreExecutors.directExecutor());
164                     }
165                 }, MoreExecutors.directExecutor());
166             }
167
168             @Override
169             public void onFailure(final Throwable throwable) {
170                 Futures.whenAllComplete(discardAndUnlock()).run(() -> commitResult.setException(throwable),
171                     MoreExecutors.directExecutor());
172             }
173         }, MoreExecutors.directExecutor());
174
175         return FluentFuture.from(commitResult);
176     }
177
178     private List<ListenableFuture<?>> discardAndUnlock() {
179         // execute discard & unlock operations only if lock operation was completed successfully
180         if (isLocked) {
181             return List.of(netconfService.discardChanges(), netconfService.unlock());
182         } else {
183             return Collections.emptyList();
184         }
185     }
186
187     private final FutureCallback<DOMRpcResult> lockOperationCallback = new FutureCallback<>() {
188         @Override
189         public void onSuccess(final DOMRpcResult rpcResult) {
190             if (rpcResult != null && allWarnings(rpcResult.getErrors())) {
191                 isLocked = true;
192             }
193         }
194
195         @Override
196         public void onFailure(final Throwable throwable) {
197             // do nothing
198         }
199     };
200
201     private void enqueueOperation(final Supplier<ListenableFuture<? extends DOMRpcResult>> operation) {
202         final ListenableFuture<? extends DOMRpcResult> operationFuture;
203         synchronized (resultsFutures) {
204             // if we only have result for the lock operation ...
205             if (resultsFutures.size() == 1) {
206                 operationFuture = Futures.transformAsync(resultsFutures.get(0),
207                     result -> {
208                         // ... then add new operation to the chain if lock was successful
209                         if (result != null && (result.getErrors().isEmpty() || allWarnings(result.getErrors()))) {
210                             return operation.get();
211                         } else {
212                             return Futures.immediateFailedFuture(new NetconfDocumentedException("Lock operation failed",
213                                 ErrorType.APPLICATION, ErrorTag.LOCK_DENIED, ErrorSeverity.ERROR));
214                         }
215                     },
216                     MoreExecutors.directExecutor());
217             } else {
218                 // ... otherwise just add operation to the execution chain
219                 operationFuture = Futures.transformAsync(resultsFutures.get(resultsFutures.size() - 1),
220                     future -> operation.get(),
221                     MoreExecutors.directExecutor());
222             }
223             // ... finally save operation related future to the list
224             resultsFutures.add(operationFuture);
225         }
226     }
227
228     // Transform list of futures related to RPC operation into a single Future
229     private static ListenableFuture<DOMRpcResult> mergeFutures(
230         final List<ListenableFuture<? extends DOMRpcResult>> futures) {
231         return Futures.whenAllComplete(futures).call(() -> {
232             if (futures.size() == 1) {
233                 // Fast path
234                 return Futures.getDone(futures.get(0));
235             }
236
237             final var builder = ImmutableList.<RpcError>builder();
238             for (ListenableFuture<? extends DOMRpcResult> future : futures) {
239                 builder.addAll(Futures.getDone(future).getErrors());
240             }
241             return new DefaultDOMRpcResult(null, builder.build());
242         }, MoreExecutors.directExecutor());
243     }
244
245     private static TransactionCommitFailedException toCommitFailedException(
246             final Collection<? extends RpcError> errors) {
247         ErrorType errType = ErrorType.APPLICATION;
248         ErrorSeverity errSeverity = ErrorSeverity.ERROR;
249         StringJoiner msgBuilder = new StringJoiner(" ");
250         ErrorTag errorTag = ErrorTag.OPERATION_FAILED;
251         for (final RpcError error : errors) {
252             errType = error.getErrorType();
253             errSeverity = error.getSeverity();
254             msgBuilder.add(error.getMessage());
255             msgBuilder.add(error.getInfo());
256             errorTag = error.getTag();
257         }
258
259         return new TransactionCommitFailedException("Netconf transaction commit failed",
260             new NetconfDocumentedException("RPC during tx failed. " + msgBuilder.toString(), errType, errorTag,
261                 errSeverity));
262     }
263
264     private static void executeWithLogging(final Supplier<ListenableFuture<? extends DOMRpcResult>> operation) {
265         final ListenableFuture<? extends DOMRpcResult> operationResult = operation.get();
266         Futures.addCallback(operationResult, new FutureCallback<DOMRpcResult>() {
267             @Override
268             public void onSuccess(final DOMRpcResult rpcResult) {
269                 if (rpcResult != null && !rpcResult.getErrors().isEmpty()) {
270                     LOG.error("Errors occurred during processing of the RPC operation: {}",
271                         rpcResult.getErrors().stream().map(Object::toString).collect(Collectors.joining(",")));
272                 }
273             }
274
275             @Override
276             public void onFailure(final Throwable throwable) {
277                 LOG.error("Error processing operation", throwable);
278             }
279         }, MoreExecutors.directExecutor());
280     }
281
282     private static boolean allWarnings(final Collection<? extends @NonNull RpcError> errors) {
283         return errors.stream().allMatch(error -> error.getSeverity() == ErrorSeverity.WARNING);
284     }
285 }