BUG-8704: rework seal mechanics to not wait during replay
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / RemoteProxyTransaction.java
index 3b5f80ffe8bc02ee267620f7bfafdbcd1cf01a2a..1ba96426df75feb7e38e4db0f217c8c4f98708b8 100644 (file)
@@ -146,9 +146,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
     }
 
     private void ensureFlushedBuider() {
-        if (builderBusy) {
-            flushBuilder();
-        }
+        ensureFlushedBuider(Optional.absent());
     }
 
     private void ensureFlushedBuider(final Optional<Long> enqueuedTicks) {
@@ -157,10 +155,6 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
         }
     }
 
-    private void flushBuilder() {
-        flushBuilder(Optional.absent());
-    }
-
     private void flushBuilder(final Optional<Long> enqueuedTicks) {
         final ModifyTransactionRequest request = builder.build();
         builderBusy = false;
@@ -253,28 +247,33 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
     ModifyTransactionRequest abortRequest() {
         ensureInitializedBuilder();
         builder.setAbort();
-        final ModifyTransactionRequest ret = builder.build();
         builderBusy = false;
-        return ret;
+        return builder.build();
     }
 
     @Override
     ModifyTransactionRequest commitRequest(final boolean coordinated) {
         ensureInitializedBuilder();
         builder.setCommit(coordinated);
+        builderBusy = false;
+        return builder.build();
+    }
 
-        final ModifyTransactionRequest ret = builder.build();
+    private ModifyTransactionRequest readyRequest() {
+        ensureInitializedBuilder();
+        builder.setReady();
         builderBusy = false;
-        return ret;
+        return builder.build();
     }
 
     @Override
-    void doSeal() {
+    boolean sealAndSend(final Optional<Long> enqueuedTicks) {
         if (sendReadyOnSeal) {
             ensureInitializedBuilder();
             builder.setReady();
-            flushBuilder();
+            flushBuilder(enqueuedTicks);
         }
+        return super.sealAndSend(enqueuedTicks);
     }
 
     @Override
@@ -300,7 +299,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
 
             final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
             if (maybeProto.isPresent()) {
-                ensureSealed();
+                // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
+                // until we know what we are going to do.
+                if (markSealed()) {
+                    sealOnly();
+                }
 
                 final TransactionRequest<?> tmp;
                 switch (maybeProto.get()) {
@@ -326,7 +329,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
                         });
                         break;
                     case READY:
-                        //no op
+                        tmp = readyRequest();
+                        sendRequest(tmp, resp -> {
+                            recordSuccessfulRequest(tmp);
+                            callback.accept(resp);
+                        });
                         break;
                     default:
                         throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
@@ -424,7 +431,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
 
             final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
             if (maybeProto.isPresent()) {
-                ensureSealed();
+                // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
+                // until we know what we are going to do.
+                if (markSealed()) {
+                    sealOnly();
+                }
 
                 final TransactionRequest<?> tmp;
                 switch (maybeProto.get()) {
@@ -450,7 +461,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
                         }, enqueuedTicks);
                         break;
                     case READY:
-                        //no op
+                        tmp = readyRequest();
+                        enqueueRequest(tmp, resp -> {
+                            recordSuccessfulRequest(tmp);
+                            cb.accept(resp);
+                        }, enqueuedTicks);
                         break;
                     default:
                         throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());