From 3ce75d79be6b4b7d0f703505f791f71131c9cc48 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Sun, 24 Apr 2022 10:16:42 +0200 Subject: [PATCH] Fail read requests after we have observed a modification failure A local RW transaction can intercept a modification. If this happens, we postpone the failure until commit time, but we must not allow further reads to be satisfied, as they may end up returning incorrect data. JIRA: CONTROLLER-2041 Change-Id: Ic1bb4c33b786e5926022125be8ee66aebfafee87 Signed-off-by: Robert Varga --- .../actors/dds/LocalProxyTransaction.java | 62 +++++++++++-------- .../dds/LocalReadWriteProxyTransaction.java | 38 +++++++++++- 2 files changed, 72 insertions(+), 28 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java index 64f5641fb7..feddd5fc5a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java @@ -55,7 +55,7 @@ import org.slf4j.LoggerFactory; abstract class LocalProxyTransaction extends AbstractProxyTransaction { private static final Logger LOG = LoggerFactory.getLogger(LocalProxyTransaction.class); - private final TransactionIdentifier identifier; + private final @NonNull TransactionIdentifier identifier; LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, final boolean isDone) { super(parent, isDone); @@ -76,12 +76,12 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { @Nullable Consumer> callback, long enqueuedTicks); @Override - final FluentFuture doExists(final YangInstanceIdentifier path) { + FluentFuture doExists(final YangInstanceIdentifier path) { return FluentFutures.immediateBooleanFluentFuture(readOnlyView().readNode(path).isPresent()); } @Override - final FluentFuture> doRead(final YangInstanceIdentifier path) { + FluentFuture> doRead(final YangInstanceIdentifier path) { return FluentFutures.immediateFluentFuture(readOnlyView().readNode(path)); } @@ -100,30 +100,6 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { } } - private boolean handleReadRequest(final TransactionRequest request, final Consumer> callback) { - // Note we delay completion of read requests to limit the scope at which the client can run, as they have - // listeners, which we do not want to execute while we are reconnecting. - if (request instanceof ReadTransactionRequest) { - if (callback != null) { - final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath(); - final Optional result = readOnlyView().readNode(path); - executeInActor(() -> callback.accept(new ReadTransactionSuccess(request.getTarget(), - request.getSequence(), result))); - } - return true; - } else if (request instanceof ExistsTransactionRequest) { - if (callback != null) { - final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath(); - final boolean result = readOnlyView().readNode(path).isPresent(); - executeInActor(() -> callback.accept(new ExistsTransactionSuccess(request.getTarget(), - request.getSequence(), result))); - } - return true; - } else { - return false; - } - } - @Override void handleReplayedRemoteRequest(final TransactionRequest request, final Consumer> callback, final long enqueuedTicks) { @@ -162,6 +138,38 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { } } + @NonNull Response handleExistsRequest(final @NonNull DataTreeSnapshot snapshot, + final @NonNull ExistsTransactionRequest request) { + return new ExistsTransactionSuccess(request.getTarget(), request.getSequence(), + snapshot.readNode(request.getPath()).isPresent()); + } + + @NonNull Response handleReadRequest(final @NonNull DataTreeSnapshot snapshot, + final @NonNull ReadTransactionRequest request) { + return new ReadTransactionSuccess(request.getTarget(), request.getSequence(), + snapshot.readNode(request.getPath())); + } + + private boolean handleReadRequest(final TransactionRequest request, final Consumer> callback) { + // Note we delay completion of read requests to limit the scope at which the client can run, as they have + // listeners, which we do not want to execute while we are reconnecting. + if (request instanceof ReadTransactionRequest) { + if (callback != null) { + final var response = handleReadRequest(readOnlyView(), (ReadTransactionRequest) request); + executeInActor(() -> callback.accept(response)); + } + return true; + } else if (request instanceof ExistsTransactionRequest) { + if (callback != null) { + final var response = handleExistsRequest(readOnlyView(), (ExistsTransactionRequest) request); + executeInActor(() -> callback.accept(response)); + } + return true; + } else { + return false; + } + } + @Override final void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest request, final Consumer> callback) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java index ecb914549d..a8a38a8497 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java @@ -11,6 +11,7 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static com.google.common.base.Verify.verifyNotNull; +import com.google.common.util.concurrent.FluentFuture; import java.util.Optional; import java.util.OptionalLong; import java.util.function.BiConsumer; @@ -21,9 +22,11 @@ import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder; import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; +import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; import org.opendaylight.controller.cluster.access.commands.TransactionDelete; import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest; @@ -33,8 +36,11 @@ import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitR import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionWrite; import org.opendaylight.controller.cluster.access.concepts.Response; +import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor; +import org.opendaylight.mdsal.common.api.ReadFailedException; +import org.opendaylight.yangtools.util.concurrent.FluentFutures; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -86,7 +92,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { private Exception recordedFailure; LocalReadWriteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, - final DataTreeSnapshot snapshot) { + final DataTreeSnapshot snapshot) { super(parent, identifier, false); modification = (CursorAwareDataTreeModification) snapshot.newModification(); } @@ -107,6 +113,20 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { return getModification(); } + @Override + FluentFuture doExists(final YangInstanceIdentifier path) { + final var ex = recordedFailure; + return ex == null ? super.doExists(path) + : FluentFutures.immediateFailedFluentFuture(ReadFailedException.MAPPER.apply(ex)); + } + + @Override + FluentFuture> doRead(final YangInstanceIdentifier path) { + final var ex = recordedFailure; + return ex == null ? super.doRead(path) + : FluentFutures.immediateFailedFluentFuture(ReadFailedException.MAPPER.apply(ex)); + } + @Override @SuppressWarnings("checkstyle:IllegalCatch") void doDelete(final YangInstanceIdentifier path) { @@ -323,6 +343,22 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { } } + @Override + Response handleExistsRequest(final DataTreeSnapshot snapshot, final ExistsTransactionRequest request) { + final var ex = recordedFailure; + return ex == null ? super.handleExistsRequest(snapshot, request) + : request.toRequestFailure( + new RuntimeRequestException("Previous modification failed", ReadFailedException.MAPPER.apply(ex))); + } + + @Override + Response handleReadRequest(final DataTreeSnapshot snapshot, final ReadTransactionRequest request) { + final var ex = recordedFailure; + return ex == null ? super.handleReadRequest(snapshot, request) + : request.toRequestFailure( + new RuntimeRequestException("Previous modification failed", ReadFailedException.MAPPER.apply(ex))); + } + @Override void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest request, final Consumer> callback) { -- 2.36.6