Adjust to yangtools-2.0.0 changes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / FrontendReadWriteTransaction.java
index f9d9580214e41c2d28430c3a46c68b005ad6e8bc..5af7c7954ebbf1abc74ecf14d24fb5c481230b61 100644 (file)
@@ -14,6 +14,7 @@ import com.google.common.util.concurrent.FutureCallback;
 import java.util.Collection;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
@@ -53,22 +54,122 @@ import org.slf4j.LoggerFactory;
  */
 @NotThreadSafe
 final class FrontendReadWriteTransaction extends FrontendTransaction {
+    private enum CommitStage {
+        READY,
+        CAN_COMMIT_PENDING,
+        CAN_COMMIT_COMPLETE,
+        PRE_COMMIT_PENDING,
+        PRE_COMMIT_COMPLETE,
+        COMMIT_PENDING,
+    }
+
+    private abstract static class State {
+        @Override
+        public abstract String toString();
+    }
+
+    private static final class Failed extends State {
+        final RequestException cause;
+
+        Failed(final RequestException cause) {
+            this.cause = Preconditions.checkNotNull(cause);
+        }
+
+        @Override
+        public String toString() {
+            return "FAILED (" + cause.getMessage() + ")";
+        }
+    }
+
+    private static final class Open extends State {
+        final ReadWriteShardDataTreeTransaction openTransaction;
+
+        Open(final ReadWriteShardDataTreeTransaction openTransaction) {
+            this.openTransaction = Preconditions.checkNotNull(openTransaction);
+        }
+
+        @Override
+        public String toString() {
+            return "OPEN";
+        }
+    }
+
+    private static final class Ready extends State {
+        final ShardDataTreeCohort readyCohort;
+        CommitStage stage;
+
+        Ready(final ShardDataTreeCohort readyCohort) {
+            this.readyCohort = Preconditions.checkNotNull(readyCohort);
+            this.stage = CommitStage.READY;
+        }
+
+        @Override
+        public String toString() {
+            return "READY (" + stage + ")";
+        }
+    }
+
+    private static final class Sealed extends State {
+        final DataTreeModification sealedModification;
+
+        Sealed(final DataTreeModification sealedModification) {
+            this.sealedModification = Preconditions.checkNotNull(sealedModification);
+        }
+
+        @Override
+        public String toString() {
+            return "SEALED";
+        }
+    }
+
+    /**
+     * Retired state, needed to catch and suppress callbacks after we have removed associated state.
+     */
+    private static final class Retired extends State {
+        private final String prevStateString;
+
+        Retired(final State prevState) {
+            prevStateString = prevState.toString();
+        }
+
+        @Override
+        public String toString() {
+            return "RETIRED (in " + prevStateString + ")";
+        }
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(FrontendReadWriteTransaction.class);
+    private static final State ABORTED = new State() {
+        @Override
+        public String toString() {
+            return "ABORTED";
+        }
+    };
+    private static final State ABORTING = new State() {
+        @Override
+        public String toString() {
+            return "ABORTING";
+        }
+    };
+    private static final State COMMITTED = new State() {
+        @Override
+        public String toString() {
+            return "COMMITTED";
+        }
+    };
 
-    private ReadWriteShardDataTreeTransaction openTransaction;
-    private DataTreeModification sealedModification;
-    private ShardDataTreeCohort readyCohort;
+    private State state;
 
     private FrontendReadWriteTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
             final ReadWriteShardDataTreeTransaction transaction) {
         super(history, id);
-        this.openTransaction = Preconditions.checkNotNull(transaction);
+        this.state = new Open(transaction);
     }
 
     private FrontendReadWriteTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
             final DataTreeModification mod) {
         super(history, id);
-        this.sealedModification = Preconditions.checkNotNull(mod);
+        this.state = new Sealed(mod);
     }
 
     static FrontendReadWriteTransaction createOpen(final AbstractFrontendHistory history,
@@ -83,7 +184,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
 
     // Sequence has already been checked
     @Override
-    @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
+    @Nullable TransactionSuccess<?> doHandleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
             final long now) throws RequestException {
         if (request instanceof ModifyTransactionRequest) {
             return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now);
@@ -101,105 +202,260 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
             handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now);
             return null;
         } else if (request instanceof TransactionAbortRequest) {
-            handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
+            return handleTransactionAbort(request.getSequence(), envelope, now);
+        } else if (request instanceof AbortLocalTransactionRequest) {
+            handleLocalTransactionAbort(request.getSequence(), envelope, now);
             return null;
         } else {
+            LOG.warn("Rejecting unsupported request {}", request);
             throw new UnsupportedRequestException(request);
         }
     }
 
