2 * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.rests.transactions;
10 import static java.util.Objects.requireNonNull;
11 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION;
12 import static org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes.fromInstanceId;
14 import com.google.common.collect.ImmutableList;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.List;
25 import java.util.Optional;
26 import java.util.StringJoiner;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.function.Supplier;
29 import java.util.stream.Collectors;
30 import org.eclipse.jdt.annotation.NonNull;
31 import org.eclipse.jdt.annotation.Nullable;
32 import org.opendaylight.mdsal.common.api.CommitInfo;
33 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
34 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
35 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
36 import org.opendaylight.netconf.api.NetconfDocumentedException;
37 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
38 import org.opendaylight.yangtools.yang.common.ErrorSeverity;
39 import org.opendaylight.yangtools.yang.common.ErrorTag;
40 import org.opendaylight.yangtools.yang.common.ErrorType;
41 import org.opendaylight.yangtools.yang.common.RpcError;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
44 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
45 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
46 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
47 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodeContainer;
48 import org.opendaylight.yangtools.yang.data.util.DataSchemaContext;
49 import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree;
50 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
51 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
52 import org.opendaylight.yangtools.yang.model.api.LeafListSchemaNode;
53 import org.opendaylight.yangtools.yang.model.api.ListSchemaNode;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
57 final class NetconfRestconfTransaction extends RestconfTransaction {
58 private static final Logger LOG = LoggerFactory.getLogger(NetconfRestconfTransaction.class);
60 private final List<ListenableFuture<? extends DOMRpcResult>> resultsFutures =
61 Collections.synchronizedList(new ArrayList<>());
62 private final NetconfDataTreeService netconfService;
63 private final Map<YangInstanceIdentifier, Collection<? extends NormalizedNode>> readListCache =
64 new ConcurrentHashMap<>();
66 private volatile boolean isLocked = false;
68 NetconfRestconfTransaction(final EffectiveModelContext modelContext, final NetconfDataTreeService netconfService) {
70 this.netconfService = requireNonNull(netconfService);
72 final var lockResult = netconfService.lock();
73 Futures.addCallback(lockResult, new FutureCallback<DOMRpcResult>() {
75 public void onSuccess(final DOMRpcResult rpcResult) {
76 if (rpcResult != null && allWarnings(rpcResult.errors())) {
82 public void onFailure(final Throwable throwable) {
85 }, MoreExecutors.directExecutor());
86 resultsFutures.add(lockResult);
91 resultsFutures.clear();
92 readListCache.clear();
93 executeWithLogging(netconfService::discardChanges);
94 executeWithLogging(netconfService::unlock);
98 void deleteImpl(final YangInstanceIdentifier path) {
99 if (isListPath(path, modelContext)) {
100 final var items = getListItemsForRemove(path);
101 if (items.isEmpty()) {
102 LOG.debug("Path {} contains no items, delete operation omitted.", path);
104 items.forEach(item ->
105 enqueueOperation(() -> netconfService.delete(CONFIGURATION, path.node(item.name()))));
108 enqueueOperation(() -> netconfService.delete(CONFIGURATION, path));
113 void removeImpl(final YangInstanceIdentifier path) {
114 if (isListPath(path, modelContext)) {
115 final var items = getListItemsForRemove(path);
116 if (items.isEmpty()) {
117 LOG.debug("Path {} contains no items, remove operation omitted.", path);
119 items.forEach(item ->
120 enqueueOperation(() -> netconfService.remove(CONFIGURATION, path.node(item.name()))));
123 enqueueOperation(() -> netconfService.remove(CONFIGURATION, path));
128 @Nullable NormalizedNodeContainer<?> readList(final YangInstanceIdentifier path) {
129 // reading list is mainly invoked for subsequent removal,
130 // cache data to avoid extra read invocation on delete/remove
131 final var result = TransactionUtil.syncAccess(read(path), path);
132 readListCache.put(path, result.map(data -> ((NormalizedNodeContainer<?>) data).body()).orElse(List.of()));
133 return (NormalizedNodeContainer<?>) result.orElse(null);
136 private @NonNull Collection<? extends NormalizedNode> getListItemsForRemove(final YangInstanceIdentifier path) {
137 final var cached = readListCache.remove(path);
138 if (cached != null) {
141 // check if keys only can be filtered out to minimize amount of data retrieved
142 final var keyFields = keyFieldsFrom(path, modelContext);
143 final var future = keyFields.isEmpty() ? netconfService.getConfig(path)
144 // using list wildcard as a root path, it's required for proper key field path construction
145 // on building get-config filter
146 : netconfService.getConfig(
147 path.node(NodeIdentifierWithPredicates.of(path.getLastPathArgument().getNodeType())), keyFields);
148 final var retrieved = TransactionUtil.syncAccess(future, path);
149 return retrieved.map(data -> ((NormalizedNodeContainer<?>) data).body()).orElse(List.of());
153 void mergeImpl(final YangInstanceIdentifier path, final NormalizedNode data) {
154 enqueueOperation(() -> netconfService.merge(CONFIGURATION, path, data, Optional.empty()));
158 void createImpl(final YangInstanceIdentifier path, final NormalizedNode data) {
159 if (data instanceof MapNode || data instanceof LeafSetNode) {
160 final var emptySubTree = fromInstanceId(modelContext, path);
161 merge(YangInstanceIdentifier.of(emptySubTree.name()), emptySubTree);
163 for (var child : ((NormalizedNodeContainer<?>) data).body()) {
164 final var childPath = path.node(child.name());
165 enqueueOperation(() -> netconfService.create(CONFIGURATION, childPath, child, Optional.empty()));
168 enqueueOperation(() -> netconfService.create(CONFIGURATION, path, data, Optional.empty()));
173 void replaceImpl(final YangInstanceIdentifier path, final NormalizedNode data) {
174 if (data instanceof MapNode || data instanceof LeafSetNode) {
175 final var emptySubTree = fromInstanceId(modelContext, path);
176 merge(YangInstanceIdentifier.of(emptySubTree.name()), emptySubTree);
178 for (var child : ((NormalizedNodeContainer<?>) data).body()) {
179 final var childPath = path.node(child.name());
180 enqueueOperation(() -> netconfService.replace(CONFIGURATION, childPath, child, Optional.empty()));
183 enqueueOperation(() -> netconfService.replace(CONFIGURATION, path, data, Optional.empty()));
188 ListenableFuture<? extends @NonNull CommitInfo> commit() {
189 final SettableFuture<CommitInfo> commitResult = SettableFuture.create();
191 // First complete all resultsFutures and merge them ...
192 final ListenableFuture<DOMRpcResult> resultErrors = mergeFutures(resultsFutures);
194 // ... then evaluate if there are any problems
195 Futures.addCallback(resultErrors, new FutureCallback<>() {
197 public void onSuccess(final DOMRpcResult result) {
198 final Collection<? extends RpcError> errors = result.errors();
199 if (!allWarnings(errors)) {
200 Futures.whenAllComplete(discardAndUnlock()).run(
201 () -> commitResult.setException(toCommitFailedException(errors)),
202 MoreExecutors.directExecutor());
206 // ... no problems so far, initiate commit
207 Futures.addCallback(netconfService.commit(), new FutureCallback<DOMRpcResult>() {
209 public void onSuccess(final DOMRpcResult rpcResult) {
210 final Collection<? extends RpcError> errors = rpcResult.errors();
211 if (errors.isEmpty()) {
212 Futures.whenAllComplete(netconfService.unlock()).run(
213 () -> commitResult.set(CommitInfo.empty()),
214 MoreExecutors.directExecutor());
215 } else if (allWarnings(errors)) {
216 LOG.info("Commit successful with warnings {}", errors);
217 Futures.whenAllComplete(netconfService.unlock()).run(
218 () -> commitResult.set(CommitInfo.empty()),
219 MoreExecutors.directExecutor());
221 Futures.whenAllComplete(discardAndUnlock()).run(
222 () -> commitResult.setException(toCommitFailedException(errors)),
223 MoreExecutors.directExecutor());
228 public void onFailure(final Throwable throwable) {
229 Futures.whenAllComplete(discardAndUnlock()).run(
230 () -> commitResult.setException(throwable),
231 MoreExecutors.directExecutor());
233 }, MoreExecutors.directExecutor());
237 public void onFailure(final Throwable throwable) {
238 Futures.whenAllComplete(discardAndUnlock()).run(() -> commitResult.setException(throwable),
239 MoreExecutors.directExecutor());
241 }, MoreExecutors.directExecutor());
247 ListenableFuture<Optional<NormalizedNode>> read(final YangInstanceIdentifier path) {
248 return netconfService.getConfig(path);
251 private List<ListenableFuture<?>> discardAndUnlock() {
252 readListCache.clear();
253 // execute discard & unlock operations only if lock operation was completed successfully
255 return List.of(netconfService.discardChanges(), netconfService.unlock());
261 private void enqueueOperation(final Supplier<ListenableFuture<? extends DOMRpcResult>> operation) {
262 final ListenableFuture<? extends DOMRpcResult> operationFuture;
263 synchronized (resultsFutures) {
264 // if we only have result for the lock operation ...
265 if (resultsFutures.size() == 1) {
266 operationFuture = Futures.transformAsync(resultsFutures.get(0),
268 // ... then add new operation to the chain if lock was successful
269 if (result != null && (result.errors().isEmpty() || allWarnings(result.errors()))) {
270 return operation.get();
272 return Futures.immediateFailedFuture(new NetconfDocumentedException("Lock operation failed",
273 ErrorType.APPLICATION, ErrorTag.LOCK_DENIED, ErrorSeverity.ERROR));
276 MoreExecutors.directExecutor());
278 // ... otherwise just add operation to the execution chain
279 operationFuture = Futures.transformAsync(resultsFutures.get(resultsFutures.size() - 1),
280 future -> operation.get(),
281 MoreExecutors.directExecutor());
283 // ... finally save operation related future to the list
284 resultsFutures.add(operationFuture);
288 // Transform list of futures related to RPC operation into a single Future
289 private static ListenableFuture<DOMRpcResult> mergeFutures(
290 final List<ListenableFuture<? extends DOMRpcResult>> futures) {
291 return Futures.whenAllComplete(futures).call(() -> {
292 if (futures.size() == 1) {
294 return Futures.getDone(futures.get(0));
297 final var builder = ImmutableList.<RpcError>builder();
298 for (ListenableFuture<? extends DOMRpcResult> future : futures) {
299 builder.addAll(Futures.getDone(future).errors());
301 return new DefaultDOMRpcResult(null, builder.build());
302 }, MoreExecutors.directExecutor());
305 private static TransactionCommitFailedException toCommitFailedException(
306 final Collection<? extends RpcError> errors) {
307 ErrorType errType = ErrorType.APPLICATION;
308 ErrorSeverity errSeverity = ErrorSeverity.ERROR;
309 StringJoiner msgBuilder = new StringJoiner(" ");
310 ErrorTag errorTag = ErrorTag.OPERATION_FAILED;
311 for (final RpcError error : errors) {
312 errType = error.getErrorType();
313 errSeverity = error.getSeverity();
314 msgBuilder.add(error.getMessage());
315 msgBuilder.add(error.getInfo());
316 errorTag = error.getTag();
319 return new TransactionCommitFailedException("Netconf transaction commit failed",
320 new NetconfDocumentedException("RPC during tx failed. " + msgBuilder.toString(), errType, errorTag,
324 private static void executeWithLogging(final Supplier<ListenableFuture<? extends DOMRpcResult>> operation) {
325 final ListenableFuture<? extends DOMRpcResult> operationResult = operation.get();
326 Futures.addCallback(operationResult, new FutureCallback<DOMRpcResult>() {
328 public void onSuccess(final DOMRpcResult rpcResult) {
329 if (rpcResult != null && !rpcResult.errors().isEmpty()) {
330 LOG.error("Errors occurred during processing of the RPC operation: {}",
331 rpcResult.errors().stream().map(Object::toString).collect(Collectors.joining(",")));
336 public void onFailure(final Throwable throwable) {
337 LOG.error("Error processing operation", throwable);
339 }, MoreExecutors.directExecutor());
342 private static boolean allWarnings(final Collection<? extends @NonNull RpcError> errors) {
343 return errors.stream().allMatch(error -> error.getSeverity() == ErrorSeverity.WARNING);
346 private static boolean isListPath(final YangInstanceIdentifier path, final EffectiveModelContext modelContext) {
347 if (path.getLastPathArgument() instanceof YangInstanceIdentifier.NodeIdentifier) {
348 // list can be referenced by NodeIdentifier only, prevent list item do be identified as list
349 final var schemaNode = schemaNodeFrom(path, modelContext);
350 return schemaNode instanceof ListSchemaNode || schemaNode instanceof LeafListSchemaNode;
355 private static List<YangInstanceIdentifier> keyFieldsFrom(final YangInstanceIdentifier path,
356 final EffectiveModelContext modelContext) {
357 final var schemaNode = schemaNodeFrom(path, modelContext);
358 return schemaNode instanceof ListSchemaNode listSchemaNode
359 ? listSchemaNode.getKeyDefinition().stream().map(YangInstanceIdentifier::of).toList() : List.of();
362 private static DataSchemaNode schemaNodeFrom(final YangInstanceIdentifier path,
363 final EffectiveModelContext modelContext) {
364 return DataSchemaContextTree.from(modelContext).findChild(path)
365 .map(DataSchemaContext::dataSchemaNode).orElse(null);