Fail read requests after we have observed a modification failure 65/100765/7
authorRobert Varga <robert.varga@pantheon.tech>
Sun, 24 Apr 2022 08:16:42 +0000 (10:16 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Sun, 24 Apr 2022 16:42:31 +0000 (18:42 +0200)
A local RW transaction can intercept a modification. If this happens, we
postpone the failure until commit time, but we must not allow further
reads to be satisfied, as they may end up returning incorrect data.

JIRA: CONTROLLER-2041
Change-Id: Ic1bb4c33b786e5926022125be8ee66aebfafee87
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java

index 64f5641fb7441bca49798af6d521ad2680620409..feddd5fc5a5763999cd4abf087ba8c6b54686f6a 100644 (file)
@@ -55,7 +55,7 @@ import org.slf4j.LoggerFactory;
 abstract class LocalProxyTransaction extends AbstractProxyTransaction {
     private static final Logger LOG = LoggerFactory.getLogger(LocalProxyTransaction.class);
 
-    private final TransactionIdentifier identifier;
+    private final @NonNull TransactionIdentifier identifier;
 
     LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, final boolean isDone) {
         super(parent, isDone);
@@ -76,12 +76,12 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
             @Nullable Consumer<Response<?, ?>> callback, long enqueuedTicks);
 
     @Override
-    final FluentFuture<Boolean> doExists(final YangInstanceIdentifier path) {
+    FluentFuture<Boolean> doExists(final YangInstanceIdentifier path) {
         return FluentFutures.immediateBooleanFluentFuture(readOnlyView().readNode(path).isPresent());
     }
 
     @Override
-    final FluentFuture<Optional<NormalizedNode>> doRead(final YangInstanceIdentifier path) {
+    FluentFuture<Optional<NormalizedNode>> doRead(final YangInstanceIdentifier path) {
         return FluentFutures.immediateFluentFuture(readOnlyView().readNode(path));
     }
 
@@ -100,30 +100,6 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
         }
     }
 
-    private boolean handleReadRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
-        // Note we delay completion of read requests to limit the scope at which the client can run, as they have
-        // listeners, which we do not want to execute while we are reconnecting.
-        if (request instanceof ReadTransactionRequest) {
-            if (callback != null) {
-                final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath();
-                final Optional<NormalizedNode> result = readOnlyView().readNode(path);
-                executeInActor(() -> callback.accept(new ReadTransactionSuccess(request.getTarget(),
-                    request.getSequence(), result)));
-            }
-            return true;
-        } else if (request instanceof ExistsTransactionRequest) {
-            if (callback != null) {
-                final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath();
-                final boolean result = readOnlyView().readNode(path).isPresent();
-                executeInActor(() -> callback.accept(new ExistsTransactionSuccess(request.getTarget(),
-                    request.getSequence(), result)));
-            }
-            return true;
-        } else {
-            return false;
-        }
-    }
-
     @Override
     void handleReplayedRemoteRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
             final long enqueuedTicks) {
@@ -162,6 +138,38 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
         }
     }
 
+    @NonNull Response<?, ?> handleExistsRequest(final @NonNull DataTreeSnapshot snapshot,
+            final @NonNull ExistsTransactionRequest request) {
+        return new ExistsTransactionSuccess(request.getTarget(), request.getSequence(),
+            snapshot.readNode(request.getPath()).isPresent());
+    }
+
+    @NonNull Response<?, ?> handleReadRequest(final @NonNull DataTreeSnapshot snapshot,
+            final @NonNull ReadTransactionRequest request) {
+        return new ReadTransactionSuccess(request.getTarget(), request.getSequence(),
+            snapshot.readNode(request.getPath()));
+    }
+
+    private boolean handleReadRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+        // Note we delay completion of read requests to limit the scope at which the client can run, as they have
+        // listeners, which we do not want to execute while we are reconnecting.
+        if (request instanceof ReadTransactionRequest) {
+            if (callback != null) {
+                final var response = handleReadRequest(readOnlyView(), (ReadTransactionRequest) request);
+                executeInActor(() -> callback.accept(response));
+            }
+            return true;
+        } else if (request instanceof ExistsTransactionRequest) {
+            if (callback != null) {
+                final var response = handleExistsRequest(readOnlyView(), (ExistsTransactionRequest) request);
+                executeInActor(() -> callback.accept(response));
+            }
+            return true;
+        } else {
+            return false;
+        }
+    }
+
     @Override
     final void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
                          final Consumer<Response<?, ?>> callback) {
index ecb914549dacac574c0a524016972ea823de4233..a8a38a8497cc5ebb5f0f5ce989a83f77ec2b26af 100644 (file)
@@ -11,6 +11,7 @@ import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Verify.verify;
 import static com.google.common.base.Verify.verifyNotNull;
 
+import com.google.common.util.concurrent.FluentFuture;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.function.BiConsumer;
@@ -21,9 +22,11 @@ import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
@@ -33,8 +36,11 @@ import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitR
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
 import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
+import org.opendaylight.yangtools.util.concurrent.FluentFutures;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -86,7 +92,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
     private Exception recordedFailure;
 
     LocalReadWriteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
-        final DataTreeSnapshot snapshot) {
+            final DataTreeSnapshot snapshot) {
         super(parent, identifier, false);
         modification = (CursorAwareDataTreeModification) snapshot.newModification();
     }
@@ -107,6 +113,20 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
         return getModification();
     }
 
+    @Override
+    FluentFuture<Boolean> doExists(final YangInstanceIdentifier path) {
+        final var ex = recordedFailure;
+        return ex == null ? super.doExists(path)
+            : FluentFutures.immediateFailedFluentFuture(ReadFailedException.MAPPER.apply(ex));
+    }
+
+    @Override
+    FluentFuture<Optional<NormalizedNode>> doRead(final YangInstanceIdentifier path) {
+        final var ex = recordedFailure;
+        return ex == null ? super.doRead(path)
+            : FluentFutures.immediateFailedFluentFuture(ReadFailedException.MAPPER.apply(ex));
+    }
+
     @Override
     @SuppressWarnings("checkstyle:IllegalCatch")
     void doDelete(final YangInstanceIdentifier path) {
@@ -323,6 +343,22 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
         }
     }
 
+    @Override
+    Response<?, ?> handleExistsRequest(final DataTreeSnapshot snapshot, final ExistsTransactionRequest request) {
+        final var ex = recordedFailure;
+        return ex == null ? super.handleExistsRequest(snapshot, request)
+            : request.toRequestFailure(
+                new RuntimeRequestException("Previous modification failed", ReadFailedException.MAPPER.apply(ex)));
+    }
+
+    @Override
+    Response<?, ?> handleReadRequest(final DataTreeSnapshot snapshot, final ReadTransactionRequest request) {
+        final var ex = recordedFailure;
+        return ex == null ? super.handleReadRequest(snapshot, request)
+            : request.toRequestFailure(
+                new RuntimeRequestException("Previous modification failed", ReadFailedException.MAPPER.apply(ex)));
+    }
+
     @Override
     void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
             final Consumer<Response<?, ?>> callback) {