+    @Override
+    void retire() {
+        state = new Retired(state);
+    }
+
     private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
             final RequestEnvelope envelope, final long now) throws RequestException {
-        readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
-            @Override
-            public void onSuccess(final DataTreeCandidate result) {
-                recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(readyCohort.getIdentifier(),
-                    request.getSequence()));
-            }
+        throwIfFailed();
+
+        final Ready ready = checkReady();
+        switch (ready.stage) {
+            case PRE_COMMIT_PENDING:
+                LOG.debug("{}: Transaction {} is already preCommitting", persistenceId(), getIdentifier());
+                break;
+            case CAN_COMMIT_COMPLETE:
+                ready.stage = CommitStage.PRE_COMMIT_PENDING;
+                LOG.debug("{}: Transaction {} initiating preCommit", persistenceId(), getIdentifier());
+                ready.readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
+                    @Override
+                    public void onSuccess(final DataTreeCandidate result) {
+                        successfulPreCommit(envelope, now);
+                    }
+
+                    @Override
+                    public void onFailure(final Throwable failure) {
+                        failTransaction(envelope, now, new RuntimeRequestException("Precommit failed", failure));
+                    }
+                });
+                break;
+            case CAN_COMMIT_PENDING:
+            case COMMIT_PENDING:
+            case PRE_COMMIT_COMPLETE:
+            case READY:
+                throw new IllegalStateException("Attempted to preCommit in stage " + ready.stage);
+            default:
+                throwUnhandledCommitStage(ready);
+        }
+    }
 
-            @Override
-            public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, now, new RuntimeRequestException("Precommit failed", failure));
-                readyCohort = null;
-            }
-        });
+    void successfulPreCommit(final RequestEnvelope envelope, final long startTime) {
+        if (state instanceof Retired) {
+            LOG.debug("{}: Suppressing successful preCommit of retired transaction {}", persistenceId(),
+                getIdentifier());
+            return;
+        }
+
+        final Ready ready = checkReady();
+        LOG.debug("{}: Transaction {} completed preCommit", persistenceId(), getIdentifier());
+        recordAndSendSuccess(envelope, startTime, new TransactionPreCommitSuccess(getIdentifier(),
+            envelope.getMessage().getSequence()));
+        ready.stage = CommitStage.PRE_COMMIT_COMPLETE;
+    }
+
+    void failTransaction(final RequestEnvelope envelope, final long now, final RuntimeRequestException cause) {
+        if (state instanceof Retired) {
+            LOG.debug("{}: Suppressing failure of retired transaction {}", persistenceId(), getIdentifier(), cause);
+            return;
+        }
+
+        recordAndSendFailure(envelope, now, cause);
+        state = new Failed(cause);
+        LOG.debug("{}: Transaction {} failed", persistenceId(), getIdentifier(), cause);
     }
 
     private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope,
             final long now) throws RequestException {
-        readyCohort.commit(new FutureCallback<UnsignedLong>() {
-            @Override
-            public void onSuccess(final UnsignedLong result) {
-                successfulCommit(envelope, now);
-            }
+        throwIfFailed();
+
+        final Ready ready = checkReady();
+        switch (ready.stage) {
+            case COMMIT_PENDING:
+                LOG.debug("{}: Transaction {} is already committing", persistenceId(), getIdentifier());
+                break;
+            case PRE_COMMIT_COMPLETE:
+                ready.stage = CommitStage.COMMIT_PENDING;
+                LOG.debug("{}: Transaction {} initiating commit", persistenceId(), getIdentifier());
+                ready.readyCohort.commit(new FutureCallback<UnsignedLong>() {
+                    @Override
+                    public void onSuccess(final UnsignedLong result) {
+                        successfulCommit(envelope, now);
+                    }
+
+                    @Override
+                    public void onFailure(final Throwable failure) {
+                        failTransaction(envelope, now, new RuntimeRequestException("Commit failed", failure));
+                    }
+                });
+                break;
+            case CAN_COMMIT_COMPLETE:
+            case CAN_COMMIT_PENDING:
+            case PRE_COMMIT_PENDING:
+            case READY:
+                throw new IllegalStateException("Attempted to doCommit in stage " + ready.stage);
+            default:
+                throwUnhandledCommitStage(ready);
+        }
+    }
 
-            @Override
-            public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, now, new RuntimeRequestException("Commit failed", failure));
-                readyCohort = null;
-            }
-        });
+    private void handleLocalTransactionAbort(final long sequence, final RequestEnvelope envelope, final long now) {
+        checkOpen().abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
+            sequence)));
     }
 
