X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=restconf%2Frestconf-nb-rfc8040%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Frestconf%2Fnb%2Frfc8040%2Frests%2Ftransactions%2FNetconfRestconfTransaction.java;h=de7c9c02a916bdca84cc2dbd885ca21abe1713e5;hb=cea637c7abbb8d132d8df30bc739ce041e0add8b;hp=55e9c93d3e611d0a7e99645540c99187b174fd07;hpb=fbedf3e1c6ede6bf9b0348f85a54e5b5f53abf1b;p=netconf.git diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/transactions/NetconfRestconfTransaction.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/transactions/NetconfRestconfTransaction.java index 55e9c93d3e..de7c9c02a9 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/transactions/NetconfRestconfTransaction.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/transactions/NetconfRestconfTransaction.java @@ -8,17 +8,34 @@ package org.opendaylight.restconf.nb.rfc8040.rests.transactions; import static java.util.Objects.requireNonNull; +import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION; +import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.FluentFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.StringJoiner; +import java.util.function.Supplier; +import java.util.stream.Collectors; import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.mdsal.common.api.CommitInfo; -import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; import org.opendaylight.mdsal.dom.api.DOMRpcResult; +import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult; +import org.opendaylight.netconf.api.DocumentedException; +import org.opendaylight.netconf.api.NetconfDocumentedException; import org.opendaylight.netconf.dom.api.NetconfDataTreeService; +import org.opendaylight.yangtools.yang.common.ErrorSeverity; +import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; @@ -26,76 +43,264 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodeContainer; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; final class NetconfRestconfTransaction extends RestconfTransaction { - private final NetconfDataTreeService netconfService; - private List> resultsFutures; + private static final Logger LOG = LoggerFactory.getLogger(NetconfRestconfTransaction.class); + + private final NetconfDataTreeService netconfService; + private final List> resultsFutures = + Collections.synchronizedList(new ArrayList<>()); + private volatile boolean isLocked = false; NetconfRestconfTransaction(final NetconfDataTreeService netconfService) { this.netconfService = requireNonNull(netconfService); - this.resultsFutures = new ArrayList<>(netconfService.lock()); + final ListenableFuture lockResult = netconfService.lock(); + Futures.addCallback(lockResult, lockOperationCallback, MoreExecutors.directExecutor()); + resultsFutures.add(lockResult); } @Override public void cancel() { - resultsFutures = null; - netconfService.discardChanges(); - netconfService.unlock(); + resultsFutures.clear(); + executeWithLogging(netconfService::discardChanges); + executeWithLogging(netconfService::unlock); } @Override - public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) { - resultsFutures.add(netconfService.delete(store, path)); + public void delete(final YangInstanceIdentifier path) { + enqueueOperation(() -> netconfService.delete(CONFIGURATION, path)); } @Override - public void remove(final LogicalDatastoreType store, final YangInstanceIdentifier path) { - resultsFutures.add(netconfService.remove(store, path)); + public void remove(final YangInstanceIdentifier path) { + enqueueOperation(() -> netconfService.remove(CONFIGURATION, path)); } @Override - public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path, - final NormalizedNode data) { - resultsFutures.add(netconfService.merge(store, path, data, Optional.empty())); + public void merge(final YangInstanceIdentifier path, final NormalizedNode data) { + enqueueOperation(() -> netconfService.merge(CONFIGURATION, path, data, Optional.empty())); } @Override - public void create(final LogicalDatastoreType store, final YangInstanceIdentifier path, - final NormalizedNode data, final SchemaContext schemaContext) { + public void create(final YangInstanceIdentifier path, final NormalizedNode data, + final SchemaContext schemaContext) { if (data instanceof MapNode || data instanceof LeafSetNode) { - final NormalizedNode emptySubTree = ImmutableNodes.fromInstanceId(schemaContext, path); - merge(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.create(emptySubTree.getIdentifier()), - emptySubTree); + final NormalizedNode emptySubTree = ImmutableNodes.fromInstanceId(schemaContext, path); + merge(YangInstanceIdentifier.create(emptySubTree.getIdentifier()), emptySubTree); - for (final NormalizedNode child : ((NormalizedNodeContainer) data).getValue()) { + for (final NormalizedNode child : ((NormalizedNodeContainer) data).body()) { final YangInstanceIdentifier childPath = path.node(child.getIdentifier()); - resultsFutures.add(netconfService.create(store, childPath, child, Optional.empty())); + enqueueOperation(() -> netconfService.create(CONFIGURATION, childPath, child, Optional.empty())); } } else { - resultsFutures.add(netconfService.create(store, path, data, Optional.empty())); + enqueueOperation(() -> netconfService.create(CONFIGURATION, path, data, Optional.empty())); } } @Override - public void replace(final LogicalDatastoreType store, final YangInstanceIdentifier path, - final NormalizedNode data, final SchemaContext schemaContext) { + public void replace(final YangInstanceIdentifier path, final NormalizedNode data, + final SchemaContext schemaContext) { if (data instanceof MapNode || data instanceof LeafSetNode) { - final NormalizedNode emptySubTree = ImmutableNodes.fromInstanceId(schemaContext, path); - merge(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.create(emptySubTree.getIdentifier()), - emptySubTree); + final NormalizedNode emptySubTree = ImmutableNodes.fromInstanceId(schemaContext, path); + merge(YangInstanceIdentifier.create(emptySubTree.getIdentifier()), emptySubTree); - for (final NormalizedNode child : ((NormalizedNodeContainer) data).getValue()) { + for (final NormalizedNode child : ((NormalizedNodeContainer) data).body()) { final YangInstanceIdentifier childPath = path.node(child.getIdentifier()); - resultsFutures.add(netconfService.replace(store, childPath, child, Optional.empty())); + enqueueOperation(() -> netconfService.replace(CONFIGURATION, childPath, child, Optional.empty())); } } else { - resultsFutures.add(netconfService.replace(store, path, data, Optional.empty())); + enqueueOperation(() -> netconfService.replace(CONFIGURATION, path, data, Optional.empty())); } } @Override public FluentFuture commit() { - return FluentFuture.from(netconfService.commit(resultsFutures)); + final SettableFuture commitResult = SettableFuture.create(); + + // First complete all resultsFutures and merge them ... + final ListenableFuture resultErrors = mergeFutures(resultsFutures); + + // ... then evaluate if there are any problems + Futures.addCallback(resultErrors, new FutureCallback<>() { + @Override + public void onSuccess(final DOMRpcResult result) { + final Collection errors = result.getErrors(); + if (!allWarnings(errors)) { + Futures.whenAllComplete(discardAndUnlock()).run( + () -> commitResult.setException(toCommitFailedException(errors)), + MoreExecutors.directExecutor()); + return; + } + + // ... no problems so far, initiate commit + Futures.addCallback(netconfService.commit(), new FutureCallback() { + @Override + public void onSuccess(final DOMRpcResult rpcResult) { + final Collection errors = result.getErrors(); + if (errors.isEmpty()) { + Futures.whenAllComplete(netconfService.unlock()).run( + () -> commitResult.set(CommitInfo.empty()), + MoreExecutors.directExecutor()); + } else if (allWarnings(errors)) { + LOG.info("Commit successful with warnings {}", errors); + Futures.whenAllComplete(netconfService.unlock()).run( + () -> commitResult.set(CommitInfo.empty()), + MoreExecutors.directExecutor()); + } else { + Futures.whenAllComplete(discardAndUnlock()).run( + () -> commitResult.setException(toCommitFailedException(errors)), + MoreExecutors.directExecutor()); + } + } + + @Override + public void onFailure(final Throwable throwable) { + Futures.whenAllComplete(discardAndUnlock()).run( + () -> commitResult.setException(throwable), + MoreExecutors.directExecutor()); + } + }, MoreExecutors.directExecutor()); + } + + @Override + public void onFailure(final Throwable throwable) { + Futures.whenAllComplete(discardAndUnlock()).run(() -> commitResult.setException(throwable), + MoreExecutors.directExecutor()); + } + }, MoreExecutors.directExecutor()); + + return FluentFuture.from(commitResult); + } + + @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", + justification = "https://github.com/spotbugs/spotbugs/issues/811") + private List> discardAndUnlock() { + // execute discard & unlock operations only if lock operation was completed successfully + if (isLocked) { + return List.of(netconfService.discardChanges(), netconfService.unlock()); + } else { + return Collections.emptyList(); + } + } + + private final FutureCallback lockOperationCallback = new FutureCallback<>() { + @Override + public void onSuccess(final DOMRpcResult rpcResult) { + if (rpcResult != null && allWarnings(rpcResult.getErrors())) { + isLocked = true; + } + } + + @Override + public void onFailure(final Throwable throwable) { + // do nothing + } + }; + + private void enqueueOperation(final Supplier> operation) { + final ListenableFuture operationFuture; + synchronized (resultsFutures) { + // if we only have result for the lock operation ... + if (resultsFutures.size() == 1) { + operationFuture = Futures.transformAsync(resultsFutures.get(0), + result -> { + // ... then add new operation to the chain if lock was successful + if (result != null && (result.getErrors().isEmpty() || allWarnings(result.getErrors()))) { + return operation.get(); + } else { + return Futures.immediateFailedFuture(new NetconfDocumentedException("Lock operation failed", + DocumentedException.ErrorType.APPLICATION, DocumentedException.ErrorTag.LOCK_DENIED, + ErrorSeverity.ERROR)); + } + }, + MoreExecutors.directExecutor()); + } else { + // ... otherwise just add operation to the execution chain + operationFuture = Futures.transformAsync(resultsFutures.get(resultsFutures.size() - 1), + future -> operation.get(), + MoreExecutors.directExecutor()); + } + // ... finally save operation related future to the list + resultsFutures.add(operationFuture); + } + } + + // Transform list of futures related to RPC operation into a single Future + private static ListenableFuture mergeFutures( + final List> futures) { + return Futures.whenAllComplete(futures).call(() -> { + if (futures.size() == 1) { + // Fast path + return Futures.getDone(futures.get(0)); + } + + final var builder = ImmutableList.builder(); + for (ListenableFuture future : futures) { + builder.addAll(Futures.getDone(future).getErrors()); + } + return new DefaultDOMRpcResult(null, builder.build()); + }, MoreExecutors.directExecutor()); + } + + @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", + justification = "https://github.com/spotbugs/spotbugs/issues/811") + private static TransactionCommitFailedException toCommitFailedException( + final Collection errors) { + DocumentedException.ErrorType errType = DocumentedException.ErrorType.APPLICATION; + ErrorSeverity errSeverity = ErrorSeverity.ERROR; + StringJoiner msgBuilder = new StringJoiner(" "); + String errorTag = "operation-failed"; + for (final RpcError error : errors) { + switch (error.getErrorType()) { + case RPC: + errType = DocumentedException.ErrorType.RPC; + break; + case PROTOCOL: + errType = DocumentedException.ErrorType.PROTOCOL; + break; + case TRANSPORT: + errType = DocumentedException.ErrorType.TRANSPORT; + break; + case APPLICATION: + default: + errType = DocumentedException.ErrorType.APPLICATION; + break; + } + errSeverity = error.getSeverity().toNetconf(); + msgBuilder.add(error.getMessage()); + msgBuilder.add(error.getInfo()); + errorTag = error.getTag(); + } + + return new TransactionCommitFailedException("Netconf transaction commit failed", + new NetconfDocumentedException("RPC during tx failed. " + msgBuilder.toString(), errType, + DocumentedException.ErrorTag.from(errorTag), errSeverity)); + } + + private static void executeWithLogging(final Supplier> operation) { + final ListenableFuture operationResult = operation.get(); + Futures.addCallback(operationResult, new FutureCallback() { + @Override + public void onSuccess(final DOMRpcResult rpcResult) { + if (rpcResult != null && !rpcResult.getErrors().isEmpty()) { + LOG.error("Errors occurred during processing of the RPC operation: {}", + rpcResult.getErrors().stream().map(Object::toString).collect(Collectors.joining(","))); + } + } + + @Override + public void onFailure(final Throwable throwable) { + LOG.error("Error processing operation", throwable); + } + }, MoreExecutors.directExecutor()); + } + + @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", + justification = "https://github.com/spotbugs/spotbugs/issues/811") + private static boolean allWarnings(final Collection errors) { + return errors.stream().allMatch(error -> error.getSeverity() == RpcError.ErrorSeverity.WARNING); } }