BUG-5280: handle TransactionPurgeRequest replay
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / LocalReadWriteProxyTransaction.java
index 8195f8dcca26650ddc13d301419cfd0d4d18625b..3407da7eab4aa5c11a758f5c0ae039abd154e855 100644 (file)
@@ -22,6 +22,7 @@ import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRe
 import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
 import org.opendaylight.controller.cluster.access.concepts.Response;
@@ -155,13 +156,16 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
 
         final java.util.Optional<PersistenceProtocol> maybeProtocol = request.getPersistenceProtocol();
         if (maybeProtocol.isPresent()) {
-            seal();
             Verify.verify(callback != null, "Request {} has null callback", request);
+            ensureSealed();
 
             switch (maybeProtocol.get()) {
                 case ABORT:
                     sendAbort(callback);
                     break;
+                case READY:
+                    // No-op, as we have already issued a seal()
+                    break;
                 case SIMPLE:
                     sendRequest(commitRequest(false), callback);
                     break;
@@ -215,13 +219,16 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
                 }
             });
 
-            successor.seal();
+            successor.ensureSealed();
 
             final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated());
             successor.sendRequest(successorReq, callback);
         } else if (request instanceof AbortLocalTransactionRequest) {
             LOG.debug("Forwarding abort {} to successor {}", request, successor);
             successor.abort();
+        } else if (request instanceof TransactionPurgeRequest) {
+            LOG.debug("Forwarding purge {} to successor {}", request, successor);
+            successor.purge();
         } else {
             throw new IllegalArgumentException("Unhandled request" + request);
         }
@@ -251,7 +258,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
             request.getModification().applyToCursor(cursor);
         }
 
-        seal();
+        ensureSealed();
         sendRequest(commitRequest(request.isCoordinated()), callback);
     }
 }