-    private void handleTransactionAbort(final TransactionAbortRequest request,
-            final RequestEnvelope envelope, final long now) throws RequestException {
-        if (readyCohort == null) {
-            openTransaction.abort(() -> recordAndSendSuccess(envelope, now,
-                new TransactionAbortSuccess(getIdentifier(), request.getSequence())));
-            return;
+    private void startAbort() {
+        state = ABORTING;
+        LOG.debug("{}: Transaction {} aborting", persistenceId(), getIdentifier());
+    }
+
+    private void finishAbort() {
+        state = ABORTED;
+        LOG.debug("{}: Transaction {} aborted", persistenceId(), getIdentifier());
+    }
+
+    private TransactionAbortSuccess handleTransactionAbort(final long sequence, final RequestEnvelope envelope,
+            final long now) {
+        if (state instanceof Open) {
+            final ReadWriteShardDataTreeTransaction openTransaction = checkOpen();
+            startAbort();
+            openTransaction.abort(() -> {
+                recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
+                    sequence));
+                finishAbort();
+            });
+            return null;
+        }
+        if (ABORTING.equals(state)) {
+            LOG.debug("{}: Transaction {} already aborting", persistenceId(), getIdentifier());
+            return null;
+        }
+        if (ABORTED.equals(state)) {
+            // We should have recorded the reply
+            LOG.warn("{}: Transaction {} already aborted", persistenceId(), getIdentifier());
+            return new TransactionAbortSuccess(getIdentifier(), sequence);
         }
 
-        readyCohort.abort(new FutureCallback<Void>() {
+        final Ready ready = checkReady();
+        startAbort();
+        ready.readyCohort.abort(new FutureCallback<Void>() {
             @Override
             public void onSuccess(final Void result) {
-                readyCohort = null;
-                recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
-                    request.getSequence()));
-                LOG.debug("Transaction {} aborted", getIdentifier());
+                recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), sequence));
+                finishAbort();
             }
 
             @Override
             public void onFailure(final Throwable failure) {
-                readyCohort = null;
-                LOG.warn("Transaction {} abort failed", getIdentifier(), failure);
                 recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure));
+                LOG.warn("{}: Transaction {} abort failed", persistenceId(), getIdentifier(), failure);
+                finishAbort();
             }
         });
+        return null;
     }
 
