BUG-8403: propagate DONE state to successor
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / RemoteProxyTransaction.java
index 09a8a605631c1f9d4b5cbe07e95a42a13077e85a..3b5f80ffe8bc02ee267620f7bfafdbcd1cf01a2a 100644 (file)
@@ -20,6 +20,7 @@ import org.opendaylight.controller.cluster.access.commands.AbstractReadTransacti
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
@@ -77,8 +78,8 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
     private volatile Exception operationFailure;
 
     RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
-            final boolean snapshotOnly, final boolean sendReadyOnSeal) {
-        super(parent);
+            final boolean snapshotOnly, final boolean sendReadyOnSeal, final boolean isDone) {
+        super(parent, isDone);
         this.snapshotOnly = snapshotOnly;
         this.sendReadyOnSeal = sendReadyOnSeal;
         builder = new ModifyTransactionRequestBuilder(identifier, localActor());
@@ -137,13 +138,6 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             isSnapshotOnly()), t -> completeRead(future, t), future);
     }
 
-    @Override
-    void doAbort() {
-        ensureInitializedBuilder();
-        builder.setAbort();
-        flushBuilder();
-    }
-
     private void ensureInitializedBuilder() {
         if (!builderBusy) {
             builder.setSequence(nextSequence());
@@ -239,7 +233,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             failFuture(future, response);
         }
 
-        recordFinishedRequest();
+        recordFinishedRequest(response);
     }
 
     private void completeRead(final SettableFuture<Optional<NormalizedNode<?, ?>>> future,
@@ -252,10 +246,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             failFuture(future, response);
         }
 
-        recordFinishedRequest();
+        recordFinishedRequest(response);
     }
 
-    private ModifyTransactionRequest abortRequest() {
+    @Override
+    ModifyTransactionRequest abortRequest() {
         ensureInitializedBuilder();
         builder.setAbort();
         final ModifyTransactionRequest ret = builder.build();
@@ -341,14 +336,14 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             ensureFlushedBuider();
             sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
                 ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
-                    recordFinishedRequest();
+                    recordFinishedRequest(resp);
                     callback.accept(resp);
                 });
         } else if (request instanceof ExistsTransactionRequest) {
             ensureFlushedBuider();
             sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
                 ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
-                    recordFinishedRequest();
+                    recordFinishedRequest(resp);
                     callback.accept(resp);
                 });
         } else if (request instanceof TransactionPreCommitRequest) {
@@ -364,9 +359,9 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
         } else if (request instanceof TransactionAbortRequest) {
             ensureFlushedBuider();
-            sendAbort(callback);
+            sendDoAbort(callback);
         } else if (request instanceof TransactionPurgeRequest) {
-            sendPurge();
+            enqueuePurge(callback);
         } else {
             throw new IllegalArgumentException("Unhandled request {}" + request);
         }
@@ -465,14 +460,14 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             ensureFlushedBuider(optTicks);
             enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
                 ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
-                    recordFinishedRequest();
+                    recordFinishedRequest(resp);
                     cb.accept(resp);
                 }, enqueuedTicks);
         } else if (request instanceof ExistsTransactionRequest) {
             ensureFlushedBuider(optTicks);
             enqueueRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
                 ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
-                    recordFinishedRequest();
+                    recordFinishedRequest(resp);
                     cb.accept(resp);
                 }, enqueuedTicks);
         } else if (request instanceof TransactionPreCommitRequest) {
@@ -489,9 +484,15 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
                 enqueuedTicks);
         } else if (request instanceof TransactionAbortRequest) {
             ensureFlushedBuider(optTicks);
-            enqueueAbort(callback, enqueuedTicks);
+            enqueueDoAbort(callback, enqueuedTicks);
         } else if (request instanceof TransactionPurgeRequest) {
-            enqueuePurge(enqueuedTicks);
+            enqueuePurge(callback, enqueuedTicks);
+        } else if (request instanceof IncrementTransactionSequenceRequest) {
+            final IncrementTransactionSequenceRequest req = (IncrementTransactionSequenceRequest) request;
+            ensureFlushedBuider(optTicks);
+            enqueueRequest(new IncrementTransactionSequenceRequest(getIdentifier(), nextSequence(), localActor(),
+                snapshotOnly, req.getIncrement()), callback, enqueuedTicks);
+            incrementSequence(req.getIncrement());
         } else {
             throw new IllegalArgumentException("Unhandled request {}" + request);
         }