2 * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.rests.transactions;
10 import static java.util.Objects.requireNonNull;
11 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION;
12 import static org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes.fromInstanceId;
14 import com.google.common.collect.ImmutableList;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Optional;
25 import java.util.StringJoiner;
26 import java.util.function.Supplier;
27 import java.util.stream.Collectors;
28 import org.eclipse.jdt.annotation.NonNull;
29 import org.opendaylight.mdsal.common.api.CommitInfo;
30 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
31 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
32 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
33 import org.opendaylight.netconf.api.NetconfDocumentedException;
34 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
35 import org.opendaylight.yangtools.yang.common.ErrorSeverity;
36 import org.opendaylight.yangtools.yang.common.ErrorTag;
37 import org.opendaylight.yangtools.yang.common.ErrorType;
38 import org.opendaylight.yangtools.yang.common.RpcError;
39 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
40 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
41 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
42 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
43 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodeContainer;
44 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
/**
 * {@code RestconfTransaction} implementation that forwards its operations to a
 * {@link NetconfDataTreeService}.
 *
 * <p>The constructor requests a lock from the service; every later operation is chained behind the
 * previously issued one and its future is remembered so that {@code commit()} can inspect all outcomes.
 */
48 final class NetconfRestconfTransaction extends RestconfTransaction {
49 private static final Logger LOG = LoggerFactory.getLogger(NetconfRestconfTransaction.class);
// Futures of the RPCs issued so far, in issue order: the lock future is added first by the constructor,
// operation futures are appended by enqueueOperation(). Wrapped in a synchronized list view.
51 private final List<ListenableFuture<? extends DOMRpcResult>> resultsFutures =
52 Collections.synchronizedList(new ArrayList<>());
// Service every operation of this transaction is delegated to; never null (checked in the constructor).
53 private final NetconfDataTreeService netconfService;
// NOTE(review): the reads and writes of this flag are not visible in this extract; presumably it records
// that the lock request succeeded -- confirm against the complete file.
55 private volatile boolean isLocked = false;
/**
 * Creates the transaction and immediately asks {@code netconfService} for a lock.
 *
 * <p>The lock future is stored as the first element of {@code resultsFutures}, which is what
 * {@code enqueueOperation()} chains the first real operation behind.
 *
 * @param modelContext model context; NOTE(review): its use is not visible in this extract (presumably
 *        handed to the superclass) -- confirm
 * @param netconfService service used for all operations, must not be null
 * @throws NullPointerException if {@code netconfService} is null
 */
57 NetconfRestconfTransaction(final EffectiveModelContext modelContext, final NetconfDataTreeService netconfService) {
59 this.netconfService = requireNonNull(netconfService);
// Issue the lock request right away; its outcome is observed asynchronously below.
61 final var lockResult = netconfService.lock();
62 Futures.addCallback(lockResult, new FutureCallback<DOMRpcResult>() {
64 public void onSuccess(final DOMRpcResult rpcResult) {
// Success condition: a non-null result that reports no errors or only warnings.
// NOTE(review): the body of this branch is not visible in this extract.
65 if (rpcResult != null && allWarnings(rpcResult.errors())) {
// NOTE(review): the body of onFailure is not visible in this extract.
71 public void onFailure(final Throwable throwable) {
74 }, MoreExecutors.directExecutor());
// Remember the lock future so later operations can be chained behind it.
75 resultsFutures.add(lockResult);
// NOTE(review): the header of the method that owns these statements is not visible in this extract.
// Forget every tracked future, then issue discard-changes followed by unlock; executeWithLogging()
// only logs the outcome of each call, it does not propagate failures.
80 resultsFutures.clear();
81 executeWithLogging(netconfService::discardChanges);
82 executeWithLogging(netconfService::unlock);
86 void deleteImpl(final YangInstanceIdentifier path) {
87 enqueueOperation(() -> netconfService.delete(CONFIGURATION, path));
91 void removeImpl(final YangInstanceIdentifier path) {
92 enqueueOperation(() -> netconfService.remove(CONFIGURATION, path));
96 void mergeImpl(final YangInstanceIdentifier path, final NormalizedNode data) {
97 enqueueOperation(() -> netconfService.merge(CONFIGURATION, path, data, Optional.empty()));
101 void createImpl(final YangInstanceIdentifier path, final NormalizedNode data) {
102 if (data instanceof MapNode || data instanceof LeafSetNode) {
103 final var emptySubTree = fromInstanceId(modelContext, path);
104 merge(YangInstanceIdentifier.of(emptySubTree.name()), emptySubTree);
106 for (var child : ((NormalizedNodeContainer<?>) data).body()) {
107 final var childPath = path.node(child.name());
108 enqueueOperation(() -> netconfService.create(CONFIGURATION, childPath, child, Optional.empty()));
111 enqueueOperation(() -> netconfService.create(CONFIGURATION, path, data, Optional.empty()));
116 void replaceImpl(final YangInstanceIdentifier path, final NormalizedNode data) {
117 if (data instanceof MapNode || data instanceof LeafSetNode) {
118 final var emptySubTree = fromInstanceId(modelContext, path);
119 merge(YangInstanceIdentifier.of(emptySubTree.name()), emptySubTree);
121 for (var child : ((NormalizedNodeContainer<?>) data).body()) {
122 final var childPath = path.node(child.name());
123 enqueueOperation(() -> netconfService.replace(CONFIGURATION, childPath, child, Optional.empty()));
126 enqueueOperation(() -> netconfService.replace(CONFIGURATION, path, data, Optional.empty()));
/**
 * Commits the transaction.
 *
 * <p>Waits for every queued operation (via {@code mergeFutures}), and if those reported nothing worse
 * than warnings issues {@code netconfService.commit()}. On success the service is unlocked and
 * {@code commitResult} is completed with {@code CommitInfo.empty()}; on any error or failure
 * {@code discardAndUnlock()} is run first and {@code commitResult} is failed afterwards.
 * All callbacks run on the direct executor.
 *
 * <p>NOTE(review): several short lines of this method (including its return statement and whatever
 * follows the first error branch) are not visible in this extract; presumably {@code commitResult}
 * is what gets returned -- confirm against the complete file.
 */
131 ListenableFuture<? extends @NonNull CommitInfo> commit() {
132 final SettableFuture<CommitInfo> commitResult = SettableFuture.create();
134 // First complete all resultsFutures and merge them ...
135 final ListenableFuture<DOMRpcResult> resultErrors = mergeFutures(resultsFutures);
137 // ... then evaluate if there are any problems
138 Futures.addCallback(resultErrors, new FutureCallback<>() {
140 public void onSuccess(final DOMRpcResult result) {
141 final Collection<? extends RpcError> errors = result.errors();
// At least one queued operation reported something worse than a warning: discard and unlock,
// and only once those calls have completed fail the result with the collected errors.
142 if (!allWarnings(errors)) {
143 Futures.whenAllComplete(discardAndUnlock()).run(
144 () -> commitResult.setException(toCommitFailedException(errors)),
145 MoreExecutors.directExecutor());
149 // ... no problems so far, initiate commit
150 Futures.addCallback(netconfService.commit(), new FutureCallback<DOMRpcResult>() {
152 public void onSuccess(final DOMRpcResult rpcResult) {
153 final Collection<? extends RpcError> errors = rpcResult.errors();
// Clean commit: unlock, then report success.
154 if (errors.isEmpty()) {
155 Futures.whenAllComplete(netconfService.unlock()).run(
156 () -> commitResult.set(CommitInfo.empty()),
157 MoreExecutors.directExecutor());
// Warnings only: logged at info level and otherwise handled exactly like a clean commit.
158 } else if (allWarnings(errors)) {
159 LOG.info("Commit successful with warnings {}", errors);
160 Futures.whenAllComplete(netconfService.unlock()).run(
161 () -> commitResult.set(CommitInfo.empty()),
162 MoreExecutors.directExecutor());
// Discard and unlock before reporting the commit errors to the caller.
164 Futures.whenAllComplete(discardAndUnlock()).run(
165 () -> commitResult.setException(toCommitFailedException(errors)),
166 MoreExecutors.directExecutor());
// The commit RPC itself failed: clean up, then propagate the original throwable.
171 public void onFailure(final Throwable throwable) {
172 Futures.whenAllComplete(discardAndUnlock()).run(
173 () -> commitResult.setException(throwable),
174 MoreExecutors.directExecutor());
176 }, MoreExecutors.directExecutor());
// The merged future of the queued operations failed: clean up, then propagate the throwable.
180 public void onFailure(final Throwable throwable) {
181 Futures.whenAllComplete(discardAndUnlock()).run(() -> commitResult.setException(throwable),
182 MoreExecutors.directExecutor());
184 }, MoreExecutors.directExecutor());
190 ListenableFuture<Optional<NormalizedNode>> read(final YangInstanceIdentifier path) {
191 return netconfService.getConfig(path);
/**
 * Issues discard-changes followed by unlock on {@code netconfService}.
 *
 * <p>NOTE(review): the guard announced by the comment below and any alternative return are not
 * visible in this extract; presumably the calls are skipped when the lock was never obtained -- confirm.
 *
 * @return the futures of the issued operations, in the order discard-changes, unlock
 */
194 private List<ListenableFuture<?>> discardAndUnlock() {
195 // execute discard & unlock operations only if lock operation was completed successfully
197 return List.of(netconfService.discardChanges(), netconfService.unlock());
/**
 * Chains {@code operation} behind the most recently stored future and records the resulting future.
 *
 * <p>Chaining uses {@code Futures.transformAsync}, so {@code operation.get()} is only invoked once the
 * preceding future has completed successfully. When the only stored future is the lock request, the
 * operation additionally requires a non-null lock result with no errors or only warnings; otherwise
 * the chained future fails with a "Lock operation failed" {@code NetconfDocumentedException}.
 * The whole update is done while holding the monitor of {@code resultsFutures}.
 *
 * <p>NOTE(review): {@code resultsFutures} is cleared elsewhere in this class; with an empty list
 * {@code get(resultsFutures.size() - 1)} would throw -- confirm that this method cannot be reached
 * in that state.
 *
 * @param operation supplier that issues the RPC when invoked
 */
203 private void enqueueOperation(final Supplier<ListenableFuture<? extends DOMRpcResult>> operation) {
204 final ListenableFuture<? extends DOMRpcResult> operationFuture;
// Hold the list monitor so the size check, the lookup of the last future and the append are atomic.
205 synchronized (resultsFutures) {
206 // if we only have result for the lock operation ...
207 if (resultsFutures.size() == 1) {
208 operationFuture = Futures.transformAsync(resultsFutures.get(0),
210 // ... then add new operation to the chain if lock was successful
211 if (result != null && (result.errors().isEmpty() || allWarnings(result.errors()))) {
212 return operation.get();
214 return Futures.immediateFailedFuture(new NetconfDocumentedException("Lock operation failed",
215 ErrorType.APPLICATION, ErrorTag.LOCK_DENIED, ErrorSeverity.ERROR));
218 MoreExecutors.directExecutor());
220 // ... otherwise just add operation to the execution chain
221 operationFuture = Futures.transformAsync(resultsFutures.get(resultsFutures.size() - 1),
222 future -> operation.get(),
223 MoreExecutors.directExecutor());
225 // ... finally save operation related future to the list
226 resultsFutures.add(operationFuture);
230 // Transform list of futures related to RPC operation into a single Future
231 private static ListenableFuture<DOMRpcResult> mergeFutures(
232 final List<ListenableFuture<? extends DOMRpcResult>> futures) {
233 return Futures.whenAllComplete(futures).call(() -> {
234 if (futures.size() == 1) {
236 return Futures.getDone(futures.get(0));
239 final var builder = ImmutableList.<RpcError>builder();
240 for (ListenableFuture<? extends DOMRpcResult> future : futures) {
241 builder.addAll(Futures.getDone(future).errors());
243 return new DefaultDOMRpcResult(null, builder.build());
244 }, MoreExecutors.directExecutor());
247 private static TransactionCommitFailedException toCommitFailedException(
248 final Collection<? extends RpcError> errors) {
249 ErrorType errType = ErrorType.APPLICATION;
250 ErrorSeverity errSeverity = ErrorSeverity.ERROR;
251 StringJoiner msgBuilder = new StringJoiner(" ");
252 ErrorTag errorTag = ErrorTag.OPERATION_FAILED;
253 for (final RpcError error : errors) {
254 errType = error.getErrorType();
255 errSeverity = error.getSeverity();
256 msgBuilder.add(error.getMessage());
257 msgBuilder.add(error.getInfo());
258 errorTag = error.getTag();
261 return new TransactionCommitFailedException("Netconf transaction commit failed",
262 new NetconfDocumentedException("RPC during tx failed. " + msgBuilder.toString(), errType, errorTag,
266 private static void executeWithLogging(final Supplier<ListenableFuture<? extends DOMRpcResult>> operation) {
267 final ListenableFuture<? extends DOMRpcResult> operationResult = operation.get();
268 Futures.addCallback(operationResult, new FutureCallback<DOMRpcResult>() {
270 public void onSuccess(final DOMRpcResult rpcResult) {
271 if (rpcResult != null && !rpcResult.errors().isEmpty()) {
272 LOG.error("Errors occurred during processing of the RPC operation: {}",
273 rpcResult.errors().stream().map(Object::toString).collect(Collectors.joining(",")));
278 public void onFailure(final Throwable throwable) {
279 LOG.error("Error processing operation", throwable);
281 }, MoreExecutors.directExecutor());
284 private static boolean allWarnings(final Collection<? extends @NonNull RpcError> errors) {
285 return errors.stream().allMatch(error -> error.getSeverity() == ErrorSeverity.WARNING);