BUG-8704: rework seal mechanics to not wait during replay
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / RemoteProxyTransaction.java
index a70e6c5b6cb2f238e20239626de37fed6b0c56c9..1ba96426df75feb7e38e4db0f217c8c4f98708b8 100644 (file)
@@ -78,8 +78,8 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
     private volatile Exception operationFailure;
 
     RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
-            final boolean snapshotOnly, final boolean sendReadyOnSeal) {
-        super(parent);
+            final boolean snapshotOnly, final boolean sendReadyOnSeal, final boolean isDone) {
+        super(parent, isDone);
         this.snapshotOnly = snapshotOnly;
         this.sendReadyOnSeal = sendReadyOnSeal;
         builder = new ModifyTransactionRequestBuilder(identifier, localActor());
@@ -138,13 +138,6 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             isSnapshotOnly()), t -> completeRead(future, t), future);
     }
 
-    @Override
-    void doAbort() {
-        ensureInitializedBuilder();
-        builder.setAbort();
-        flushBuilder();
-    }
-
     private void ensureInitializedBuilder() {
         if (!builderBusy) {
             builder.setSequence(nextSequence());
@@ -153,9 +146,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
     }
 
     private void ensureFlushedBuider() {
-        if (builderBusy) {
-            flushBuilder();
-        }
+        ensureFlushedBuider(Optional.absent());
     }
 
     private void ensureFlushedBuider(final Optional<Long> enqueuedTicks) {
@@ -164,10 +155,6 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
         }
     }
 
-    private void flushBuilder() {
-        flushBuilder(Optional.absent());
-    }
-
     private void flushBuilder(final Optional<Long> enqueuedTicks) {
         final ModifyTransactionRequest request = builder.build();
         builderBusy = false;
@@ -256,31 +243,37 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
         recordFinishedRequest(response);
     }
 
-    private ModifyTransactionRequest abortRequest() {
+    @Override
+    ModifyTransactionRequest abortRequest() {
         ensureInitializedBuilder();
         builder.setAbort();
-        final ModifyTransactionRequest ret = builder.build();
         builderBusy = false;
-        return ret;
+        return builder.build();
     }
 
     @Override
     ModifyTransactionRequest commitRequest(final boolean coordinated) {
         ensureInitializedBuilder();
         builder.setCommit(coordinated);
+        builderBusy = false;
+        return builder.build();
+    }
 
-        final ModifyTransactionRequest ret = builder.build();
+    private ModifyTransactionRequest readyRequest() {
+        ensureInitializedBuilder();
+        builder.setReady();
         builderBusy = false;
-        return ret;
+        return builder.build();
     }
 
     @Override
-    void doSeal() {
+    boolean sealAndSend(final Optional<Long> enqueuedTicks) {
         if (sendReadyOnSeal) {
             ensureInitializedBuilder();
             builder.setReady();
-            flushBuilder();
+            flushBuilder(enqueuedTicks);
         }
+        return super.sealAndSend(enqueuedTicks);
     }
 
     @Override
@@ -306,7 +299,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
 
             final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
             if (maybeProto.isPresent()) {
-                ensureSealed();
+                // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
+                // until we know what we are going to do.
+                if (markSealed()) {
+                    sealOnly();
+                }
 
                 final TransactionRequest<?> tmp;
                 switch (maybeProto.get()) {
@@ -332,7 +329,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
                         });
                         break;
                     case READY:
-                        //no op
+                        tmp = readyRequest();
+                        sendRequest(tmp, resp -> {
+                            recordSuccessfulRequest(tmp);
+                            callback.accept(resp);
+                        });
                         break;
                     default:
                         throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
@@ -365,9 +366,9 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
         } else if (request instanceof TransactionAbortRequest) {
             ensureFlushedBuider();
-            sendAbort(callback);
+            sendDoAbort(callback);
         } else if (request instanceof TransactionPurgeRequest) {
-            sendPurge();
+            enqueuePurge(callback);
         } else {
             throw new IllegalArgumentException("Unhandled request {}" + request);
         }
@@ -430,7 +431,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
 
             final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
             if (maybeProto.isPresent()) {
-                ensureSealed();
+                // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
+                // until we know what we are going to do.
+                if (markSealed()) {
+                    sealOnly();
+                }
 
                 final TransactionRequest<?> tmp;
                 switch (maybeProto.get()) {
@@ -456,7 +461,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
                         }, enqueuedTicks);
                         break;
                     case READY:
-                        //no op
+                        tmp = readyRequest();
+                        enqueueRequest(tmp, resp -> {
+                            recordSuccessfulRequest(tmp);
+                            cb.accept(resp);
+                        }, enqueuedTicks);
                         break;
                     default:
                         throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
@@ -490,14 +499,14 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
                 enqueuedTicks);
         } else if (request instanceof TransactionAbortRequest) {
             ensureFlushedBuider(optTicks);
-            enqueueAbort(callback, enqueuedTicks);
+            enqueueDoAbort(callback, enqueuedTicks);
         } else if (request instanceof TransactionPurgeRequest) {
-            enqueuePurge(enqueuedTicks);
+            enqueuePurge(callback, enqueuedTicks);
         } else if (request instanceof IncrementTransactionSequenceRequest) {
             final IncrementTransactionSequenceRequest req = (IncrementTransactionSequenceRequest) request;
             ensureFlushedBuider(optTicks);
             enqueueRequest(new IncrementTransactionSequenceRequest(getIdentifier(), nextSequence(), localActor(),
-                req.getIncrement()), callback, enqueuedTicks);
+                snapshotOnly, req.getIncrement()), callback, enqueuedTicks);
             incrementSequence(req.getIncrement());
         } else {
             throw new IllegalArgumentException("Unhandled request {}" + request);