Fail read requests after we have observed a modification failure
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / LocalProxyTransaction.java
index 64f5641fb7441bca49798af6d521ad2680620409..feddd5fc5a5763999cd4abf087ba8c6b54686f6a 100644 (file)
@@ -55,7 +55,7 @@ import org.slf4j.LoggerFactory;
 abstract class LocalProxyTransaction extends AbstractProxyTransaction {
     private static final Logger LOG = LoggerFactory.getLogger(LocalProxyTransaction.class);
 
-    private final TransactionIdentifier identifier;
+    private final @NonNull TransactionIdentifier identifier;
 
     LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, final boolean isDone) {
         super(parent, isDone);
@@ -76,12 +76,12 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
             @Nullable Consumer<Response<?, ?>> callback, long enqueuedTicks);
 
     @Override
-    final FluentFuture<Boolean> doExists(final YangInstanceIdentifier path) {
+    FluentFuture<Boolean> doExists(final YangInstanceIdentifier path) {
         return FluentFutures.immediateBooleanFluentFuture(readOnlyView().readNode(path).isPresent());
     }
 
     @Override
-    final FluentFuture<Optional<NormalizedNode>> doRead(final YangInstanceIdentifier path) {
+    FluentFuture<Optional<NormalizedNode>> doRead(final YangInstanceIdentifier path) {
         return FluentFutures.immediateFluentFuture(readOnlyView().readNode(path));
     }
 
@@ -100,30 +100,6 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
         }
     }
 
-    private boolean handleReadRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
-        // Note we delay completion of read requests to limit the scope at which the client can run, as they have
-        // listeners, which we do not want to execute while we are reconnecting.
-        if (request instanceof ReadTransactionRequest) {
-            if (callback != null) {
-                final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath();
-                final Optional<NormalizedNode> result = readOnlyView().readNode(path);
-                executeInActor(() -> callback.accept(new ReadTransactionSuccess(request.getTarget(),
-                    request.getSequence(), result)));
-            }
-            return true;
-        } else if (request instanceof ExistsTransactionRequest) {
-            if (callback != null) {
-                final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath();
-                final boolean result = readOnlyView().readNode(path).isPresent();
-                executeInActor(() -> callback.accept(new ExistsTransactionSuccess(request.getTarget(),
-                    request.getSequence(), result)));
-            }
-            return true;
-        } else {
-            return false;
-        }
-    }
-
     @Override
     void handleReplayedRemoteRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
             final long enqueuedTicks) {
@@ -162,6 +138,38 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
         }
     }
 
+    @NonNull Response<?, ?> handleExistsRequest(final @NonNull DataTreeSnapshot snapshot,
+            final @NonNull ExistsTransactionRequest request) {
+        return new ExistsTransactionSuccess(request.getTarget(), request.getSequence(),
+            snapshot.readNode(request.getPath()).isPresent());
+    }
+
+    @NonNull Response<?, ?> handleReadRequest(final @NonNull DataTreeSnapshot snapshot,
+            final @NonNull ReadTransactionRequest request) {
+        return new ReadTransactionSuccess(request.getTarget(), request.getSequence(),
+            snapshot.readNode(request.getPath()));
+    }
+
+    private boolean handleReadRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+        // Note we delay completion of read requests to limit the scope at which the client can run, as they have
+        // listeners, which we do not want to execute while we are reconnecting.
+        if (request instanceof ReadTransactionRequest) {
+            if (callback != null) {
+                final var response = handleReadRequest(readOnlyView(), (ReadTransactionRequest) request);
+                executeInActor(() -> callback.accept(response));
+            }
+            return true;
+        } else if (request instanceof ExistsTransactionRequest) {
+            if (callback != null) {
+                final var response = handleExistsRequest(readOnlyView(), (ExistsTransactionRequest) request);
+                executeInActor(() -> callback.accept(response));
+            }
+            return true;
+        } else {
+            return false;
+        }
+    }
+
     @Override
     final void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
                          final Consumer<Response<?, ?>> callback) {