Further warnings mitigation
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / transactions / NetconfRestconfTransaction.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.rests.transactions;
9
10 import static java.util.Objects.requireNonNull;
11 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION;
12 import static org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes.fromInstanceId;
13
14 import com.google.common.collect.ImmutableList;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Optional;
25 import java.util.StringJoiner;
26 import java.util.function.Supplier;
27 import java.util.stream.Collectors;
28 import org.eclipse.jdt.annotation.NonNull;
29 import org.opendaylight.mdsal.common.api.CommitInfo;
30 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
31 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
32 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
33 import org.opendaylight.netconf.api.NetconfDocumentedException;
34 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
35 import org.opendaylight.yangtools.yang.common.ErrorSeverity;
36 import org.opendaylight.yangtools.yang.common.ErrorTag;
37 import org.opendaylight.yangtools.yang.common.ErrorType;
38 import org.opendaylight.yangtools.yang.common.RpcError;
39 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
40 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
41 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
42 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
43 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodeContainer;
44 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 final class NetconfRestconfTransaction extends RestconfTransaction {
49     private static final Logger LOG = LoggerFactory.getLogger(NetconfRestconfTransaction.class);
50
51     private final List<ListenableFuture<? extends DOMRpcResult>> resultsFutures =
52         Collections.synchronizedList(new ArrayList<>());
53     private final NetconfDataTreeService netconfService;
54
55     private volatile boolean isLocked = false;
56
57     NetconfRestconfTransaction(final EffectiveModelContext modelContext, final NetconfDataTreeService netconfService) {
58         super(modelContext);
59         this.netconfService = requireNonNull(netconfService);
60
61         final var lockResult = netconfService.lock();
62         Futures.addCallback(lockResult, new FutureCallback<DOMRpcResult>() {
63             @Override
64             public void onSuccess(final DOMRpcResult rpcResult) {
65                 if (rpcResult != null && allWarnings(rpcResult.errors())) {
66                     isLocked = true;
67                 }
68             }
69
70             @Override
71             public void onFailure(final Throwable throwable) {
72                 // do nothing
73             }
74         }, MoreExecutors.directExecutor());
75         resultsFutures.add(lockResult);
76     }
77
78     @Override
79     void cancel() {
80         resultsFutures.clear();
81         executeWithLogging(netconfService::discardChanges);
82         executeWithLogging(netconfService::unlock);
83     }
84
85     @Override
86     void deleteImpl(final YangInstanceIdentifier path) {
87         enqueueOperation(() -> netconfService.delete(CONFIGURATION, path));
88     }
89
90     @Override
91     void removeImpl(final YangInstanceIdentifier path) {
92         enqueueOperation(() -> netconfService.remove(CONFIGURATION, path));
93     }
94
95     @Override
96     void mergeImpl(final YangInstanceIdentifier path, final NormalizedNode data) {
97         enqueueOperation(() -> netconfService.merge(CONFIGURATION, path, data, Optional.empty()));
98     }
99
100     @Override
101     void createImpl(final YangInstanceIdentifier path, final NormalizedNode data) {
102         if (data instanceof MapNode || data instanceof LeafSetNode) {
103             final var emptySubTree = fromInstanceId(modelContext, path);
104             merge(YangInstanceIdentifier.of(emptySubTree.name()), emptySubTree);
105
106             for (var child : ((NormalizedNodeContainer<?>) data).body()) {
107                 final var childPath = path.node(child.name());
108                 enqueueOperation(() -> netconfService.create(CONFIGURATION, childPath, child, Optional.empty()));
109             }
110         } else {
111             enqueueOperation(() -> netconfService.create(CONFIGURATION, path, data, Optional.empty()));
112         }
113     }
114
115     @Override
116     void replaceImpl(final YangInstanceIdentifier path, final NormalizedNode data) {
117         if (data instanceof MapNode || data instanceof LeafSetNode) {
118             final var emptySubTree = fromInstanceId(modelContext, path);
119             merge(YangInstanceIdentifier.of(emptySubTree.name()), emptySubTree);
120
121             for (var child : ((NormalizedNodeContainer<?>) data).body()) {
122                 final var childPath = path.node(child.name());
123                 enqueueOperation(() -> netconfService.replace(CONFIGURATION, childPath, child, Optional.empty()));
124             }
125         } else {
126             enqueueOperation(() -> netconfService.replace(CONFIGURATION, path, data, Optional.empty()));
127         }
128     }
129
130     @Override
131     ListenableFuture<? extends @NonNull CommitInfo> commit() {
132         final SettableFuture<CommitInfo> commitResult = SettableFuture.create();
133
134         // First complete all resultsFutures and merge them ...
135         final ListenableFuture<DOMRpcResult> resultErrors = mergeFutures(resultsFutures);
136
137         // ... then evaluate if there are any problems
138         Futures.addCallback(resultErrors, new FutureCallback<>() {
139             @Override
140             public void onSuccess(final DOMRpcResult result) {
141                 final Collection<? extends RpcError> errors = result.errors();
142                 if (!allWarnings(errors)) {
143                     Futures.whenAllComplete(discardAndUnlock()).run(
144                         () -> commitResult.setException(toCommitFailedException(errors)),
145                         MoreExecutors.directExecutor());
146                     return;
147                 }
148
149                 // ... no problems so far, initiate commit
150                 Futures.addCallback(netconfService.commit(), new FutureCallback<DOMRpcResult>() {
151                     @Override
152                     public void onSuccess(final DOMRpcResult rpcResult) {
153                         final Collection<? extends RpcError> errors = rpcResult.errors();
154                         if (errors.isEmpty()) {
155                             Futures.whenAllComplete(netconfService.unlock()).run(
156                                 () -> commitResult.set(CommitInfo.empty()),
157                                 MoreExecutors.directExecutor());
158                         } else if (allWarnings(errors)) {
159                             LOG.info("Commit successful with warnings {}", errors);
160                             Futures.whenAllComplete(netconfService.unlock()).run(
161                                 () -> commitResult.set(CommitInfo.empty()),
162                                 MoreExecutors.directExecutor());
163                         } else {
164                             Futures.whenAllComplete(discardAndUnlock()).run(
165                                 () -> commitResult.setException(toCommitFailedException(errors)),
166                                 MoreExecutors.directExecutor());
167                         }
168                     }
169
170                     @Override
171                     public void onFailure(final Throwable throwable) {
172                         Futures.whenAllComplete(discardAndUnlock()).run(
173                             () -> commitResult.setException(throwable),
174                             MoreExecutors.directExecutor());
175                     }
176                 }, MoreExecutors.directExecutor());
177             }
178
179             @Override
180             public void onFailure(final Throwable throwable) {
181                 Futures.whenAllComplete(discardAndUnlock()).run(() -> commitResult.setException(throwable),
182                     MoreExecutors.directExecutor());
183             }
184         }, MoreExecutors.directExecutor());
185
186         return commitResult;
187     }
188
189     @Override
190     ListenableFuture<Optional<NormalizedNode>> read(final YangInstanceIdentifier path) {
191         return netconfService.getConfig(path);
192     }
193
194     private List<ListenableFuture<?>> discardAndUnlock() {
195         // execute discard & unlock operations only if lock operation was completed successfully
196         if (isLocked) {
197             return List.of(netconfService.discardChanges(), netconfService.unlock());
198         } else {
199             return List.of();
200         }
201     }
202
203     private void enqueueOperation(final Supplier<ListenableFuture<? extends DOMRpcResult>> operation) {
204         final ListenableFuture<? extends DOMRpcResult> operationFuture;
205         synchronized (resultsFutures) {
206             // if we only have result for the lock operation ...
207             if (resultsFutures.size() == 1) {
208                 operationFuture = Futures.transformAsync(resultsFutures.get(0),
209                     result -> {
210                         // ... then add new operation to the chain if lock was successful
211                         if (result != null && (result.errors().isEmpty() || allWarnings(result.errors()))) {
212                             return operation.get();
213                         } else {
214                             return Futures.immediateFailedFuture(new NetconfDocumentedException("Lock operation failed",
215                                 ErrorType.APPLICATION, ErrorTag.LOCK_DENIED, ErrorSeverity.ERROR));
216                         }
217                     },
218                     MoreExecutors.directExecutor());
219             } else {
220                 // ... otherwise just add operation to the execution chain
221                 operationFuture = Futures.transformAsync(resultsFutures.get(resultsFutures.size() - 1),
222                     future -> operation.get(),
223                     MoreExecutors.directExecutor());
224             }
225             // ... finally save operation related future to the list
226             resultsFutures.add(operationFuture);
227         }
228     }
229
230     // Transform list of futures related to RPC operation into a single Future
231     private static ListenableFuture<DOMRpcResult> mergeFutures(
232             final List<ListenableFuture<? extends DOMRpcResult>> futures) {
233         return Futures.whenAllComplete(futures).call(() -> {
234             if (futures.size() == 1) {
235                 // Fast path
236                 return Futures.getDone(futures.get(0));
237             }
238
239             final var builder = ImmutableList.<RpcError>builder();
240             for (ListenableFuture<? extends DOMRpcResult> future : futures) {
241                 builder.addAll(Futures.getDone(future).errors());
242             }
243             return new DefaultDOMRpcResult(null, builder.build());
244         }, MoreExecutors.directExecutor());
245     }
246
247     private static TransactionCommitFailedException toCommitFailedException(
248             final Collection<? extends RpcError> errors) {
249         ErrorType errType = ErrorType.APPLICATION;
250         ErrorSeverity errSeverity = ErrorSeverity.ERROR;
251         StringJoiner msgBuilder = new StringJoiner(" ");
252         ErrorTag errorTag = ErrorTag.OPERATION_FAILED;
253         for (final RpcError error : errors) {
254             errType = error.getErrorType();
255             errSeverity = error.getSeverity();
256             msgBuilder.add(error.getMessage());
257             msgBuilder.add(error.getInfo());
258             errorTag = error.getTag();
259         }
260
261         return new TransactionCommitFailedException("Netconf transaction commit failed",
262             new NetconfDocumentedException("RPC during tx failed. " + msgBuilder.toString(), errType, errorTag,
263                 errSeverity));
264     }
265
266     private static void executeWithLogging(final Supplier<ListenableFuture<? extends DOMRpcResult>> operation) {
267         final ListenableFuture<? extends DOMRpcResult> operationResult = operation.get();
268         Futures.addCallback(operationResult, new FutureCallback<DOMRpcResult>() {
269             @Override
270             public void onSuccess(final DOMRpcResult rpcResult) {
271                 if (rpcResult != null && !rpcResult.errors().isEmpty()) {
272                     LOG.error("Errors occurred during processing of the RPC operation: {}",
273                         rpcResult.errors().stream().map(Object::toString).collect(Collectors.joining(",")));
274                 }
275             }
276
277             @Override
278             public void onFailure(final Throwable throwable) {
279                 LOG.error("Error processing operation", throwable);
280             }
281         }, MoreExecutors.directExecutor());
282     }
283
284     private static boolean allWarnings(final Collection<? extends @NonNull RpcError> errors) {
285         return errors.stream().allMatch(error -> error.getSeverity() == ErrorSeverity.WARNING);
286     }
287 }