BUG-5280: expand ShardDataTree to cover transaction mechanics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardDataTree.java
index 56c5eb65bf087c3a0d979de2ecaff7b0a65467a2..89fa8fbc2507fc1f25d8de01cabb48a0cfd4359d 100644 (file)
@@ -7,32 +7,61 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
+import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
-import com.google.common.base.Strings;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.primitives.UnsignedLong;
+import java.io.IOException;
 import java.util.AbstractMap.SimpleEntry;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
+import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
+import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
+import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
+import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
-import org.opendaylight.controller.md.sal.dom.store.impl.ResolveDataChangeEventsTask;
-import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
+import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
+import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
 
 /**
  * Internal shard state, similar to a DOMStore, but optimized for use in the actor system,
@@ -42,64 +71,137 @@ import org.slf4j.LoggerFactory;
  * This class is not part of the API contract and is subject to change at any time.
  */
 @NotThreadSafe
-@VisibleForTesting
-public final class ShardDataTree extends ShardDataTreeTransactionParent {
+public class ShardDataTree extends ShardDataTreeTransactionParent {
+    private static final class CommitEntry {
+        final SimpleShardDataTreeCohort cohort;
+        long lastAccess;
+
+        CommitEntry(final SimpleShardDataTreeCohort cohort, final long now) {
+            this.cohort = Preconditions.checkNotNull(cohort);
+            lastAccess = now;
+        }
+    }
+
+    private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
     private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
-    private static final ShardDataTreeNotificationManager MANAGER = new ShardDataTreeNotificationManager();
-    private final Map<String, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
-    private final ShardDataTreeChangePublisher treeChangePublisher = new ShardDataTreeChangePublisher();
-    private final ListenerTree listenerTree = ListenerTree.create();
+
+    private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
+    private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
+    private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
+    private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher;
+    private final ShardDataChangeListenerPublisher dataChangeListenerPublisher;
     private final TipProducingDataTree dataTree;
+    private final String logContext;
+    private final Shard shard;
+    private Runnable runOnPendingTransactionsComplete;
 
-    ShardDataTree(final SchemaContext schemaContext) {
-        dataTree = InMemoryDataTreeFactory.getInstance().create();
-        if (schemaContext != null) {
-            dataTree.setSchemaContext(schemaContext);
-        }
+    private SchemaContext schemaContext;
+
+    public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
+            final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
+            final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) {
+        this.dataTree = dataTree;
+        updateSchemaContext(schemaContext);
+
+        this.shard = Preconditions.checkNotNull(shard);
+        this.treeChangeListenerPublisher = Preconditions.checkNotNull(treeChangeListenerPublisher);
+        this.dataChangeListenerPublisher = Preconditions.checkNotNull(dataChangeListenerPublisher);
+        this.logContext = Preconditions.checkNotNull(logContext);
     }
 
-    TipProducingDataTree getDataTree() {
+    public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType,
+            final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
+            final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) {
+        this(shard, schemaContext, InMemoryDataTreeFactory.getInstance().create(treeType),
+                treeChangeListenerPublisher, dataChangeListenerPublisher, logContext);
+    }
+
+    @VisibleForTesting
+    public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
+        this(shard, schemaContext, treeType, new DefaultShardDataTreeChangeListenerPublisher(),
+                new DefaultShardDataChangeListenerPublisher(), "");
+    }
+
+    String logContext() {
+        return logContext;
+    }
+
+    public TipProducingDataTree getDataTree() {
         return dataTree;
     }
 
+    SchemaContext getSchemaContext() {
+        return schemaContext;
+    }
+
     void updateSchemaContext(final SchemaContext schemaContext) {
         dataTree.setSchemaContext(schemaContext);
+        this.schemaContext = Preconditions.checkNotNull(schemaContext);
     }
 
-    private ShardDataTreeTransactionChain ensureTransactionChain(final String chainId) {
-        ShardDataTreeTransactionChain chain = transactionChains.get(chainId);
+    ShardDataTreeSnapshot takeRecoverySnapshot() {
+        return new MetadataShardDataTreeSnapshot(dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY).get());
+    }
+
+    void applyRecoveryTransaction(final ReadWriteShardDataTreeTransaction transaction) throws DataValidationFailedException {
+        // FIXME: purge any outstanding transactions
+
+        final DataTreeModification snapshot = transaction.getSnapshot();
+        snapshot.ready();
+
+        dataTree.validate(snapshot);
+        dataTree.commit(dataTree.prepare(snapshot));
+    }
+
+    private ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) {
+        ShardDataTreeTransactionChain chain = transactionChains.get(localHistoryIdentifier);
         if (chain == null) {
-            chain = new ShardDataTreeTransactionChain(chainId, this);
-            transactionChains.put(chainId, chain);
+            chain = new ShardDataTreeTransactionChain(localHistoryIdentifier, this);
+            transactionChains.put(localHistoryIdentifier, chain);
         }
 
         return chain;
     }
 