-    private void coordinatedCommit(final RequestEnvelope envelope, final long now) {
-        readyCohort.canCommit(new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(final Void result) {
-                recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(readyCohort.getIdentifier(),
-                    envelope.getMessage().getSequence()));
-            }
+    private void coordinatedCommit(final RequestEnvelope envelope, final long now) throws RequestException {
+        throwIfFailed();
 
-            @Override
-            public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
-                readyCohort = null;
-            }
-        });
+        final Ready ready = checkReady();
+        switch (ready.stage) {
+            case CAN_COMMIT_PENDING:
+                LOG.debug("{}: Transaction {} is already canCommitting", persistenceId(), getIdentifier());
+                break;
+            case READY:
+                ready.stage = CommitStage.CAN_COMMIT_PENDING;
+                LOG.debug("{}: Transaction {} initiating canCommit", persistenceId(), getIdentifier());
+                checkReady().readyCohort.canCommit(new FutureCallback<Void>() {
+                    @Override
+                    public void onSuccess(final Void result) {
+                        successfulCanCommit(envelope, now);
+                    }
+
+                    @Override
+                    public void onFailure(final Throwable failure) {
+                        failTransaction(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
+                    }
+                });
+                break;
+            case CAN_COMMIT_COMPLETE:
+            case COMMIT_PENDING:
+            case PRE_COMMIT_COMPLETE:
+            case PRE_COMMIT_PENDING:
+                throw new IllegalStateException("Attempted to canCommit in stage " + ready.stage);
+            default:
+                throwUnhandledCommitStage(ready);
+        }
     }
 
-    private void directCommit(final RequestEnvelope envelope, final long now) {
-        readyCohort.canCommit(new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(final Void result) {
-                successfulDirectCanCommit(envelope, now);
-            }
+    void successfulCanCommit(final RequestEnvelope envelope, final long startTime) {
+        if (state instanceof Retired) {
+            LOG.debug("{}: Suppressing successful canCommit of retired transaction {}", persistenceId(),
+                getIdentifier());
+            return;
+        }
 
-            @Override
-            public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
-                readyCohort = null;
-            }
-        });
+        final Ready ready = checkReady();
+        recordAndSendSuccess(envelope, startTime, new TransactionCanCommitSuccess(getIdentifier(),
+            envelope.getMessage().getSequence()));
+        ready.stage = CommitStage.CAN_COMMIT_COMPLETE;
+        LOG.debug("{}: Transaction {} completed canCommit", persistenceId(), getIdentifier());
+    }
+
+    private void directCommit(final RequestEnvelope envelope, final long now) throws RequestException {
+        throwIfFailed();
+
+        final Ready ready = checkReady();
+        switch (ready.stage) {
+            case CAN_COMMIT_COMPLETE:
+            case CAN_COMMIT_PENDING:
+            case COMMIT_PENDING:
+            case PRE_COMMIT_COMPLETE:
+            case PRE_COMMIT_PENDING:
+                LOG.debug("{}: Transaction {} in state {}, not initiating direct commit for {}", persistenceId(),
+                    getIdentifier(), state, envelope);
+                break;
+            case READY:
+                ready.stage = CommitStage.CAN_COMMIT_PENDING;
+                LOG.debug("{}: Transaction {} initiating direct canCommit", persistenceId(), getIdentifier());
+                ready.readyCohort.canCommit(new FutureCallback<Void>() {
+                    @Override
+                    public void onSuccess(final Void result) {
+                        successfulDirectCanCommit(envelope, now);
+                    }
+
+                    @Override
+                    public void onFailure(final Throwable failure) {
+                        failTransaction(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
+                    }
+                });
+                break;
+            default:
+                throwUnhandledCommitStage(ready);
+        }
     }
 
-    private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
-        readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
+    void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
+        if (state instanceof Retired) {
+            LOG.debug("{}: Suppressing direct canCommit of retired transaction {}", persistenceId(), getIdentifier());
+            return;
+        }
+
+        final Ready ready = checkReady();
+        ready.stage = CommitStage.PRE_COMMIT_PENDING;
+        LOG.debug("{}: Transaction {} initiating direct preCommit", persistenceId(), getIdentifier());
+        ready.readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
             @Override
             public void onSuccess(final DataTreeCandidate result) {
                 successfulDirectPreCommit(envelope, startTime);
@@ -207,15 +463,21 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
 
             @Override
             public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure));
-                readyCohort = null;
+                failTransaction(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure));
             }
         });
     }
 
