Remove FB suppression
[controller.git] opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
index 89fa8fbc2507fc1f25d8de01cabb48a0cfd4359d..8832cd6d1f6db28fd834134930bfa68ffac5bfab 100644
@@ -12,8 +12,15 @@ import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
 import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
 import com.google.common.primitives.UnsignedLong;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.File;
 import java.io.IOException;
 import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayDeque;
@@ -27,15 +34,19 @@ import java.util.Queue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.UnaryOperator;
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
 import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
-import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
+import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
+import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
@@ -68,6 +79,7 @@ import scala.concurrent.duration.Duration;
  * e.g. it does not expose public interfaces and assumes it is only ever called from a
  * single thread.
  *
+ * <p>
  * This class is not part of the API contract and is subject to change at any time.
  */
 @NotThreadSafe
@@ -90,6 +102,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
     private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher;
     private final ShardDataChangeListenerPublisher dataChangeListenerPublisher;
+    private final Collection<ShardDataTreeMetadata<?>> metadata;
     private final TipProducingDataTree dataTree;
     private final String logContext;
     private final Shard shard;
@@ -99,14 +112,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
             final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
-            final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) {
-        this.dataTree = dataTree;
+            final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext,
+            final ShardDataTreeMetadata<?>... metadata) {
+        this.dataTree = Preconditions.checkNotNull(dataTree);
         updateSchemaContext(schemaContext);
 
         this.shard = Preconditions.checkNotNull(shard);
         this.treeChangeListenerPublisher = Preconditions.checkNotNull(treeChangeListenerPublisher);
         this.dataChangeListenerPublisher = Preconditions.checkNotNull(dataChangeListenerPublisher);
         this.logContext = Preconditions.checkNotNull(logContext);
+        this.metadata = ImmutableList.copyOf(metadata);
     }
 
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType,
@@ -122,10 +137,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 new DefaultShardDataChangeListenerPublisher(), "");
     }
 
-    String logContext() {
+    final String logContext() {
         return logContext;
     }
 
+    final Ticker ticker() {
+        return shard.ticker();
+    }
+
     public TipProducingDataTree getDataTree() {
         return dataTree;
     }
@@ -134,26 +153,227 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return schemaContext;
     }
 
-    void updateSchemaContext(final SchemaContext schemaContext) {
-        dataTree.setSchemaContext(schemaContext);
-        this.schemaContext = Preconditions.checkNotNull(schemaContext);
+    void updateSchemaContext(final SchemaContext newSchemaContext) {
+        dataTree.setSchemaContext(newSchemaContext);
+        this.schemaContext = Preconditions.checkNotNull(newSchemaContext);
     }
 
-    ShardDataTreeSnapshot takeRecoverySnapshot() {
-        return new MetadataShardDataTreeSnapshot(dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY).get());
+    /**
+     * Take a snapshot of current state for later recovery.
+     *
+     * @return A state snapshot
+     */
+    @Nonnull ShardDataTreeSnapshot takeStateSnapshot() {
+        final NormalizedNode<?, ?> rootNode = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY).get();
+        final Builder<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> metaBuilder =
+                ImmutableMap.builder();
+
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            final ShardDataTreeSnapshotMetadata<?> meta = m.toSnapshot();
+            if (meta != null) {
+                metaBuilder.put(meta.getType(), meta);
+            }
+        }
+
+        return new MetadataShardDataTreeSnapshot(rootNode, metaBuilder.build());
     }
 
-    void applyRecoveryTransaction(final ReadWriteShardDataTreeTransaction transaction) throws DataValidationFailedException {
-        // FIXME: purge any outstanding transactions
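+    /**
+     * Apply a state snapshot, replacing the entire contents of the data tree. The supplied wrapper allows the caller
+     * to interpose a {@link PruningDataTreeModification} when the snapshot may not match the current SchemaContext.
+     */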
+    private void applySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot,
+            final UnaryOperator<DataTreeModification> wrapper) throws DataValidationFailedException {
+        final Stopwatch elapsed = Stopwatch.createStarted();
 
-        final DataTreeModification snapshot = transaction.getSnapshot();
-        snapshot.ready();
+        if (!pendingTransactions.isEmpty()) {
+            LOG.warn("{}: applying state snapshot with pending transactions", logContext);
+        }
+
+        final Map<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> snapshotMeta;
+        if (snapshot instanceof MetadataShardDataTreeSnapshot) {
+            snapshotMeta = ((MetadataShardDataTreeSnapshot) snapshot).getMetadata();
+        } else {
+            snapshotMeta = ImmutableMap.of();
+        }
+
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            final ShardDataTreeSnapshotMetadata<?> s = snapshotMeta.get(m.getSupportedType());
+            if (s != null) {
+                m.applySnapshot(s);
+            } else {
+                m.reset();
+            }
+        }
+
+        final DataTreeModification mod = wrapper.apply(dataTree.takeSnapshot().newModification());
+        // delete everything first
+        mod.delete(YangInstanceIdentifier.EMPTY);
+
+        final java.util.Optional<NormalizedNode<?, ?>> maybeNode = snapshot.getRootNode();
+        if (maybeNode.isPresent()) {
+            // Add everything from the remote node back
+            mod.write(YangInstanceIdentifier.EMPTY, maybeNode.get());
+        }
+        mod.ready();
+
+        final DataTreeModification unwrapped = unwrap(mod);
+        dataTree.validate(unwrapped);
+        DataTreeCandidateTip candidate = dataTree.prepare(unwrapped);
+        dataTree.commit(candidate);
+        notifyListeners(candidate);
 
-        dataTree.validate(snapshot);
-        dataTree.commit(dataTree.prepare(snapshot));
+        LOG.debug("{}: state snapshot applied in {}", logContext, elapsed);
     }
 
-    private ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) {
+    /**
+     * Apply a snapshot coming from the leader. This method assumes the leader and follower SchemaContexts match and
+     * does not perform any pruning.
+     *
+     * @param snapshot Snapshot that needs to be applied
+     * @throws DataValidationFailedException when the snapshot fails to apply
+     */
+    void applySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
+        applySnapshot(snapshot, UnaryOperator.identity());
+    }
+
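+    // Wraps a modification so that data not valid in the current SchemaContext is pruned rather than rejected.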
+    private PruningDataTreeModification wrapWithPruning(final DataTreeModification delegate) {
+        return new PruningDataTreeModification(delegate, dataTree, schemaContext);
+    }
+
+    private static DataTreeModification unwrap(final DataTreeModification modification) {
+        if (modification instanceof PruningDataTreeModification) {
+            return ((PruningDataTreeModification)modification).delegate();
+        }
+        return modification;
+    }
+
+    /**
+     * Apply a snapshot coming from recovery. This method does not assume the SchemaContexts match and performs data
+     * pruning in an attempt to adjust the state to our current SchemaContext.
+     *
+     * @param snapshot Snapshot that needs to be applied
+     * @throws DataValidationFailedException when the snapshot fails to apply
+     */
+    void applyRecoverySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
+        applySnapshot(snapshot, this::wrapWithPruning);
+    }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private void applyRecoveryCandidate(final DataTreeCandidate candidate) throws DataValidationFailedException {
+        final PruningDataTreeModification mod = wrapWithPruning(dataTree.takeSnapshot().newModification());
+        DataTreeCandidates.applyToModification(mod, candidate);
+        mod.ready();
+
+        final DataTreeModification unwrapped = mod.delegate();
+        LOG.trace("{}: Applying recovery modification {}", logContext, unwrapped);
+
+        try {
+            dataTree.validate(unwrapped);
+            dataTree.commit(dataTree.prepare(unwrapped));
+        } catch (Exception e) {
+            File file = new File(System.getProperty("karaf.data", "."),
+                    "failed-recovery-payload-" + logContext + ".out");
+            DataTreeModificationOutput.toFile(file, unwrapped);
+            throw new IllegalStateException(String.format(
+                    "%s: Failed to apply recovery payload. Modification data was written to file %s",
+                    logContext, file), e);
+        }
+    }
+
+    /**
+     * Apply a payload coming from recovery. This method does not assume the SchemaContexts match and performs data
+     * pruning in an attempt to adjust the state to our current SchemaContext.
+     *
+     * @param payload Payload
+     * @throws IOException when the snapshot fails to deserialize
+     * @throws DataValidationFailedException when the snapshot fails to apply
+     */
+    void applyRecoveryPayload(final @Nonnull Payload payload) throws IOException, DataValidationFailedException {
+        if (payload instanceof CommitTransactionPayload) {
+            final Entry<TransactionIdentifier, DataTreeCandidate> e =
+                    ((CommitTransactionPayload) payload).getCandidate();
+            applyRecoveryCandidate(e.getValue());
+            allMetadataCommittedTransaction(e.getKey());
+        } else if (payload instanceof DataTreeCandidatePayload) {
+            applyRecoveryCandidate(((DataTreeCandidatePayload) payload).getCandidate());
+        } else {
+            LOG.debug("{}: ignoring unhandled payload {}", logContext, payload);
+        }
+    }
+
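+    /**
+     * Apply a foreign (leader-originated) candidate directly to the data tree and notify listeners.
+     */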
+    private void applyReplicatedCandidate(final Identifier identifier, final DataTreeCandidate foreign)
+            throws DataValidationFailedException {
+        LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
+
+        final DataTreeModification mod = dataTree.takeSnapshot().newModification();
+        DataTreeCandidates.applyToModification(mod, foreign);
+        mod.ready();
+
+        LOG.trace("{}: Applying foreign modification {}", logContext, mod);
+        dataTree.validate(mod);
+        final DataTreeCandidate candidate = dataTree.prepare(mod);
+        dataTree.commit(candidate);
+
+        notifyListeners(candidate);
+    }
+
+    /**
+     * Apply a payload coming from the leader, which could actually be us. This method assumes the leader and follower
+     * SchemaContexts match and does not perform any pruning.
+     *
+     * @param identifier Payload identifier as returned from RaftActor
+     * @param payload Payload
+     * @throws IOException when the snapshot fails to deserialize
+     * @throws DataValidationFailedException when the snapshot fails to apply
+     */
+    void applyReplicatedPayload(final Identifier identifier, final Payload payload) throws IOException,
+            DataValidationFailedException {
+        /*
+         * This is a bit more involved than it needs to be due to the fact we do not want to be touching the payload
+         * if we are the leader and it has originated with us.
+         *
+         * The identifier will only ever be non-null when we were the leader that achieved consensus. Unfortunately,
+         * this may no longer be the case, as we are being called some time afterwards and may not be acting in that
+         * capacity anymore.
+         *
+         * In any case, we know that this is an entry coming from replication, hence we can be sure we will not observe
+         * pre-Boron state -- which limits the number of options here.
+         */
+        if (payload instanceof CommitTransactionPayload) {
+            if (identifier == null) {
+                final Entry<TransactionIdentifier, DataTreeCandidate> e =
+                        ((CommitTransactionPayload) payload).getCandidate();
+                applyReplicatedCandidate(e.getKey(), e.getValue());
+                allMetadataCommittedTransaction(e.getKey());
+            } else {
+                Verify.verify(identifier instanceof TransactionIdentifier);
+                payloadReplicationComplete((TransactionIdentifier) identifier);
+            }
+        } else {
+            LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
+        }
+    }
+
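+    /**
+     * Finish the commit of the transaction at the head of the pending queue once consensus has been reached on its
+     * payload. Out-of-order or unknown transactions are logged and ignored.
+     */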
+    private void payloadReplicationComplete(final TransactionIdentifier txId) {
+        final CommitEntry current = pendingTransactions.peek();
+        if (current == null) {
+            LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
+            return;
+        }
+
+        if (!current.cohort.getIdentifier().equals(txId)) {
+            LOG.warn("{}: Head of queue is {}, ignoring consensus on transaction {}", logContext,
+                current.cohort.getIdentifier(), txId);
+            return;
+        }
+
+        finishCommit(current.cohort);
+    }
+
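+    // Notify all metadata trackers that the given transaction has been committed. Invoked both from recovery and
+    // when a replicated payload is applied.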
+    private void allMetadataCommittedTransaction(final TransactionIdentifier txId) {
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            m.onTransactionCommitted(txId);
+        }
+    }
+
+    ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) {
         ShardDataTreeTransactionChain chain = transactionChains.get(localHistoryIdentifier);
         if (chain == null) {
             chain = new ShardDataTreeTransactionChain(localHistoryIdentifier, this);
@@ -180,6 +400,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return ensureTransactionChain(txId.getHistoryId()).newReadWriteTransaction(txId);
     }
 
+    @VisibleForTesting
     public void notifyListeners(final DataTreeCandidate candidate) {
         treeChangeListenerPublisher.publishChanges(candidate, logContext);
         dataChangeListenerPublisher.publishChanges(candidate, logContext);
@@ -225,22 +446,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             Optional<DataTreeCandidate>> registerChangeListener(final YangInstanceIdentifier path,
                     final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
                     final DataChangeScope scope) {
-        final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> reg =
+        DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> reg =
                 dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope);
 
         return new SimpleEntry<>(reg, readCurrentData());
     }
 
     private Optional<DataTreeCandidate> readCurrentData() {
-        final Optional<NormalizedNode<?, ?>> currentState = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
+        final Optional<NormalizedNode<?, ?>> currentState =
+                dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
         return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode(
             YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.<DataTreeCandidate>absent();
     }
 
-    public Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> registerTreeChangeListener(
-            final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) {
-        final ListenerRegistration<DOMDataTreeChangeListener> reg = treeChangeListenerPublisher.registerTreeChangeListener(
-                path, listener);
+    public Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>>
+            registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) {
+        final ListenerRegistration<DOMDataTreeChangeListener> reg =
+                treeChangeListenerPublisher.registerTreeChangeListener(path, listener);
 
         return new SimpleEntry<>(reg, readCurrentData());
     }
@@ -249,20 +471,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return pendingTransactions.size();
     }
 
-    void applyForeignCandidate(final Identifier identifier, final DataTreeCandidate foreign) throws DataValidationFailedException {
-        LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
-
-        final DataTreeModification mod = dataTree.takeSnapshot().newModification();
-        DataTreeCandidates.applyToModification(mod, foreign);
-        mod.ready();
-
-        LOG.trace("{}: Applying foreign modification {}", logContext, mod);
-        dataTree.validate(mod);
-        final DataTreeCandidate candidate = dataTree.prepare(mod);
-        dataTree.commit(candidate);
-        notifyListeners(candidate);
-    }
-
     @Override
     void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction) {
         // Intentional no-op
@@ -273,34 +481,40 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         final DataTreeModification snapshot = transaction.getSnapshot();
         snapshot.ready();
 
-        return createReadyCohort(transaction.getId(), snapshot);
+        return createReadyCohort(transaction.getIdentifier(), snapshot);
     }
 
     public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
         return dataTree.takeSnapshot().readNode(path);
     }
 
-    public DataTreeSnapshot takeSnapshot() {
+    DataTreeSnapshot takeSnapshot() {
         return dataTree.takeSnapshot();
     }
 
+    @VisibleForTesting
     public DataTreeModification newModification() {
         return dataTree.takeSnapshot().newModification();
     }
 
+    /**
+     * Commits a modification.
+     *
+     * @deprecated This method violates DataTree containment and will be removed.
+     */
     @VisibleForTesting
-    // FIXME: This should be removed, it violates encapsulation
+    @Deprecated
     public DataTreeCandidate commit(final DataTreeModification modification) throws DataValidationFailedException {
         modification.ready();
         dataTree.validate(modification);
-        DataTreeCandidateTip candidate = dataTree.prepare(modification);
+        DataTreeCandidate candidate = dataTree.prepare(modification);
         dataTree.commit(candidate);
         return candidate;
     }
 
     public Collection<ShardDataTreeCohort> getAndClearPendingTransactions() {
         Collection<ShardDataTreeCohort> ret = new ArrayList<>(pendingTransactions.size());
-        for(CommitEntry entry: pendingTransactions) {
+        for (CommitEntry entry : pendingTransactions) {
             ret.add(entry.cohort);
         }
 
@@ -308,13 +522,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return ret;
     }
 
+    @SuppressWarnings("checkstyle:IllegalCatch")
     private void processNextTransaction() {
         while (!pendingTransactions.isEmpty()) {
             final CommitEntry entry = pendingTransactions.peek();
             final SimpleShardDataTreeCohort cohort = entry.cohort;
             final DataTreeModification modification = cohort.getDataTreeModification();
 
-            if(cohort.getState() != State.CAN_COMMIT_PENDING) {
+            if (cohort.getState() != State.CAN_COMMIT_PENDING) {
                 break;
             }
 
@@ -336,7 +551,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
                 // For debugging purposes, allow dumping of the modification. Coupled with the above
                 // precondition log, it should allow us to understand what went on.
-                LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification, dataTree);
+                LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", logContext, cohort.getIdentifier(),
+                        modification, dataTree);
                 cause = new TransactionCommitFailedException("Data did not pass validation.", e);
             } catch (Exception e) {
                 LOG.warn("{}: Unexpected failure in validation phase", logContext, e);
@@ -366,6 +582,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         processNextTransaction();
     }
 
+    @SuppressWarnings("checkstyle:IllegalCatch")
     void startPreCommit(final SimpleShardDataTreeCohort cohort) {
         final CommitEntry entry = pendingTransactions.peek();
         Preconditions.checkState(entry != null, "Attempted to pre-commit of %s when no transactions pending", cohort);
@@ -397,6 +614,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         processNextTransaction();
     }
 
+    @SuppressWarnings("checkstyle:IllegalCatch")
     private void finishCommit(final SimpleShardDataTreeCohort cohort) {
         final TransactionIdentifier txId = cohort.getIdentifier();
         final DataTreeCandidate candidate = cohort.getCandidate();
@@ -404,24 +622,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         LOG.debug("{}: Resuming commit of transaction {}", logContext, txId);
 
         try {
-            try {
-                dataTree.commit(candidate);
-            } catch (IllegalStateException e) {
-                // We may get a "store tree and candidate base differ" IllegalStateException from commit under
-                // certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last
-                // resort. Eg, we're a follower and a tx payload is replicated but the leader goes down before
-                // applying it to the state. We then become the leader and a second tx is pre-committed and
-                // replicated. When consensus occurs, this will cause the first tx to be applied as a foreign
-                // candidate via applyState prior to the second tx. Since the second tx has already been
-                // pre-committed, when it gets here to commit it will get an IllegalStateException.
-
-                // FIXME - this is not an ideal way to handle this scenario. This is temporary - a cleaner
-                // solution will be forthcoming.
-
-                LOG.debug("{}: Commit failed for transaction {} - retrying as foreign candidate", logContext, txId, e);
-                applyForeignCandidate(txId, candidate);
-            }
+            dataTree.commit(candidate);
         } catch (Exception e) {
+            LOG.error("{}: Failed to commit transaction {}", logContext, txId, e);
             failCommit(e);
             return;
         }
@@ -430,7 +633,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
 
         // FIXME: propagate journal index
-
         pendingTransactions.poll().cohort.successfulCommit(UnsignedLong.ZERO);
 
         LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId);
@@ -468,32 +670,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId);
     }
 
-    private void payloadReplicationComplete(final TransactionIdentifier txId, final DataTreeCandidateSupplier payload) {
-        final CommitEntry current = pendingTransactions.peek();
-        if (current == null) {
-            LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
-            return;
-        }
-
-        if (!current.cohort.getIdentifier().equals(txId)) {
-            LOG.warn("{}: Head of queue is {}, ignoring consensus on transaction {}", logContext,
-                current.cohort.getIdentifier(), txId);
-            return;
-        }
-
-        finishCommit(current.cohort);
-    }
-
-    void payloadReplicationComplete(final Identifier identifier, final DataTreeCandidateSupplier payload) {
-        // For now we do not care about anything else but transactions
-        Verify.verify(identifier instanceof TransactionIdentifier);
-        payloadReplicationComplete((TransactionIdentifier)identifier, payload);
-    }
-
     void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
         cohortRegistry.process(sender, message);
     }
 
+    @Override
     ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId,
             final DataTreeModification modification) {
         SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId,
@@ -502,11 +683,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return cohort;
     }
 
-    void applyStateFromLeader(final Identifier identifier, final DataTreeCandidateSupplier payload)
-            throws DataValidationFailedException, IOException {
-        applyForeignCandidate(identifier, payload.getCandidate().getValue());
-    }
-
+    @SuppressFBWarnings(value = {"RV_RETURN_VALUE_IGNORED", "DB_DUPLICATE_SWITCH_CLAUSES"},
+            justification = "See inline comments below.")
     void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
         final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
         final long now = shard.ticker().read();
@@ -520,6 +698,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                     pendingTransactions.poll().cohort.failedCanCommit(new TimeoutException());
                     break;
                 case CAN_COMMIT_COMPLETE:
+                    // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause
+                    // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code
+                    // in PRE_COMMIT_COMPLETE is changed.
                     pendingTransactions.poll().cohort.reportFailure(new TimeoutException());
                     break;
                 case PRE_COMMIT_PENDING:
@@ -556,6 +737,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 case FAILED:
                 case READY:
                 default:
+                    // The suppression of the FindBugs "RV_RETURN_VALUE_IGNORED" warning pertains to this line. In
+                    // this case, we just want to drop the current entry that expired and thus ignore the return value.
+                    // In fact we really shouldn't hit this case but we handle all enums for completeness.
                     pendingTransactions.poll();
             }
 
@@ -578,7 +762,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             if (cohort.getState() != State.COMMIT_PENDING) {
                 LOG.debug("{}: aborted head of queue {} in state {}", logContext, cohort.getIdentifier(),
                     cohort.getState());
-                pendingTransactions.poll();
+
+                pendingTransactions.remove();
                 processNextTransaction();
             } else {
                 LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier());
@@ -605,12 +790,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     private void maybeRunOperationOnPendingTransactionsComplete() {
-      if (runOnPendingTransactionsComplete != null && pendingTransactions.isEmpty()) {
-          LOG.debug("{}: Pending transactions complete - running operation {}", logContext,
-                  runOnPendingTransactionsComplete);
-
-          runOnPendingTransactionsComplete.run();
-          runOnPendingTransactionsComplete = null;
-      }
-  }
+        if (runOnPendingTransactionsComplete != null && pendingTransactions.isEmpty()) {
+            LOG.debug("{}: Pending transactions complete - running operation {}", logContext,
+                    runOnPendingTransactionsComplete);
+
+            runOnPendingTransactionsComplete.run();
+            runOnPendingTransactionsComplete = null;
+        }
+    }
 }