X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FLocalReadWriteProxyTransaction.java;h=47ae6a2bc7a27ae60fc1a8b094411c65c03d4a21;hb=HEAD;hp=c32297f0a4a9c60cb2a5a5e97d4ba2545ec95c2e;hpb=62cddd88e42e8f3c6a92bbf42c97b0d6806f44ae;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java index c32297f0a4..47ae6a2bc7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java @@ -7,9 +7,14 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; -import com.google.common.base.Preconditions; -import com.google.common.base.Verify; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; +import static com.google.common.base.Verify.verifyNotNull; + +import com.google.common.util.concurrent.FluentFuture; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.Optional; +import java.util.OptionalLong; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Supplier; @@ -18,9 +23,11 @@ import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder; import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; +import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; import org.opendaylight.controller.cluster.access.commands.TransactionDelete; import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest; @@ -30,16 +37,19 @@ import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitR import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionWrite; import org.opendaylight.controller.cluster.access.concepts.Response; +import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor; +import org.opendaylight.mdsal.common.api.ReadFailedException; +import org.opendaylight.yangtools.util.concurrent.FluentFutures; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification; -import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; +import org.opendaylight.yangtools.yang.data.tree.api.CursorAwareDataTreeModification; +import org.opendaylight.yangtools.yang.data.tree.api.CursorAwareDataTreeSnapshot; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModificationCursor; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,16 +92,31 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { */ private Exception recordedFailure; + @SuppressWarnings("checkstyle:IllegalCatch") LocalReadWriteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, - final DataTreeSnapshot snapshot) { + final DataTreeSnapshot snapshot) { super(parent, identifier, false); - this.modification = (CursorAwareDataTreeModification) snapshot.newModification(); + + if (snapshot instanceof FailedDataTreeModification failed) { + recordedFailure = failed.cause(); + modification = failed; + } else { + CursorAwareDataTreeModification mod; + try { + mod = (CursorAwareDataTreeModification) snapshot.newModification(); + } catch (Exception e) { + LOG.debug("Failed to instantiate modification for {}", identifier, e); + recordedFailure = e; + mod = new FailedDataTreeModification(snapshot.modelContext(), e); + } + modification = mod; + } } LocalReadWriteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier) { super(parent, identifier, true); // This is DONE transaction, this should never be touched - this.modification = null; + modification = null; } @Override @@ -104,6 +129,20 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { return getModification(); } + @Override + FluentFuture doExists(final YangInstanceIdentifier path) { + final var ex = recordedFailure; + return ex == null ? super.doExists(path) + : FluentFutures.immediateFailedFluentFuture(ReadFailedException.MAPPER.apply(ex)); + } + + @Override + FluentFuture> doRead(final YangInstanceIdentifier path) { + final var ex = recordedFailure; + return ex == null ? super.doRead(path) + : FluentFutures.immediateFailedFluentFuture(ReadFailedException.MAPPER.apply(ex)); + } + @Override @SuppressWarnings("checkstyle:IllegalCatch") void doDelete(final YangInstanceIdentifier path) { @@ -124,7 +163,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { @Override @SuppressWarnings("checkstyle:IllegalCatch") - void doMerge(final YangInstanceIdentifier path, final NormalizedNode data) { + void doMerge(final YangInstanceIdentifier path, final NormalizedNode data) { final CursorAwareDataTreeModification mod = getModification(); if (recordedFailure != null) { LOG.debug("Transaction {} recorded failure, ignoring merge to {}", getIdentifier(), path); @@ -142,7 +181,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { @Override @SuppressWarnings("checkstyle:IllegalCatch") - void doWrite(final YangInstanceIdentifier path, final NormalizedNode data) { + void doWrite(final YangInstanceIdentifier path, final NormalizedNode data) { final CursorAwareDataTreeModification mod = getModification(); if (recordedFailure != null) { LOG.debug("Transaction {} recorded failure, ignoring write to {}", getIdentifier(), path); @@ -176,7 +215,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { } private void sealModification() { - Preconditions.checkState(sealedModification == null, "Transaction %s is already sealed", this); + checkState(sealedModification == null, "Transaction %s is already sealed", this); final CursorAwareDataTreeModification mod = getModification(); mod.ready(); sealedModification = mod; @@ -189,7 +228,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { } @Override - boolean sealAndSend(final Optional enqueuedTicks) { + boolean sealAndSend(final OptionalLong enqueuedTicks) { sealModification(); return super.sealAndSend(enqueuedTicks); } @@ -201,12 +240,12 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { sealedModification.applyToCursor(new AbstractDataTreeModificationCursor() { @Override - public void write(final PathArgument child, final NormalizedNode data) { + public void write(final PathArgument child, final NormalizedNode data) { b.addModification(new TransactionWrite(current().node(child), data)); } @Override - public void merge(final PathArgument child, final NormalizedNode data) { + public void merge(final PathArgument child, final NormalizedNode data) { b.addModification(new TransactionMerge(current().node(child), data)); } @@ -219,8 +258,8 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { return Optional.of(b.build()); } - DataTreeSnapshot getSnapshot() { - Preconditions.checkState(sealedModification != null, "Proxy %s is not sealed yet", getIdentifier()); + CursorAwareDataTreeSnapshot getSnapshot() { + checkState(sealedModification != null, "Proxy %s is not sealed yet", getIdentifier()); return sealedModification; } @@ -253,26 +292,26 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { final Optional maybeProtocol = request.getPersistenceProtocol(); if (maybeProtocol.isPresent()) { - Verify.verify(callback != null, "Request %s has null callback", request); + final var cb = verifyNotNull(callback, "Request %s has null callback", request); if (markSealed()) { sealOnly(); } - switch (maybeProtocol.get()) { + switch (maybeProtocol.orElseThrow()) { case ABORT: - sendMethod.accept(new AbortLocalTransactionRequest(getIdentifier(), localActor()), callback); + sendMethod.accept(new AbortLocalTransactionRequest(getIdentifier(), localActor()), cb); break; case READY: // No-op, as we have already issued a sealOnly() and we are not transmitting anything break; case SIMPLE: - sendMethod.accept(commitRequest(false), callback); + sendMethod.accept(commitRequest(false), cb); break; case THREE_PHASE: - sendMethod.accept(commitRequest(true), callback); + sendMethod.accept(commitRequest(true), cb); break; default: - throw new IllegalArgumentException("Unhandled protocol " + maybeProtocol.get()); + throw new IllegalArgumentException("Unhandled protocol " + maybeProtocol.orElseThrow()); } } } @@ -320,16 +359,39 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { } } + @Override + Response handleExistsRequest(final DataTreeSnapshot snapshot, final ExistsTransactionRequest request) { + final var ex = recordedFailure; + return ex == null ? super.handleExistsRequest(snapshot, request) + : request.toRequestFailure( + new RuntimeRequestException("Previous modification failed", ReadFailedException.MAPPER.apply(ex))); + } + + @Override + Response handleReadRequest(final DataTreeSnapshot snapshot, final ReadTransactionRequest request) { + final var ex = recordedFailure; + return ex == null ? super.handleReadRequest(snapshot, request) + : request.toRequestFailure( + new RuntimeRequestException("Previous modification failed", ReadFailedException.MAPPER.apply(ex))); + } + @Override void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest request, final Consumer> callback) { if (request instanceof CommitLocalTransactionRequest) { - Verify.verify(successor instanceof LocalReadWriteProxyTransaction); - ((LocalReadWriteProxyTransaction) successor).sendRebased((CommitLocalTransactionRequest)request, callback); - LOG.debug("Forwarded request {} to successor {}", request, successor); + verifyLocalReadWrite(successor).sendRebased((CommitLocalTransactionRequest)request, callback); + } else if (request instanceof ModifyTransactionRequest) { + verifyLocalReadWrite(successor).handleForwardedRemoteRequest(request, callback); } else { super.forwardToLocal(successor, request, callback); + return; } + LOG.debug("Forwarded request {} to successor {}", request, successor); + } + + private static LocalReadWriteProxyTransaction verifyLocalReadWrite(final LocalProxyTransaction successor) { + verify(successor instanceof LocalReadWriteProxyTransaction, "Unexpected successor %s", successor); + return (LocalReadWriteProxyTransaction) successor; } @Override @@ -345,12 +407,12 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { closedException = this::abortedException; } + @SuppressFBWarnings(value = "THROWS_METHOD_THROWS_RUNTIMEEXCEPTION", justification = "Replay of recorded failure") private @NonNull CursorAwareDataTreeModification getModification() { if (closedException != null) { throw closedException.get(); } - - return Preconditions.checkNotNull(modification, "Transaction %s is DONE", getIdentifier()); + return verifyNotNull(modification, "Transaction %s is DONE", getIdentifier()); } private void sendRebased(final CommitLocalTransactionRequest request, final Consumer> callback) { @@ -361,8 +423,18 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { // Rebase old modification on new data tree. final CursorAwareDataTreeModification mod = getModification(); - try (DataTreeModificationCursor cursor = mod.openCursor()) { - request.getModification().applyToCursor(cursor); + if (!(mod instanceof FailedDataTreeModification)) { + request.getDelayedFailure().ifPresentOrElse(failure -> { + if (recordedFailure == null) { + recordedFailure = failure; + } else { + recordedFailure.addSuppressed(failure); + } + }, () -> { + try (DataTreeModificationCursor cursor = mod.openCursor()) { + request.getModification().applyToCursor(cursor); + } + }); } if (markSealed()) {