BUG-8402: fix sequencing with read/exists requests
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / RemoteProxyTransaction.java
index 09a8a605631c1f9d4b5cbe07e95a42a13077e85a..a70e6c5b6cb2f238e20239626de37fed6b0c56c9 100644 (file)
@@ -20,6 +20,7 @@ import org.opendaylight.controller.cluster.access.commands.AbstractReadTransacti
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
@@ -239,7 +240,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             failFuture(future, response);
         }
 
-        recordFinishedRequest();
+        recordFinishedRequest(response);
     }
 
     private void completeRead(final SettableFuture<Optional<NormalizedNode<?, ?>>> future,
@@ -252,7 +253,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             failFuture(future, response);
         }
 
-        recordFinishedRequest();
+        recordFinishedRequest(response);
     }
 
     private ModifyTransactionRequest abortRequest() {
@@ -341,14 +342,14 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             ensureFlushedBuider();
             sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
                 ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
-                    recordFinishedRequest();
+                    recordFinishedRequest(resp);
                     callback.accept(resp);
                 });
         } else if (request instanceof ExistsTransactionRequest) {
             ensureFlushedBuider();
             sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
                 ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
-                    recordFinishedRequest();
+                    recordFinishedRequest(resp);
                     callback.accept(resp);
                 });
         } else if (request instanceof TransactionPreCommitRequest) {
@@ -465,14 +466,14 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             ensureFlushedBuider(optTicks);
             enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
                 ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
-                    recordFinishedRequest();
+                    recordFinishedRequest(resp);
                     cb.accept(resp);
                 }, enqueuedTicks);
         } else if (request instanceof ExistsTransactionRequest) {
             ensureFlushedBuider(optTicks);
             enqueueRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
                 ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
-                    recordFinishedRequest();
+                    recordFinishedRequest(resp);
                     cb.accept(resp);
                 }, enqueuedTicks);
         } else if (request instanceof TransactionPreCommitRequest) {
@@ -492,6 +493,12 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             enqueueAbort(callback, enqueuedTicks);
         } else if (request instanceof TransactionPurgeRequest) {
             enqueuePurge(enqueuedTicks);
+        } else if (request instanceof IncrementTransactionSequenceRequest) {
+            final IncrementTransactionSequenceRequest req = (IncrementTransactionSequenceRequest) request;
+            ensureFlushedBuider(optTicks);
+            enqueueRequest(new IncrementTransactionSequenceRequest(getIdentifier(), nextSequence(), localActor(),
+                req.getIncrement()), callback, enqueuedTicks);
+            incrementSequence(req.getIncrement());
         } else {
             throw new IllegalArgumentException("Unhandled request {}" + request);
         }