-    ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final String txId, final String chainId) {
-        if (Strings.isNullOrEmpty(chainId)) {
+    ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) {
+        if (txId.getHistoryId().getHistoryId() == 0) {
             return new ReadOnlyShardDataTreeTransaction(txId, dataTree.takeSnapshot());
         }
 
-        return ensureTransactionChain(chainId).newReadOnlyTransaction(txId);
+        return ensureTransactionChain(txId.getHistoryId()).newReadOnlyTransaction(txId);
     }
 
-    ReadWriteShardDataTreeTransaction newReadWriteTransaction(final String txId, final String chainId) {
-        if (Strings.isNullOrEmpty(chainId)) {
-            return new ReadWriteShardDataTreeTransaction(this, txId, dataTree.takeSnapshot().newModification());
+    ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
+        if (txId.getHistoryId().getHistoryId() == 0) {
+            return new ReadWriteShardDataTreeTransaction(ShardDataTree.this, txId, dataTree.takeSnapshot()
+                    .newModification());
         }
 
-        return ensureTransactionChain(chainId).newReadWriteTransaction(txId);
+        return ensureTransactionChain(txId.getHistoryId()).newReadWriteTransaction(txId);
     }
 
-    void notifyListeners(final DataTreeCandidate candidate) {
-        LOG.debug("Notifying listeners on candidate {}", candidate);
+    public void notifyListeners(final DataTreeCandidate candidate) {
+        treeChangeListenerPublisher.publishChanges(candidate, logContext);
+        dataChangeListenerPublisher.publishChanges(candidate, logContext);
+    }
 
-        // DataTreeChanges first, as they are more light-weight
-        treeChangePublisher.publishChanges(candidate);
+    void notifyOfInitialData(final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+            NormalizedNode<?, ?>>> listenerReg, final Optional<DataTreeCandidate> currentState) {
+        if (currentState.isPresent()) {
+            ShardDataChangeListenerPublisher localPublisher = dataChangeListenerPublisher.newInstance();
+            localPublisher.registerDataChangeListener(listenerReg.getPath(), listenerReg.getInstance(),
+                    listenerReg.getScope());
+            localPublisher.publishChanges(currentState.get(), logContext);
+        }
+    }
 
-        // DataChanges second, as they are heavier
-        ResolveDataChangeEventsTask.create(candidate, listenerTree).resolve(MANAGER);
+    void notifyOfInitialData(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
+            final Optional<DataTreeCandidate> currentState) {
+        if (currentState.isPresent()) {
+            ShardDataTreeChangeListenerPublisher localPublisher = treeChangeListenerPublisher.newInstance();
+            localPublisher.registerTreeChangeListener(path, listener);
+            localPublisher.publishChanges(currentState.get(), logContext);
+        }
     }
 
     void closeAllTransactionChains() {
@@ -110,55 +212,51 @@ public final class ShardDataTree extends ShardDataTreeTransactionParent {
         transactionChains.clear();
     }
 
-    void closeTransactionChain(final String transactionChainId) {
+    void closeTransactionChain(final LocalHistoryIdentifier transactionChainId) {
         final ShardDataTreeTransactionChain chain = transactionChains.remove(transactionChainId);
         if (chain != null) {
             chain.close();
         } else {
-            LOG.debug("Closing non-existent transaction chain {}", transactionChainId);
+            LOG.debug("{}: Closing non-existent transaction chain {}", logContext, transactionChainId);
         }
     }
 
-    Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> registerChangeListener(
-            final YangInstanceIdentifier path,
-            final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, final DataChangeScope scope) {
-        final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> reg =
-                listenerTree.registerDataChangeListener(path, listener, scope);
+    Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
+            Optional<DataTreeCandidate>> registerChangeListener(final YangInstanceIdentifier path,
+                    final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
+                    final DataChangeScope scope) {
+        final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> reg =
+                dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope);
 
-        final Optional<NormalizedNode<?, ?>> currentState = dataTree.takeSnapshot().readNode(path);
-        final DOMImmutableDataChangeEvent event;
-        if (currentState.isPresent()) {
-            final NormalizedNode<?, ?> data = currentState.get();
-            event = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE).setAfter(data).addCreated(path, data).build();
-        } else {
-            event = null;
-        }
+        return new SimpleEntry<>(reg, readCurrentData());
+    }
 
-        return new SimpleEntry<>(reg, event);
+    private Optional<DataTreeCandidate> readCurrentData() {
+        final Optional<NormalizedNode<?, ?>> currentState = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
+        return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode(
+            YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.<DataTreeCandidate>absent();
     }
 
-    Entry<ListenerRegistration<DOMDataTreeChangeListener>, DataTreeCandidate> registerTreeChangeListener(final YangInstanceIdentifier path,
-            final DOMDataTreeChangeListener listener) {
-        final ListenerRegistration<DOMDataTreeChangeListener> reg = treeChangePublisher.registerTreeChangeListener(path, listener);
+    public Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> registerTreeChangeListener(
+            final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) {
+        final ListenerRegistration<DOMDataTreeChangeListener> reg = treeChangeListenerPublisher.registerTreeChangeListener(
+                path, listener);
 
-        final Optional<NormalizedNode<?, ?>> currentState = dataTree.takeSnapshot().readNode(path);
-        final DataTreeCandidate event;
-        if (currentState.isPresent()) {
-            event = DataTreeCandidates.fromNormalizedNode(path, currentState.get());
-        } else {
-            event = null;
-        }
-        return new SimpleEntry<>(reg, event);
+        return new SimpleEntry<>(reg, readCurrentData());
+    }
+
+    int getQueueSize() {
+        return pendingTransactions.size();
     }
 
-    void applyForeignCandidate(final String identifier, final DataTreeCandidate foreign) throws DataValidationFailedException {
-        LOG.debug("Applying foreign transaction {}", identifier);
+    void applyForeignCandidate(final Identifier identifier, final DataTreeCandidate foreign) throws DataValidationFailedException {
+        LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
 
         final DataTreeModification mod = dataTree.takeSnapshot().newModification();
         DataTreeCandidates.applyToModification(mod, foreign);
         mod.ready();
 
-        LOG.trace("Applying foreign modification {}", mod);
+        LOG.trace("{}: Applying foreign modification {}", logContext, mod);
         dataTree.validate(mod);
         final DataTreeCandidate candidate = dataTree.prepare(mod);
         dataTree.commit(candidate);
@@ -174,7 +272,345 @@ public final class ShardDataTree extends ShardDataTreeTransactionParent {
     ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
         final DataTreeModification snapshot = transaction.getSnapshot();
         snapshot.ready();
-        return new SimpleShardDataTreeCohort(this, snapshot);
+
+        return createReadyCohort(transaction.getId(), snapshot);
+    }
+
+    public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
+        return dataTree.takeSnapshot().readNode(path);
+    }
+
+    public DataTreeSnapshot takeSnapshot() {
+        return dataTree.takeSnapshot();
+    }
+
+    public DataTreeModification newModification() {
+        return dataTree.takeSnapshot().newModification();
+    }
+
+    @VisibleForTesting
+    // FIXME: This should be removed, it violates encapsulation
+    public DataTreeCandidate commit(final DataTreeModification modification) throws DataValidationFailedException {
+        modification.ready();
+        dataTree.validate(modification);
+        DataTreeCandidateTip candidate = dataTree.prepare(modification);
+        dataTree.commit(candidate);
+        return candidate;
+    }
+
+    public Collection<ShardDataTreeCohort> getAndClearPendingTransactions() {
+        Collection<ShardDataTreeCohort> ret = new ArrayList<>(pendingTransactions.size());
+        for(CommitEntry entry: pendingTransactions) {
+            ret.add(entry.cohort);
+        }
+
+        pendingTransactions.clear();
+        return ret;
+    }
+
+    private void processNextTransaction() {
+        while (!pendingTransactions.isEmpty()) {
+            final CommitEntry entry = pendingTransactions.peek();
+            final SimpleShardDataTreeCohort cohort = entry.cohort;
+            final DataTreeModification modification = cohort.getDataTreeModification();
+
+            if(cohort.getState() != State.CAN_COMMIT_PENDING) {
+                break;
+            }
+
+            LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
+            Exception cause;
+            try {
+                dataTree.validate(modification);
+                LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
+                cohort.successfulCanCommit();
+                entry.lastAccess = shard.ticker().read();
+                return;
+            } catch (ConflictingModificationAppliedException e) {
+                LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(),
+                    e.getPath());
+                cause = new OptimisticLockFailedException("Optimistic lock failed.", e);
+            } catch (DataValidationFailedException e) {
+                LOG.warn("{}: Store Tx {}: Data validation failed for path {}.", logContext, cohort.getIdentifier(),
+                    e.getPath(), e);
+
+                // For debugging purposes, allow dumping of the modification. Coupled with the above
+                // precondition log, it should allow us to understand what went on.
+                LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification, dataTree);
+                cause = new TransactionCommitFailedException("Data did not pass validation.", e);
+            } catch (Exception e) {
+                LOG.warn("{}: Unexpected failure in validation phase", logContext, e);
+                cause = e;
+            }
+
+            // Failure path: propagate the failure, remove the transaction from the queue and loop to the next one
+            pendingTransactions.poll().cohort.failedCanCommit(cause);
+        }
+
+        maybeRunOperationOnPendingTransactionsComplete();
     }
 
+    void startCanCommit(final SimpleShardDataTreeCohort cohort) {
+        final SimpleShardDataTreeCohort current = pendingTransactions.peek().cohort;
+        if (!cohort.equals(current)) {
+            LOG.debug("{}: Transaction {} scheduled for canCommit step", logContext, cohort.getIdentifier());
+            return;
+        }
+
+        processNextTransaction();
+    }
+
+    private void failPreCommit(final Exception cause) {
+        shard.getShardMBean().incrementFailedTransactionsCount();
+        pendingTransactions.poll().cohort.failedPreCommit(cause);
+        processNextTransaction();
+    }
+
+    void startPreCommit(final SimpleShardDataTreeCohort cohort) {
+        final CommitEntry entry = pendingTransactions.peek();
+        Preconditions.checkState(entry != null, "Attempted to pre-commit of %s when no transactions pending", cohort);
+
+        final SimpleShardDataTreeCohort current = entry.cohort;
+        Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
+        final DataTreeCandidateTip candidate;
+        try {
+            candidate = dataTree.prepare(cohort.getDataTreeModification());
+        } catch (Exception e) {
+            failPreCommit(e);
+            return;
+        }
+
+        try {
+            cohort.userPreCommit(candidate);
+        } catch (ExecutionException | TimeoutException e) {
+            failPreCommit(e);
+            return;
+        }
+
+        entry.lastAccess = shard.ticker().read();
+        cohort.successfulPreCommit(candidate);
+    }
+
+    private void failCommit(final Exception cause) {
+        shard.getShardMBean().incrementFailedTransactionsCount();
+        pendingTransactions.poll().cohort.failedCommit(cause);
+        processNextTransaction();
+    }
+
+    private void finishCommit(final SimpleShardDataTreeCohort cohort) {
+        final TransactionIdentifier txId = cohort.getIdentifier();
+        final DataTreeCandidate candidate = cohort.getCandidate();
+
+        LOG.debug("{}: Resuming commit of transaction {}", logContext, txId);
+
+        try {
+            try {
+                dataTree.commit(candidate);
+            } catch (IllegalStateException e) {
+                // We may get a "store tree and candidate base differ" IllegalStateException from commit under
+                // certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last
+                // resort. Eg, we're a follower and a tx payload is replicated but the leader goes down before
+                // applying it to the state. We then become the leader and a second tx is pre-committed and
+                // replicated. When consensus occurs, this will cause the first tx to be applied as a foreign
+                // candidate via applyState prior to the second tx. Since the second tx has already been
+                // pre-committed, when it gets here to commit it will get an IllegalStateException.
+
+                // FIXME - this is not an ideal way to handle this scenario. This is temporary - a cleaner
+                // solution will be forthcoming.
+
+                LOG.debug("{}: Commit failed for transaction {} - retrying as foreign candidate", logContext, txId, e);
+                applyForeignCandidate(txId, candidate);
+            }
+        } catch (Exception e) {
+            failCommit(e);
+            return;
+        }
+
+        shard.getShardMBean().incrementCommittedTransactionCount();
+        shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
+
+        // FIXME: propagate journal index
+
+        pendingTransactions.poll().cohort.successfulCommit(UnsignedLong.ZERO);
+
+        LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId);
+        notifyListeners(candidate);
+
+        processNextTransaction();
+    }
+
+    void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) {
+        final CommitEntry entry = pendingTransactions.peek();
+        Preconditions.checkState(entry != null, "Attempted to start commit of %s when no transactions pending", cohort);
+
+        final SimpleShardDataTreeCohort current = entry.cohort;
+        Verify.verify(cohort.equals(current), "Attempted to commit %s while %s is pending", cohort, current);
+
+        if (shard.canSkipPayload() || candidate.getRootNode().getModificationType() == ModificationType.UNMODIFIED) {
+            LOG.debug("{}: No replication required, proceeding to finish commit", logContext);
+            finishCommit(cohort);
+            return;
+        }
+
+        final TransactionIdentifier txId = cohort.getIdentifier();
+        final Payload payload;
+        try {
+            payload = CommitTransactionPayload.create(txId, candidate);
+        } catch (IOException e) {
+            LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e);
+            pendingTransactions.poll().cohort.failedCommit(e);
+            return;
+        }
+
+        // Once completed, we will continue via payloadReplicationComplete
+        entry.lastAccess = shard.ticker().read();
+        shard.persistPayload(txId, payload);
+        LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId);
+    }
+
+    private void payloadReplicationComplete(final TransactionIdentifier txId, final DataTreeCandidateSupplier payload) {
+        final CommitEntry current = pendingTransactions.peek();
+        if (current == null) {
+            LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
+            return;
+        }
+
+        if (!current.cohort.getIdentifier().equals(txId)) {
+            LOG.warn("{}: Head of queue is {}, ignoring consensus on transaction {}", logContext,
+                current.cohort.getIdentifier(), txId);
+            return;
+        }
+
+        finishCommit(current.cohort);
+    }
+
+    void payloadReplicationComplete(final Identifier identifier, final DataTreeCandidateSupplier payload) {
+        // For now we do not care about anything else but transactions
+        Verify.verify(identifier instanceof TransactionIdentifier);
+        payloadReplicationComplete((TransactionIdentifier)identifier, payload);
+    }
+
+    void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
+        cohortRegistry.process(sender, message);
+    }
+
+    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId,
+            final DataTreeModification modification) {
+        SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId,
+                cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT));
+        pendingTransactions.add(new CommitEntry(cohort, shard.ticker().read()));
+        return cohort;
+    }
+
+    void applyStateFromLeader(final Identifier identifier, final DataTreeCandidateSupplier payload)
+            throws DataValidationFailedException, IOException {
+        applyForeignCandidate(identifier, payload.getCandidate().getValue());
+    }
+
+    void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
+        final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
+        final long now = shard.ticker().read();
+        final CommitEntry currentTx = pendingTransactions.peek();
+        if (currentTx != null && currentTx.lastAccess + timeout < now) {
+            LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext,
+                    currentTx.cohort.getIdentifier(), transactionCommitTimeoutMillis, currentTx.cohort.getState());
+            boolean processNext = true;
+            switch (currentTx.cohort.getState()) {
+                case CAN_COMMIT_PENDING:
+                    pendingTransactions.poll().cohort.failedCanCommit(new TimeoutException());
+                    break;
+                case CAN_COMMIT_COMPLETE:
+                    pendingTransactions.poll().cohort.reportFailure(new TimeoutException());
+                    break;
+                case PRE_COMMIT_PENDING:
+                    pendingTransactions.poll().cohort.failedPreCommit(new TimeoutException());
+                    break;
+                case PRE_COMMIT_COMPLETE:
+                    // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we
+                    //        are ready we should commit the transaction, not abort it. Our current software stack does
+                    //        not allow us to do that consistently, because we persist at the time of commit, hence
+                    //        we can end up in a state where we have pre-committed a transaction, then a leader failover
+                    //        occurred ... the new leader does not see the pre-committed transaction and does not have
+                    //        a running timer. To fix this we really need two persistence events.
+                    //
+                    //        The first one, done at pre-commit time will hold the transaction payload. When consensus
+                    //        is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not
+                    //        apply the state in this event.
+                    //
+                    //        The second one, done at commit (or abort) time holds only the transaction identifier and
+                    //        signals to followers that the state should (or should not) be applied.
+                    //
+                    //        In order to make the pre-commit timer working across failovers, though, we need
+                    //        a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately
+                    //        restart the timer.
+                    pendingTransactions.poll().cohort.reportFailure(new TimeoutException());
+                    break;
+                case COMMIT_PENDING:
+                    LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext,
+                        currentTx.cohort.getIdentifier());
+                    currentTx.lastAccess = now;
+                    processNext = false;
+                    return;
+                case ABORTED:
+                case COMMITTED:
+                case FAILED:
+                case READY:
+                default:
+                    pendingTransactions.poll();
+            }
+
+            if (processNext) {
+                processNextTransaction();
+            }
+        }
+    }
+
+    void startAbort(final SimpleShardDataTreeCohort cohort) {
+        final Iterator<CommitEntry> it = pendingTransactions.iterator();
+        if (!it.hasNext()) {
+            LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.getIdentifier());
+            return;
+        }
+
+        // First entry is special, as it may already be committing
+        final CommitEntry first = it.next();
+        if (cohort.equals(first.cohort)) {
+            if (cohort.getState() != State.COMMIT_PENDING) {
+                LOG.debug("{}: aborted head of queue {} in state {}", logContext, cohort.getIdentifier(),
+                    cohort.getIdentifier());
+                pendingTransactions.poll();
+                processNextTransaction();
+            } else {
+                LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier());
+            }
+
+            return;
+        }
+
+        while (it.hasNext()) {
+            final CommitEntry e = it.next();
+            if (cohort.equals(e.cohort)) {
+                LOG.debug("{}: aborting queued transaction {}", logContext, cohort.getIdentifier());
+                it.remove();
+                return;
+            }
+        }
+
+        LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.getIdentifier());
+    }
+
+    void setRunOnPendingTransactionsComplete(final Runnable operation) {
+        runOnPendingTransactionsComplete = operation;
+        maybeRunOperationOnPendingTransactionsComplete();
+    }
+
+    private void maybeRunOperationOnPendingTransactionsComplete() {
+      if (runOnPendingTransactionsComplete != null && pendingTransactions.isEmpty()) {
+          LOG.debug("{}: Pending transactions complete - running operation {}", logContext,
+                  runOnPendingTransactionsComplete);
+
+          runOnPendingTransactionsComplete.run();
+          runOnPendingTransactionsComplete = null;
+      }
+  }
 }