-    private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
-        readyCohort.commit(new FutureCallback<UnsignedLong>() {
+    void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
+        if (state instanceof Retired) {
+            LOG.debug("{}: Suppressing direct commit of retired transaction {}", persistenceId(), getIdentifier());
+            return;
+        }
 
+        final Ready ready = checkReady();
+        ready.stage = CommitStage.COMMIT_PENDING;
+        LOG.debug("{}: Transaction {} initiating direct commit", persistenceId(), getIdentifier());
+        ready.readyCohort.commit(new FutureCallback<UnsignedLong>() {
             @Override
             public void onSuccess(final UnsignedLong result) {
                 successfulCommit(envelope, startTime);
@@ -223,43 +485,56 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
 
             @Override
             public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure));
-                readyCohort = null;
+                failTransaction(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure));
             }
         });
     }
 
-    private void successfulCommit(final RequestEnvelope envelope, final long startTime) {
-        recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(readyCohort.getIdentifier(),
+    void successfulCommit(final RequestEnvelope envelope, final long startTime) {
+        if (state instanceof Retired) {
+            LOG.debug("{}: Suppressing commit response on retired transaction {}", persistenceId(), getIdentifier());
+            return;
+        }
+
+        recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(getIdentifier(),
             envelope.getMessage().getSequence()));
-        readyCohort = null;
+        state = COMMITTED;
     }
 
     private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
             final RequestEnvelope envelope, final long now) throws RequestException {
-        if (sealedModification.equals(request.getModification())) {
-            readyCohort = history().createReadyCohort(getIdentifier(), sealedModification);
+        final DataTreeModification sealedModification = checkSealed();
+        if (!sealedModification.equals(request.getModification())) {
+            LOG.warn("Expecting modification {}, commit request has {}", sealedModification, request.getModification());
+            throw new UnsupportedRequestException(request);
+        }
 
-            if (request.isCoordinated()) {
-                coordinatedCommit(envelope, now);
-            } else {
-                directCommit(envelope, now);
-            }
+        final java.util.Optional<Exception> optFailure = request.getDelayedFailure();
+        if (optFailure.isPresent()) {
+            state = new Ready(history().createFailedCohort(getIdentifier(), sealedModification, optFailure.get()));
         } else {
-            throw new UnsupportedRequestException(request);
+            state = new Ready(history().createReadyCohort(getIdentifier(), sealedModification));
+        }
+
+        if (request.isCoordinated()) {
+            coordinatedCommit(envelope, now);
+        } else {
+            directCommit(envelope, now);
         }
     }
 
     private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request)
             throws RequestException {
-        final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
+        final Optional<NormalizedNode<?, ?>> data = Optional.fromJavaUtil(checkOpen().getSnapshot().readNode(
+            request.getPath()));
         return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(getIdentifier(), request.getSequence(),
             data.isPresent()));
     }
 
     private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request)
             throws RequestException {
-        final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
+        final Optional<NormalizedNode<?, ?>> data = Optional.fromJavaUtil(checkOpen().getSnapshot().readNode(
+            request.getPath()));
         return recordSuccess(request.getSequence(), new ReadTransactionSuccess(getIdentifier(), request.getSequence(),
             data));
     }
@@ -268,13 +543,10 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
         return recordSuccess(sequence, new ModifyTransactionSuccess(getIdentifier(), sequence));
     }
 
