2 * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.rests.transactions;
10 import static java.util.Objects.requireNonNull;
11 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.util.concurrent.FluentFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.SettableFuture;
20 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.Collections;
24 import java.util.List;
25 import java.util.Optional;
26 import java.util.StringJoiner;
27 import java.util.function.Supplier;
28 import java.util.stream.Collectors;
29 import org.eclipse.jdt.annotation.NonNull;
30 import org.opendaylight.mdsal.common.api.CommitInfo;
31 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
32 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
33 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
34 import org.opendaylight.netconf.api.DocumentedException;
35 import org.opendaylight.netconf.api.NetconfDocumentedException;
36 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
37 import org.opendaylight.yangtools.yang.common.ErrorSeverity;
38 import org.opendaylight.yangtools.yang.common.ErrorType;
39 import org.opendaylight.yangtools.yang.common.RpcError;
40 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
41 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
42 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
43 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
44 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodeContainer;
45 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
46 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
50 final class NetconfRestconfTransaction extends RestconfTransaction {
52 private static final Logger LOG = LoggerFactory.getLogger(NetconfRestconfTransaction.class);
54 private final NetconfDataTreeService netconfService;
55 private final List<ListenableFuture<? extends DOMRpcResult>> resultsFutures =
56 Collections.synchronizedList(new ArrayList<>());
57 private volatile boolean isLocked = false;
59 NetconfRestconfTransaction(final NetconfDataTreeService netconfService) {
60 this.netconfService = requireNonNull(netconfService);
61 final ListenableFuture<? extends DOMRpcResult> lockResult = netconfService.lock();
62 Futures.addCallback(lockResult, lockOperationCallback, MoreExecutors.directExecutor());
63 resultsFutures.add(lockResult);
67 public void cancel() {
68 resultsFutures.clear();
69 executeWithLogging(netconfService::discardChanges);
70 executeWithLogging(netconfService::unlock);
74 public void delete(final YangInstanceIdentifier path) {
75 enqueueOperation(() -> netconfService.delete(CONFIGURATION, path));
79 public void remove(final YangInstanceIdentifier path) {
80 enqueueOperation(() -> netconfService.remove(CONFIGURATION, path));
84 public void merge(final YangInstanceIdentifier path, final NormalizedNode data) {
85 enqueueOperation(() -> netconfService.merge(CONFIGURATION, path, data, Optional.empty()));
89 public void create(final YangInstanceIdentifier path, final NormalizedNode data,
90 final SchemaContext schemaContext) {
91 if (data instanceof MapNode || data instanceof LeafSetNode) {
92 final NormalizedNode emptySubTree = ImmutableNodes.fromInstanceId(schemaContext, path);
93 merge(YangInstanceIdentifier.create(emptySubTree.getIdentifier()), emptySubTree);
95 for (final NormalizedNode child : ((NormalizedNodeContainer<?>) data).body()) {
96 final YangInstanceIdentifier childPath = path.node(child.getIdentifier());
97 enqueueOperation(() -> netconfService.create(CONFIGURATION, childPath, child, Optional.empty()));
100 enqueueOperation(() -> netconfService.create(CONFIGURATION, path, data, Optional.empty()));
105 public void replace(final YangInstanceIdentifier path, final NormalizedNode data,
106 final SchemaContext schemaContext) {
107 if (data instanceof MapNode || data instanceof LeafSetNode) {
108 final NormalizedNode emptySubTree = ImmutableNodes.fromInstanceId(schemaContext, path);
109 merge(YangInstanceIdentifier.create(emptySubTree.getIdentifier()), emptySubTree);
111 for (final NormalizedNode child : ((NormalizedNodeContainer<?>) data).body()) {
112 final YangInstanceIdentifier childPath = path.node(child.getIdentifier());
113 enqueueOperation(() -> netconfService.replace(CONFIGURATION, childPath, child, Optional.empty()));
116 enqueueOperation(() -> netconfService.replace(CONFIGURATION, path, data, Optional.empty()));
121 public FluentFuture<? extends @NonNull CommitInfo> commit() {
122 final SettableFuture<CommitInfo> commitResult = SettableFuture.create();
124 // First complete all resultsFutures and merge them ...
125 final ListenableFuture<DOMRpcResult> resultErrors = mergeFutures(resultsFutures);
127 // ... then evaluate if there are any problems
128 Futures.addCallback(resultErrors, new FutureCallback<>() {
130 public void onSuccess(final DOMRpcResult result) {
131 final Collection<? extends RpcError> errors = result.getErrors();
132 if (!allWarnings(errors)) {
133 Futures.whenAllComplete(discardAndUnlock()).run(
134 () -> commitResult.setException(toCommitFailedException(errors)),
135 MoreExecutors.directExecutor());
139 // ... no problems so far, initiate commit
140 Futures.addCallback(netconfService.commit(), new FutureCallback<DOMRpcResult>() {
142 public void onSuccess(final DOMRpcResult rpcResult) {
143 final Collection<? extends RpcError> errors = result.getErrors();
144 if (errors.isEmpty()) {
145 Futures.whenAllComplete(netconfService.unlock()).run(
146 () -> commitResult.set(CommitInfo.empty()),
147 MoreExecutors.directExecutor());
148 } else if (allWarnings(errors)) {
149 LOG.info("Commit successful with warnings {}", errors);
150 Futures.whenAllComplete(netconfService.unlock()).run(
151 () -> commitResult.set(CommitInfo.empty()),
152 MoreExecutors.directExecutor());
154 Futures.whenAllComplete(discardAndUnlock()).run(
155 () -> commitResult.setException(toCommitFailedException(errors)),
156 MoreExecutors.directExecutor());
161 public void onFailure(final Throwable throwable) {
162 Futures.whenAllComplete(discardAndUnlock()).run(
163 () -> commitResult.setException(throwable),
164 MoreExecutors.directExecutor());
166 }, MoreExecutors.directExecutor());
170 public void onFailure(final Throwable throwable) {
171 Futures.whenAllComplete(discardAndUnlock()).run(() -> commitResult.setException(throwable),
172 MoreExecutors.directExecutor());
174 }, MoreExecutors.directExecutor());
176 return FluentFuture.from(commitResult);
179 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
180 justification = "https://github.com/spotbugs/spotbugs/issues/811")
181 private List<ListenableFuture<?>> discardAndUnlock() {
182 // execute discard & unlock operations only if lock operation was completed successfully
184 return List.of(netconfService.discardChanges(), netconfService.unlock());
186 return Collections.emptyList();
190 private final FutureCallback<DOMRpcResult> lockOperationCallback = new FutureCallback<>() {
192 public void onSuccess(final DOMRpcResult rpcResult) {
193 if (rpcResult != null && allWarnings(rpcResult.getErrors())) {
199 public void onFailure(final Throwable throwable) {
204 private void enqueueOperation(final Supplier<ListenableFuture<? extends DOMRpcResult>> operation) {
205 final ListenableFuture<? extends DOMRpcResult> operationFuture;
206 synchronized (resultsFutures) {
207 // if we only have result for the lock operation ...
208 if (resultsFutures.size() == 1) {
209 operationFuture = Futures.transformAsync(resultsFutures.get(0),
211 // ... then add new operation to the chain if lock was successful
212 if (result != null && (result.getErrors().isEmpty() || allWarnings(result.getErrors()))) {
213 return operation.get();
215 return Futures.immediateFailedFuture(new NetconfDocumentedException("Lock operation failed",
216 ErrorType.APPLICATION, DocumentedException.ErrorTag.LOCK_DENIED, ErrorSeverity.ERROR));
219 MoreExecutors.directExecutor());
221 // ... otherwise just add operation to the execution chain
222 operationFuture = Futures.transformAsync(resultsFutures.get(resultsFutures.size() - 1),
223 future -> operation.get(),
224 MoreExecutors.directExecutor());
226 // ... finally save operation related future to the list
227 resultsFutures.add(operationFuture);
231 // Transform list of futures related to RPC operation into a single Future
232 private static ListenableFuture<DOMRpcResult> mergeFutures(
233 final List<ListenableFuture<? extends DOMRpcResult>> futures) {
234 return Futures.whenAllComplete(futures).call(() -> {
235 if (futures.size() == 1) {
237 return Futures.getDone(futures.get(0));
240 final var builder = ImmutableList.<RpcError>builder();
241 for (ListenableFuture<? extends DOMRpcResult> future : futures) {
242 builder.addAll(Futures.getDone(future).getErrors());
244 return new DefaultDOMRpcResult(null, builder.build());
245 }, MoreExecutors.directExecutor());
248 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
249 justification = "https://github.com/spotbugs/spotbugs/issues/811")
250 private static TransactionCommitFailedException toCommitFailedException(
251 final Collection<? extends RpcError> errors) {
252 ErrorType errType = ErrorType.APPLICATION;
253 ErrorSeverity errSeverity = ErrorSeverity.ERROR;
254 StringJoiner msgBuilder = new StringJoiner(" ");
255 String errorTag = "operation-failed";
256 for (final RpcError error : errors) {
257 errType = error.getErrorType().toNetconf();
258 errSeverity = error.getSeverity().toNetconf();
259 msgBuilder.add(error.getMessage());
260 msgBuilder.add(error.getInfo());
261 errorTag = error.getTag();
264 return new TransactionCommitFailedException("Netconf transaction commit failed",
265 new NetconfDocumentedException("RPC during tx failed. " + msgBuilder.toString(), errType,
266 DocumentedException.ErrorTag.from(errorTag), errSeverity));
269 private static void executeWithLogging(final Supplier<ListenableFuture<? extends DOMRpcResult>> operation) {
270 final ListenableFuture<? extends DOMRpcResult> operationResult = operation.get();
271 Futures.addCallback(operationResult, new FutureCallback<DOMRpcResult>() {
273 public void onSuccess(final DOMRpcResult rpcResult) {
274 if (rpcResult != null && !rpcResult.getErrors().isEmpty()) {
275 LOG.error("Errors occurred during processing of the RPC operation: {}",
276 rpcResult.getErrors().stream().map(Object::toString).collect(Collectors.joining(",")));
281 public void onFailure(final Throwable throwable) {
282 LOG.error("Error processing operation", throwable);
284 }, MoreExecutors.directExecutor());
287 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
288 justification = "https://github.com/spotbugs/spotbugs/issues/811")
289 private static boolean allWarnings(final Collection<? extends @NonNull RpcError> errors) {
290 return errors.stream().allMatch(error -> error.getSeverity() == RpcError.ErrorSeverity.WARNING);