2 * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.rests.transactions;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.collect.ImmutableList;
13 import com.google.common.util.concurrent.FluentFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import com.google.common.util.concurrent.SettableFuture;
19 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Optional;
25 import java.util.StringJoiner;
26 import java.util.function.Supplier;
27 import java.util.stream.Collectors;
28 import org.eclipse.jdt.annotation.NonNull;
29 import org.opendaylight.mdsal.common.api.CommitInfo;
30 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
31 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
32 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
33 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
34 import org.opendaylight.netconf.api.DocumentedException;
35 import org.opendaylight.netconf.api.NetconfDocumentedException;
36 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
37 import org.opendaylight.yangtools.yang.common.RpcError;
38 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
39 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
40 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
41 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
42 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodeContainer;
43 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
44 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
48 final class NetconfRestconfTransaction extends RestconfTransaction {
50 private static final Logger LOG = LoggerFactory.getLogger(NetconfRestconfTransaction.class);
52 private final NetconfDataTreeService netconfService;
53 private final List<ListenableFuture<? extends DOMRpcResult>> resultsFutures =
54 Collections.synchronizedList(new ArrayList<>());
55 private volatile boolean isLocked = false;
/**
 * Creates the transaction and immediately asks the NETCONF service for a lock.
 *
 * @param netconfService service used for every operation of this transaction
 * @throws NullPointerException if {@code netconfService} is null
 */
NetconfRestconfTransaction(final NetconfDataTreeService netconfService) {
    this.netconfService = requireNonNull(netconfService);

    // The lock request is the first future recorded in resultsFutures; its outcome is also
    // observed by lockOperationCallback.
    final ListenableFuture<? extends DOMRpcResult> lockFuture = this.netconfService.lock();
    Futures.addCallback(lockFuture, lockOperationCallback, MoreExecutors.directExecutor());
    resultsFutures.add(lockFuture);
}
65 public void cancel() {
66 resultsFutures.clear();
67 executeWithLogging(netconfService::discardChanges);
68 executeWithLogging(netconfService::unlock);
72 public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
73 enqueueOperation(() -> netconfService.delete(store, path));
77 public void remove(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
78 enqueueOperation(() -> netconfService.remove(store, path));
82 public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path,
83 final NormalizedNode<?, ?> data) {
84 enqueueOperation(() -> netconfService.merge(store, path, data, Optional.empty()));
88 public void create(final LogicalDatastoreType store, final YangInstanceIdentifier path,
89 final NormalizedNode<?, ?> data, final SchemaContext schemaContext) {
90 if (data instanceof MapNode || data instanceof LeafSetNode) {
91 final NormalizedNode<?, ?> emptySubTree = ImmutableNodes.fromInstanceId(schemaContext, path);
92 merge(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.create(emptySubTree.getIdentifier()),
95 for (final NormalizedNode<?, ?> child : ((NormalizedNodeContainer<?, ?, ?>) data).getValue()) {
96 final YangInstanceIdentifier childPath = path.node(child.getIdentifier());
97 enqueueOperation(() -> netconfService.create(store, childPath, child, Optional.empty()));
100 enqueueOperation(() -> netconfService.create(store, path, data, Optional.empty()));
105 public void replace(final LogicalDatastoreType store, final YangInstanceIdentifier path,
106 final NormalizedNode<?, ?> data, final SchemaContext schemaContext) {
107 if (data instanceof MapNode || data instanceof LeafSetNode) {
108 final NormalizedNode<?, ?> emptySubTree = ImmutableNodes.fromInstanceId(schemaContext, path);
109 merge(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.create(emptySubTree.getIdentifier()),
112 for (final NormalizedNode<?, ?> child : ((NormalizedNodeContainer<?, ?, ?>) data).getValue()) {
113 final YangInstanceIdentifier childPath = path.node(child.getIdentifier());
114 enqueueOperation(() -> netconfService.replace(store, childPath, child, Optional.empty()));
117 enqueueOperation(() -> netconfService.replace(store, path, data, Optional.empty()));
122 public FluentFuture<? extends @NonNull CommitInfo> commit() {
123 final SettableFuture<CommitInfo> commitResult = SettableFuture.create();
125 // First complete all resultsFutures and merge them ...
126 final ListenableFuture<DOMRpcResult> resultErrors = mergeFutures(resultsFutures);
128 // ... then evaluate if there are any problems
129 Futures.addCallback(resultErrors, new FutureCallback<>() {
131 public void onSuccess(final DOMRpcResult result) {
132 final Collection<? extends RpcError> errors = result.getErrors();
133 if (!allWarnings(errors)) {
134 Futures.whenAllComplete(discardAndUnlock()).run(
135 () -> commitResult.setException(toCommitFailedException(errors)),
136 MoreExecutors.directExecutor());
140 // ... no problems so far, initiate commit
141 Futures.addCallback(netconfService.commit(), new FutureCallback<DOMRpcResult>() {
143 public void onSuccess(final DOMRpcResult rpcResult) {
144 final Collection<? extends RpcError> errors = result.getErrors();
145 if (errors.isEmpty()) {
146 Futures.whenAllComplete(netconfService.unlock()).run(
147 () -> commitResult.set(CommitInfo.empty()),
148 MoreExecutors.directExecutor());
149 } else if (allWarnings(errors)) {
150 LOG.info("Commit successful with warnings {}", errors);
151 Futures.whenAllComplete(netconfService.unlock()).run(
152 () -> commitResult.set(CommitInfo.empty()),
153 MoreExecutors.directExecutor());
155 Futures.whenAllComplete(discardAndUnlock()).run(
156 () -> commitResult.setException(toCommitFailedException(errors)),
157 MoreExecutors.directExecutor());
162 public void onFailure(final Throwable throwable) {
163 Futures.whenAllComplete(discardAndUnlock()).run(
164 () -> commitResult.setException(throwable),
165 MoreExecutors.directExecutor());
167 }, MoreExecutors.directExecutor());
171 public void onFailure(final Throwable throwable) {
172 Futures.whenAllComplete(discardAndUnlock()).run(() -> commitResult.setException(throwable),
173 MoreExecutors.directExecutor());
175 }, MoreExecutors.directExecutor());
177 return FluentFuture.from(commitResult);
180 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
181 justification = "https://github.com/spotbugs/spotbugs/issues/811")
182 private List<ListenableFuture<?>> discardAndUnlock() {
183 // execute discard & unlock operations only if lock operation was completed successfully
185 return List.of(netconfService.discardChanges(), netconfService.unlock());
187 return Collections.emptyList();
191 private final FutureCallback<DOMRpcResult> lockOperationCallback = new FutureCallback<>() {
193 public void onSuccess(final DOMRpcResult rpcResult) {
194 if (rpcResult != null && allWarnings(rpcResult.getErrors())) {
200 public void onFailure(final Throwable throwable) {
205 private void enqueueOperation(final Supplier<ListenableFuture<? extends DOMRpcResult>> operation) {
206 final ListenableFuture<? extends DOMRpcResult> operationFuture;
207 synchronized (resultsFutures) {
208 // if we only have result for the lock operation ...
209 if (resultsFutures.size() == 1) {
210 operationFuture = Futures.transformAsync(resultsFutures.get(0),
212 // ... then add new operation to the chain if lock was successful
213 if (result != null && (result.getErrors().isEmpty() || allWarnings(result.getErrors()))) {
214 return operation.get();
216 return Futures.immediateFailedFuture(new NetconfDocumentedException("Lock operation failed",
217 DocumentedException.ErrorType.APPLICATION, DocumentedException.ErrorTag.LOCK_DENIED,
218 DocumentedException.ErrorSeverity.ERROR));
221 MoreExecutors.directExecutor());
223 // ... otherwise just add operation to the execution chain
224 operationFuture = Futures.transformAsync(resultsFutures.get(resultsFutures.size() - 1),
225 future -> operation.get(),
226 MoreExecutors.directExecutor());
228 // ... finally save operation related future to the list
229 resultsFutures.add(operationFuture);
233 // Transform list of futures related to RPC operation into a single Future
234 private static ListenableFuture<DOMRpcResult> mergeFutures(
235 final List<ListenableFuture<? extends DOMRpcResult>> futures) {
236 return Futures.whenAllComplete(futures).call(() -> {
237 if (futures.size() == 1) {
239 return Futures.getDone(futures.get(0));
242 final var builder = ImmutableList.<RpcError>builder();
243 for (ListenableFuture<? extends DOMRpcResult> future : futures) {
244 builder.addAll(Futures.getDone(future).getErrors());
246 return new DefaultDOMRpcResult(null, builder.build());
247 }, MoreExecutors.directExecutor());
250 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
251 justification = "https://github.com/spotbugs/spotbugs/issues/811")
252 private static TransactionCommitFailedException toCommitFailedException(
253 final Collection<? extends RpcError> errors) {
254 DocumentedException.ErrorType errType = DocumentedException.ErrorType.APPLICATION;
255 DocumentedException.ErrorSeverity errSeverity = DocumentedException.ErrorSeverity.ERROR;
256 StringJoiner msgBuilder = new StringJoiner(" ");
257 String errorTag = "operation-failed";
258 for (final RpcError error : errors) {
259 switch (error.getErrorType()) {
261 errType = DocumentedException.ErrorType.RPC;
264 errType = DocumentedException.ErrorType.PROTOCOL;
267 errType = DocumentedException.ErrorType.TRANSPORT;
271 errType = DocumentedException.ErrorType.APPLICATION;
274 switch (error.getSeverity()) {
276 errSeverity = DocumentedException.ErrorSeverity.WARNING;
280 errSeverity = DocumentedException.ErrorSeverity.ERROR;
283 msgBuilder.add(error.getMessage());
284 msgBuilder.add(error.getInfo());
285 errorTag = error.getTag();
288 return new TransactionCommitFailedException("Netconf transaction commit failed",
289 new NetconfDocumentedException("RPC during tx failed. " + msgBuilder.toString(), errType,
290 DocumentedException.ErrorTag.from(errorTag), errSeverity));
293 private static void executeWithLogging(final Supplier<ListenableFuture<? extends DOMRpcResult>> operation) {
294 final ListenableFuture<? extends DOMRpcResult> operationResult = operation.get();
295 Futures.addCallback(operationResult, new FutureCallback<DOMRpcResult>() {
297 public void onSuccess(final DOMRpcResult rpcResult) {
298 if (rpcResult != null && !rpcResult.getErrors().isEmpty()) {
299 LOG.error("Errors occurred during processing of the RPC operation: {}",
300 rpcResult.getErrors().stream().map(Object::toString).collect(Collectors.joining(",")));
305 public void onFailure(final Throwable throwable) {
306 LOG.error("Error processing operation", throwable);
308 }, MoreExecutors.directExecutor());
311 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
312 justification = "https://github.com/spotbugs/spotbugs/issues/811")
313 private static boolean allWarnings(final Collection<? extends @NonNull RpcError> errors) {
314 return errors.stream().allMatch(error -> error.getSeverity() == RpcError.ErrorSeverity.WARNING);