-    private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
-            final RequestEnvelope envelope, final long now) throws RequestException {
-
-        final Collection<TransactionModification> mods = request.getModifications();
-        if (!mods.isEmpty()) {
-            final DataTreeModification modification = openTransaction.getSnapshot();
-            for (TransactionModification m : mods) {
+    private void applyModifications(final Collection<TransactionModification> modifications) {
+        if (!modifications.isEmpty()) {
+            final DataTreeModification modification = checkOpen().getSnapshot();
+            for (TransactionModification m : modifications) {
                 if (m instanceof TransactionDelete) {
                     modification.delete(m.getPath());
                 } else if (m instanceof TransactionWrite) {
@@ -282,44 +554,93 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
                 } else if (m instanceof TransactionMerge) {
                     modification.merge(m.getPath(), ((TransactionMerge) m).getData());
                 } else {
-                    LOG.warn("{}: ignoring unhandled modification {}", history().persistenceId(), m);
+                    LOG.warn("{}: ignoring unhandled modification {}", persistenceId(), m);
                 }
             }
         }
+    }
 
+    @Nullable
+    private TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
+            final RequestEnvelope envelope, final long now) throws RequestException {
+        // We need to examine the persistence protocol first to see if this is an idempotent request. If there is no
+        // protocol, there is nothing for us to do.
         final java.util.Optional<PersistenceProtocol> maybeProto = request.getPersistenceProtocol();
         if (!maybeProto.isPresent()) {
+            applyModifications(request.getModifications());
             return replyModifySuccess(request.getSequence());
         }
 
         switch (maybeProto.get()) {
             case ABORT:
-                openTransaction.abort(() -> replyModifySuccess(request.getSequence()));
-                openTransaction = null;
+                if (ABORTING.equals(state)) {
+                    LOG.debug("{}: Transaction {} already aborting", persistenceId(), getIdentifier());
+                    return null;
+                }
+                final ReadWriteShardDataTreeTransaction openTransaction = checkOpen();
+                startAbort();
+                openTransaction.abort(() -> {
+                    recordAndSendSuccess(envelope, now, new ModifyTransactionSuccess(getIdentifier(),
+                        request.getSequence()));
+                    finishAbort();
+                });
                 return null;
             case READY:
-                ensureReady();
+                ensureReady(request.getModifications());
                 return replyModifySuccess(request.getSequence());
             case SIMPLE:
-                ensureReady();
+                ensureReady(request.getModifications());
                 directCommit(envelope, now);
                 return null;
             case THREE_PHASE:
-                ensureReady();
+                ensureReady(request.getModifications());
                 coordinatedCommit(envelope, now);
                 return null;
             default:
+                LOG.warn("{}: rejecting unsupported protocol {}", persistenceId(), maybeProto.get());
                 throw new UnsupportedRequestException(request);
         }
     }
 
-    private void ensureReady() {
+    private void ensureReady(final Collection<TransactionModification> modifications) {
         // We may have a combination of READY + SIMPLE/THREE_PHASE , in which case we want to ready the transaction
         // only once.
-        if (readyCohort == null) {
-            readyCohort = openTransaction.ready();
-            LOG.debug("{}: transitioned {} to ready", history().persistenceId(), openTransaction.getIdentifier());
-            openTransaction = null;
+        if (state instanceof Ready) {
+            LOG.debug("{}: {} is already in state {}", persistenceId(), getIdentifier(), state);
+            return;
+        }
+
+        applyModifications(modifications);
+        state = new Ready(checkOpen().ready());
+        LOG.debug("{}: transitioned {} to ready", persistenceId(), getIdentifier());
+    }
+
+    private void throwIfFailed() throws RequestException {
+        if (state instanceof Failed) {
+            LOG.debug("{}: {} has failed, rejecting request", persistenceId(), getIdentifier());
+            throw ((Failed) state).cause;
         }
     }
+
+    private ReadWriteShardDataTreeTransaction checkOpen() {
+        Preconditions.checkState(state instanceof Open, "%s expect to be open, is in state %s", getIdentifier(),
+            state);
+        return ((Open) state).openTransaction;
+    }
+
+    private Ready checkReady() {
+        Preconditions.checkState(state instanceof Ready, "%s expect to be ready, is in state %s", getIdentifier(),
+            state);
+        return (Ready) state;
+    }
+
+    private DataTreeModification checkSealed() {
+        Preconditions.checkState(state instanceof Sealed, "%s expect to be sealed, is in state %s", getIdentifier(),
+            state);
+        return ((Sealed) state).sealedModification;
+    }
+
+    private static void throwUnhandledCommitStage(final Ready ready) {
+        throw new IllegalStateException("Unhandled commit stage " + ready.stage);
+    }
 }