Eliminate FormatParameters
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / transactions / NetconfRestconfTransaction.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.rests.transactions;
9
10 import static java.util.Objects.requireNonNull;
11 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION;
12 import static org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes.fromInstanceId;
13
14 import com.google.common.collect.ImmutableList;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Optional;
26 import java.util.StringJoiner;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.function.Supplier;
29 import java.util.stream.Collectors;
30 import org.eclipse.jdt.annotation.NonNull;
31 import org.eclipse.jdt.annotation.Nullable;
32 import org.opendaylight.mdsal.common.api.CommitInfo;
33 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
34 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
35 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
36 import org.opendaylight.netconf.api.NetconfDocumentedException;
37 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
38 import org.opendaylight.yangtools.yang.common.ErrorSeverity;
39 import org.opendaylight.yangtools.yang.common.ErrorTag;
40 import org.opendaylight.yangtools.yang.common.ErrorType;
41 import org.opendaylight.yangtools.yang.common.RpcError;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
44 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
45 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
46 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
47 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodeContainer;
48 import org.opendaylight.yangtools.yang.data.util.DataSchemaContext;
49 import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree;
50 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
51 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
52 import org.opendaylight.yangtools.yang.model.api.LeafListSchemaNode;
53 import org.opendaylight.yangtools.yang.model.api.ListSchemaNode;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57 final class NetconfRestconfTransaction extends RestconfTransaction {
58     private static final Logger LOG = LoggerFactory.getLogger(NetconfRestconfTransaction.class);
59
60     private final List<ListenableFuture<? extends DOMRpcResult>> resultsFutures =
61         Collections.synchronizedList(new ArrayList<>());
62     private final NetconfDataTreeService netconfService;
63     private final Map<YangInstanceIdentifier, Collection<? extends NormalizedNode>> readListCache =
64         new ConcurrentHashMap<>();
65
66     private volatile boolean isLocked = false;
67
68     NetconfRestconfTransaction(final EffectiveModelContext modelContext, final NetconfDataTreeService netconfService) {
69         super(modelContext);
70         this.netconfService = requireNonNull(netconfService);
71
72         final var lockResult = netconfService.lock();
73         Futures.addCallback(lockResult, new FutureCallback<DOMRpcResult>() {
74             @Override
75             public void onSuccess(final DOMRpcResult rpcResult) {
76                 if (rpcResult != null && allWarnings(rpcResult.errors())) {
77                     isLocked = true;
78                 }
79             }
80
81             @Override
82             public void onFailure(final Throwable throwable) {
83                 // do nothing
84             }
85         }, MoreExecutors.directExecutor());
86         resultsFutures.add(lockResult);
87     }
88
89     @Override
90     void cancel() {
91         resultsFutures.clear();
92         readListCache.clear();
93         executeWithLogging(netconfService::discardChanges);
94         executeWithLogging(netconfService::unlock);
95     }
96
97     @Override
98     void deleteImpl(final YangInstanceIdentifier path) {
99         if (isListPath(path, modelContext)) {
100             final var items = getListItemsForRemove(path);
101             if (items.isEmpty()) {
102                 LOG.debug("Path {} contains no items, delete operation omitted.", path);
103             } else {
104                 items.forEach(item ->
105                     enqueueOperation(() -> netconfService.delete(CONFIGURATION, path.node(item.name()))));
106             }
107         } else {
108             enqueueOperation(() -> netconfService.delete(CONFIGURATION, path));
109         }
110     }
111
112     @Override
113     void removeImpl(final YangInstanceIdentifier path) {
114         if (isListPath(path, modelContext)) {
115             final var items = getListItemsForRemove(path);
116             if (items.isEmpty()) {
117                 LOG.debug("Path {} contains no items, remove operation omitted.", path);
118             } else {
119                 items.forEach(item ->
120                     enqueueOperation(() -> netconfService.remove(CONFIGURATION, path.node(item.name()))));
121             }
122         } else {
123             enqueueOperation(() -> netconfService.remove(CONFIGURATION, path));
124         }
125     }
126
127     @Override
128     @Nullable NormalizedNodeContainer<?> readList(final YangInstanceIdentifier path) {
129         // reading list is mainly invoked for subsequent removal,
130         // cache data to avoid extra read invocation on delete/remove
131         final var result =  TransactionUtil.syncAccess(read(path), path);
132         readListCache.put(path, result.map(data -> ((NormalizedNodeContainer<?>) data).body()).orElse(List.of()));
133         return (NormalizedNodeContainer<?>) result.orElse(null);
134     }
135
136     private @NonNull Collection<? extends NormalizedNode> getListItemsForRemove(final YangInstanceIdentifier path) {
137         final var cached = readListCache.remove(path);
138         if (cached != null) {
139             return cached;
140         }
141         // check if keys only can be filtered out to minimize amount of data retrieved
142         final var keyFields = keyFieldsFrom(path, modelContext);
143         final var future =  keyFields.isEmpty() ? netconfService.getConfig(path)
144             // using list wildcard as a root path, it's required for proper key field path construction
145             // on building get-config filter
146             : netconfService.getConfig(
147                 path.node(NodeIdentifierWithPredicates.of(path.getLastPathArgument().getNodeType())), keyFields);
148         final var retrieved = TransactionUtil.syncAccess(future, path);
149         return retrieved.map(data -> ((NormalizedNodeContainer<?>) data).body()).orElse(List.of());
150     }
151
152     @Override
153     void mergeImpl(final YangInstanceIdentifier path, final NormalizedNode data) {
154         enqueueOperation(() -> netconfService.merge(CONFIGURATION, path, data, Optional.empty()));
155     }
156
157     @Override
158     void createImpl(final YangInstanceIdentifier path, final NormalizedNode data) {
159         if (data instanceof MapNode || data instanceof LeafSetNode) {
160             final var emptySubTree = fromInstanceId(modelContext, path);
161             merge(YangInstanceIdentifier.of(emptySubTree.name()), emptySubTree);
162
163             for (var child : ((NormalizedNodeContainer<?>) data).body()) {
164                 final var childPath = path.node(child.name());
165                 enqueueOperation(() -> netconfService.create(CONFIGURATION, childPath, child, Optional.empty()));
166             }
167         } else {
168             enqueueOperation(() -> netconfService.create(CONFIGURATION, path, data, Optional.empty()));
169         }
170     }
171
172     @Override
173     void replaceImpl(final YangInstanceIdentifier path, final NormalizedNode data) {
174         if (data instanceof MapNode || data instanceof LeafSetNode) {
175             final var emptySubTree = fromInstanceId(modelContext, path);
176             merge(YangInstanceIdentifier.of(emptySubTree.name()), emptySubTree);
177
178             for (var child : ((NormalizedNodeContainer<?>) data).body()) {
179                 final var childPath = path.node(child.name());
180                 enqueueOperation(() -> netconfService.replace(CONFIGURATION, childPath, child, Optional.empty()));
181             }
182         } else {
183             enqueueOperation(() -> netconfService.replace(CONFIGURATION, path, data, Optional.empty()));
184         }
185     }
186
187     @Override
188     ListenableFuture<? extends @NonNull CommitInfo> commit() {
189         final SettableFuture<CommitInfo> commitResult = SettableFuture.create();
190
191         // First complete all resultsFutures and merge them ...
192         final ListenableFuture<DOMRpcResult> resultErrors = mergeFutures(resultsFutures);
193
194         // ... then evaluate if there are any problems
195         Futures.addCallback(resultErrors, new FutureCallback<>() {
196             @Override
197             public void onSuccess(final DOMRpcResult result) {
198                 final Collection<? extends RpcError> errors = result.errors();
199                 if (!allWarnings(errors)) {
200                     Futures.whenAllComplete(discardAndUnlock()).run(
201                         () -> commitResult.setException(toCommitFailedException(errors)),
202                         MoreExecutors.directExecutor());
203                     return;
204                 }
205
206                 // ... no problems so far, initiate commit
207                 Futures.addCallback(netconfService.commit(), new FutureCallback<DOMRpcResult>() {
208                     @Override
209                     public void onSuccess(final DOMRpcResult rpcResult) {
210                         final Collection<? extends RpcError> errors = rpcResult.errors();
211                         if (errors.isEmpty()) {
212                             Futures.whenAllComplete(netconfService.unlock()).run(
213                                 () -> commitResult.set(CommitInfo.empty()),
214                                 MoreExecutors.directExecutor());
215                         } else if (allWarnings(errors)) {
216                             LOG.info("Commit successful with warnings {}", errors);
217                             Futures.whenAllComplete(netconfService.unlock()).run(
218                                 () -> commitResult.set(CommitInfo.empty()),
219                                 MoreExecutors.directExecutor());
220                         } else {
221                             Futures.whenAllComplete(discardAndUnlock()).run(
222                                 () -> commitResult.setException(toCommitFailedException(errors)),
223                                 MoreExecutors.directExecutor());
224                         }
225                     }
226
227                     @Override
228                     public void onFailure(final Throwable throwable) {
229                         Futures.whenAllComplete(discardAndUnlock()).run(
230                             () -> commitResult.setException(throwable),
231                             MoreExecutors.directExecutor());
232                     }
233                 }, MoreExecutors.directExecutor());
234             }
235
236             @Override
237             public void onFailure(final Throwable throwable) {
238                 Futures.whenAllComplete(discardAndUnlock()).run(() -> commitResult.setException(throwable),
239                     MoreExecutors.directExecutor());
240             }
241         }, MoreExecutors.directExecutor());
242
243         return commitResult;
244     }
245
246     @Override
247     ListenableFuture<Optional<NormalizedNode>> read(final YangInstanceIdentifier path) {
248         return netconfService.getConfig(path);
249     }
250
251     private List<ListenableFuture<?>> discardAndUnlock() {
252         readListCache.clear();
253         // execute discard & unlock operations only if lock operation was completed successfully
254         if (isLocked) {
255             return List.of(netconfService.discardChanges(), netconfService.unlock());
256         } else {
257             return List.of();
258         }
259     }
260
261     private void enqueueOperation(final Supplier<ListenableFuture<? extends DOMRpcResult>> operation) {
262         final ListenableFuture<? extends DOMRpcResult> operationFuture;
263         synchronized (resultsFutures) {
264             // if we only have result for the lock operation ...
265             if (resultsFutures.size() == 1) {
266                 operationFuture = Futures.transformAsync(resultsFutures.get(0),
267                     result -> {
268                         // ... then add new operation to the chain if lock was successful
269                         if (result != null && (result.errors().isEmpty() || allWarnings(result.errors()))) {
270                             return operation.get();
271                         } else {
272                             return Futures.immediateFailedFuture(new NetconfDocumentedException("Lock operation failed",
273                                 ErrorType.APPLICATION, ErrorTag.LOCK_DENIED, ErrorSeverity.ERROR));
274                         }
275                     },
276                     MoreExecutors.directExecutor());
277             } else {
278                 // ... otherwise just add operation to the execution chain
279                 operationFuture = Futures.transformAsync(resultsFutures.get(resultsFutures.size() - 1),
280                     future -> operation.get(),
281                     MoreExecutors.directExecutor());
282             }
283             // ... finally save operation related future to the list
284             resultsFutures.add(operationFuture);
285         }
286     }
287
288     // Transform list of futures related to RPC operation into a single Future
289     private static ListenableFuture<DOMRpcResult> mergeFutures(
290             final List<ListenableFuture<? extends DOMRpcResult>> futures) {
291         return Futures.whenAllComplete(futures).call(() -> {
292             if (futures.size() == 1) {
293                 // Fast path
294                 return Futures.getDone(futures.get(0));
295             }
296
297             final var builder = ImmutableList.<RpcError>builder();
298             for (ListenableFuture<? extends DOMRpcResult> future : futures) {
299                 builder.addAll(Futures.getDone(future).errors());
300             }
301             return new DefaultDOMRpcResult(null, builder.build());
302         }, MoreExecutors.directExecutor());
303     }
304
305     private static TransactionCommitFailedException toCommitFailedException(
306             final Collection<? extends RpcError> errors) {
307         ErrorType errType = ErrorType.APPLICATION;
308         ErrorSeverity errSeverity = ErrorSeverity.ERROR;
309         StringJoiner msgBuilder = new StringJoiner(" ");
310         ErrorTag errorTag = ErrorTag.OPERATION_FAILED;
311         for (final RpcError error : errors) {
312             errType = error.getErrorType();
313             errSeverity = error.getSeverity();
314             msgBuilder.add(error.getMessage());
315             msgBuilder.add(error.getInfo());
316             errorTag = error.getTag();
317         }
318
319         return new TransactionCommitFailedException("Netconf transaction commit failed",
320             new NetconfDocumentedException("RPC during tx failed. " + msgBuilder.toString(), errType, errorTag,
321                 errSeverity));
322     }
323
324     private static void executeWithLogging(final Supplier<ListenableFuture<? extends DOMRpcResult>> operation) {
325         final ListenableFuture<? extends DOMRpcResult> operationResult = operation.get();
326         Futures.addCallback(operationResult, new FutureCallback<DOMRpcResult>() {
327             @Override
328             public void onSuccess(final DOMRpcResult rpcResult) {
329                 if (rpcResult != null && !rpcResult.errors().isEmpty()) {
330                     LOG.error("Errors occurred during processing of the RPC operation: {}",
331                         rpcResult.errors().stream().map(Object::toString).collect(Collectors.joining(",")));
332                 }
333             }
334
335             @Override
336             public void onFailure(final Throwable throwable) {
337                 LOG.error("Error processing operation", throwable);
338             }
339         }, MoreExecutors.directExecutor());
340     }
341
342     private static boolean allWarnings(final Collection<? extends @NonNull RpcError> errors) {
343         return errors.stream().allMatch(error -> error.getSeverity() == ErrorSeverity.WARNING);
344     }
345
346     private static boolean isListPath(final YangInstanceIdentifier path, final EffectiveModelContext modelContext) {
347         if (path.getLastPathArgument() instanceof YangInstanceIdentifier.NodeIdentifier) {
348             // list can be referenced by NodeIdentifier only, prevent list item do be identified as list
349             final var schemaNode = schemaNodeFrom(path, modelContext);
350             return schemaNode instanceof ListSchemaNode || schemaNode instanceof LeafListSchemaNode;
351         }
352         return false;
353     }
354
355     private static List<YangInstanceIdentifier> keyFieldsFrom(final YangInstanceIdentifier path,
356             final EffectiveModelContext modelContext) {
357         final var schemaNode = schemaNodeFrom(path, modelContext);
358         return schemaNode instanceof ListSchemaNode listSchemaNode
359             ? listSchemaNode.getKeyDefinition().stream().map(YangInstanceIdentifier::of).toList() : List.of();
360     }
361
362     private static DataSchemaNode schemaNodeFrom(final YangInstanceIdentifier path,
363             final EffectiveModelContext modelContext) {
364         return DataSchemaContextTree.from(modelContext).findChild(path)
365             .map(DataSchemaContext::dataSchemaNode).orElse(null);
366     }
367 }