BUG-8402: fix sequencing with read/exists requests
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractProxyTransaction.java
index 46af74a5630999e0c75f430955875754c274dcb5..8e4c757d33767877bb2065a3de45ab201d1bfaf7 100644 (file)
@@ -30,6 +30,7 @@ import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
 import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
@@ -70,12 +71,21 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      */
     @NotThreadSafe
     private static final class IncrementSequence {
-        private long delta = 1;
+        private final long sequence;
+        private long delta = 0;
+
+        IncrementSequence(final long sequence) {
+            this.sequence = sequence;
+        }
 
         long getDelta() {
             return delta;
         }
 
+        long getSequence() {
+            return sequence;
+        }
+
         void incrementDelta() {
             delta++;
         }
@@ -191,7 +201,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         return parent.localActor();
     }
 
-    private void incrementSequence(final long delta) {
+    final void incrementSequence(final long delta) {
         sequence += delta;
         LOG.debug("Transaction {} incremented sequence to {}", this, sequence);
     }
@@ -298,12 +308,12 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         successfulRequests.add(Verify.verifyNotNull(req));
     }
 
-    final void recordFinishedRequest() {
+    final void recordFinishedRequest(final Response<?, ?> response) {
         final Object last = successfulRequests.peekLast();
         if (last instanceof IncrementSequence) {
             ((IncrementSequence) last).incrementDelta();
         } else {
-            successfulRequests.addLast(new IncrementSequence());
+            successfulRequests.addLast(new IncrementSequence(response.getSequence()));
         }
     }
 
@@ -525,7 +535,10 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
                     successor.replayRequest((TransactionRequest<?>) obj, resp -> { }, now);
                 } else {
                     Verify.verify(obj instanceof IncrementSequence);
-                    successor.incrementSequence(((IncrementSequence) obj).getDelta());
+                    final IncrementSequence increment = (IncrementSequence) obj;
+                    successor.replayRequest(new IncrementTransactionSequenceRequest(getIdentifier(),
+                        increment.getSequence(), localActor(), increment.getDelta()), resp -> { }, now);
+                    LOG.debug("Incrementing sequence {} to successor {}", obj, successor);
                 }
             }
             LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size());