NormalizedNodeAggregator should also report empty
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardDataTree.java
index a097b6808152aaa7b21a0abf31626d8815cf6b11..c1b83923a662d0753cef52f67ff85e7919098b48 100644 (file)
@@ -7,15 +7,16 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static akka.actor.ActorRef.noSender;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Verify.verify;
 import static com.google.common.base.Verify.verifyNotNull;
 import static java.util.Objects.requireNonNull;
+import static java.util.Objects.requireNonNullElse;
 
 import akka.actor.ActorRef;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -50,7 +51,6 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
 import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.node.utils.transformer.ReusableNormalizedNodePruner;
 import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload;
@@ -65,30 +65,36 @@ import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionP
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.datastore.persisted.SkipTransactionsPayload;
 import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
+import org.opendaylight.controller.cluster.datastore.utils.ImmutableUnsignedLongSet;
 import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
+import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.Empty;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
-import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+import org.opendaylight.yangtools.yang.data.tree.api.ConflictingModificationAppliedException;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeConfiguration;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeTip;
+import org.opendaylight.yangtools.yang.data.tree.api.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.tree.api.ModificationType;
+import org.opendaylight.yangtools.yang.data.tree.api.TreeType;
+import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory;
+import org.opendaylight.yangtools.yang.data.tree.spi.DataTreeCandidates;
 import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -101,6 +107,8 @@ import scala.concurrent.duration.FiniteDuration;
  * <p>
  * This class is not part of the API contract and is subject to change at any time. It is NOT thread-safe.
  */
+@VisibleForTesting
+// non-final for mocking
 public class ShardDataTree extends ShardDataTreeTransactionParent {
     private static final class CommitEntry {
         final SimpleShardDataTreeCohort cohort;
@@ -159,7 +167,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     private int currentTransactionBatch;
 
-    ShardDataTree(final Shard shard, final SchemaContext schemaContext, final DataTree dataTree,
+    ShardDataTree(final Shard shard, final EffectiveModelContext schemaContext, final DataTree dataTree,
             final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
             final String logContext,
             final ShardDataTreeMetadata<?>... metadata) {
@@ -173,7 +181,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         tip = dataTree;
     }
 
-    ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType,
+    ShardDataTree(final Shard shard, final EffectiveModelContext schemaContext, final TreeType treeType,
             final YangInstanceIdentifier root,
             final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
             final String logContext,
@@ -191,7 +199,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @VisibleForTesting
-    public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
+    public ShardDataTree(final Shard shard, final EffectiveModelContext schemaContext, final TreeType treeType) {
         this(shard, schemaContext, treeType, YangInstanceIdentifier.empty(),
                 new DefaultShardDataTreeChangeListenerPublisher(""), "");
     }
@@ -204,21 +212,22 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return shard.ticker().read();
     }
 
-    public DataTree getDataTree() {
+    final DataTree getDataTree() {
         return dataTree;
     }
 
-    SchemaContext getSchemaContext() {
+    @VisibleForTesting
+    final SchemaContext getSchemaContext() {
         return schemaContext;
     }
 
-    void updateSchemaContext(final SchemaContext newSchemaContext) {
-        dataTree.setSchemaContext(newSchemaContext);
-        this.schemaContext = requireNonNull(newSchemaContext);
-        this.dataSchemaContext = DataSchemaContextTree.from(newSchemaContext);
+    final void updateSchemaContext(final @NonNull EffectiveModelContext newSchemaContext) {
+        dataTree.setEffectiveModelContext(newSchemaContext);
+        schemaContext = newSchemaContext;
+        dataSchemaContext = DataSchemaContextTree.from(newSchemaContext);
     }
 
-    void resetTransactionBatch() {
+    final void resetTransactionBatch() {
         currentTransactionBatch = 0;
     }
 
@@ -228,7 +237,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      * @return A state snapshot
      */
     @NonNull ShardDataTreeSnapshot takeStateSnapshot() {
-        final NormalizedNode<?, ?> rootNode = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty()).get();
+        final NormalizedNode rootNode = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty()).get();
         final Builder<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> metaBuilder =
                 ImmutableMap.builder();
 
@@ -275,7 +284,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         // delete everything first
         mod.delete(YangInstanceIdentifier.empty());
 
-        final Optional<NormalizedNode<?, ?>> maybeNode = snapshot.getRootNode();
+        final Optional<NormalizedNode> maybeNode = snapshot.getRootNode();
         if (maybeNode.isPresent()) {
             // Add everything from the remote node back
             mod.write(YangInstanceIdentifier.empty(), maybeNode.get());
@@ -297,7 +306,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      * @param snapshot Snapshot that needs to be applied
      * @throws DataValidationFailedException when the snapshot fails to apply
      */
-    void applySnapshot(final @NonNull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
+    final void applySnapshot(final @NonNull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
         // TODO: we should be taking ShardSnapshotState here and performing forward-compatibility translation
         applySnapshot(snapshot, UnaryOperator.identity());
     }
@@ -309,34 +318,29 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      * @param snapshot Snapshot that needs to be applied
      * @throws DataValidationFailedException when the snapshot fails to apply
      */
-    void applyRecoverySnapshot(final @NonNull ShardSnapshotState snapshot) throws DataValidationFailedException {
+    final void applyRecoverySnapshot(final @NonNull ShardSnapshotState snapshot) throws DataValidationFailedException {
         // TODO: we should be able to reuse the pruner, provided we are not reentrant
-        ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext(dataSchemaContext);
+        final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext(
+            dataSchemaContext);
         if (snapshot.needsMigration()) {
-            pruner = pruner.withUintAdaption();
+            final ReusableNormalizedNodePruner uintPruner = pruner.withUintAdaption();
+            applySnapshot(snapshot.getSnapshot(),
+                delegate -> new PruningDataTreeModification.Proactive(delegate, dataTree, uintPruner));
+        } else {
+            applySnapshot(snapshot.getSnapshot(),
+                delegate -> new PruningDataTreeModification.Reactive(delegate, dataTree, pruner));
         }
-
-        // For lambda below
-        final ReusableNormalizedNodePruner finalPruner = pruner;
-        applySnapshot(snapshot.getSnapshot(),
-            delegate -> new PruningDataTreeModification(delegate, dataTree, finalPruner));
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void applyRecoveryCandidate(final CommitTransactionPayload payload) throws IOException {
-        final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.getCandidate();
+        final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.acquireCandidate();
         final DataTreeModification unwrapped = dataTree.takeSnapshot().newModification();
+        final PruningDataTreeModification mod = createPruningModification(unwrapped,
+            NormalizedNodeStreamVersion.MAGNESIUM.compareTo(entry.getValue().getVersion()) > 0);
 
-        // TODO: we should be able to reuse the pruner, provided we are not reentrant
-        ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext(dataSchemaContext);
-        if (NormalizedNodeStreamVersion.MAGNESIUM.compareTo(entry.getValue().getVersion()) > 0) {
-            pruner = pruner.withUintAdaption();
-        }
-
-        final PruningDataTreeModification mod = new PruningDataTreeModification(unwrapped, dataTree, pruner);
         DataTreeCandidates.applyToModification(mod, entry.getValue().getCandidate());
         mod.ready();
-
         LOG.trace("{}: Applying recovery modification {}", logContext, unwrapped);
 
         try {
@@ -354,6 +358,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         allMetadataCommittedTransaction(entry.getKey());
     }
 
+    private PruningDataTreeModification createPruningModification(final DataTreeModification unwrapped,
+            final boolean uintAdapting) {
+        // TODO: we should be able to reuse the pruner, provided we are not reentrant
+        final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext(
+            dataSchemaContext);
+        return uintAdapting ? new PruningDataTreeModification.Proactive(unwrapped, dataTree, pruner.withUintAdaption())
+                : new PruningDataTreeModification.Reactive(unwrapped, dataTree, pruner);
+    }
+
     /**
      * Apply a payload coming from recovery. This method does not assume the SchemaContexts match and performs data
      * pruning in an attempt to adjust the state to our current SchemaContext.
@@ -362,7 +375,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      * @throws IOException when the snapshot fails to deserialize
      * @throws DataValidationFailedException when the snapshot fails to apply
      */
-    void applyRecoveryPayload(final @NonNull Payload payload) throws IOException {
+    final void applyRecoveryPayload(final @NonNull Payload payload) throws IOException {
         if (payload instanceof CommitTransactionPayload) {
             applyRecoveryCandidate((CommitTransactionPayload) payload);
         } else if (payload instanceof AbortTransactionPayload) {
@@ -375,6 +388,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
         } else if (payload instanceof PurgeLocalHistoryPayload) {
             allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
+        } else if (payload instanceof SkipTransactionsPayload) {
+            allMetadataSkipTransactions((SkipTransactionsPayload) payload);
         } else {
             LOG.debug("{}: ignoring unhandled payload {}", logContext, payload);
         }
@@ -382,7 +397,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     private void applyReplicatedCandidate(final CommitTransactionPayload payload)
             throws DataValidationFailedException, IOException {
-        final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.getCandidate();
+        final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.acquireCandidate();
         final TransactionIdentifier identifier = entry.getKey();
         LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
 
@@ -409,7 +424,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      * @throws IOException when the snapshot fails to deserialize
      * @throws DataValidationFailedException when the snapshot fails to apply
      */
-    void applyReplicatedPayload(final Identifier identifier, final Payload payload) throws IOException,
+    final void applyReplicatedPayload(final Identifier identifier, final Payload payload) throws IOException,
             DataValidationFailedException {
         /*
          * This is a bit more involved than it needs to be due to to the fact we do not want to be touching the payload
@@ -427,8 +442,17 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 applyReplicatedCandidate((CommitTransactionPayload) payload);
             } else {
                 verify(identifier instanceof TransactionIdentifier);
-                payloadReplicationComplete((TransactionIdentifier) identifier);
+                // if we did not track this transaction before, it means that it came from another leader and we are in
+                // the process of commiting it while in PreLeader state. That means that it hasnt yet been committed to
+                // the local DataTree and would be lost if it was only applied via payloadReplicationComplete().
+                if (!payloadReplicationComplete((TransactionIdentifier) identifier)) {
+                    applyReplicatedCandidate((CommitTransactionPayload) payload);
+                }
             }
+
+            // make sure acquireCandidate() is the last call touching the payload data as we want it to be GC-ed.
+            checkRootOverwrite(((CommitTransactionPayload) payload).acquireCandidate().getValue()
+                    .getCandidate());
         } else if (payload instanceof AbortTransactionPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((AbortTransactionPayload) payload);
@@ -454,11 +478,35 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 payloadReplicationComplete((PurgeLocalHistoryPayload)payload);
             }
             allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
+        } else if (payload instanceof SkipTransactionsPayload) {
+            if (identifier != null) {
+                payloadReplicationComplete((SkipTransactionsPayload)payload);
+            }
+            allMetadataSkipTransactions((SkipTransactionsPayload) payload);
         } else {
             LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
         }
     }
 
+    private void checkRootOverwrite(final DataTreeCandidate candidate) {
+        final DatastoreContext datastoreContext = shard.getDatastoreContext();
+        if (!datastoreContext.isSnapshotOnRootOverwrite()) {
+            return;
+        }
+
+        if (!datastoreContext.isPersistent()) {
+            // FIXME: why don't we want a snapshot in non-persistent state?
+            return;
+        }
+
+        // top level container ie "/"
+        if (candidate.getRootPath().isEmpty()
+                && candidate.getRootNode().getModificationType() == ModificationType.WRITE) {
+            LOG.debug("{}: shard root overwritten, enqueuing snapshot", logContext);
+            shard.self().tell(new InitiateCaptureSnapshot(), noSender());
+        }
+    }
+
     private void replicatePayload(final Identifier id, final Payload payload, final @Nullable Runnable callback) {
         if (callback != null) {
             replicationCallbacks.put(payload, callback);
@@ -476,22 +524,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
-    private void payloadReplicationComplete(final TransactionIdentifier txId) {
+    private boolean payloadReplicationComplete(final TransactionIdentifier txId) {
         final CommitEntry current = pendingFinishCommits.peek();
         if (current == null) {
             LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
             allMetadataCommittedTransaction(txId);
-            return;
+            return false;
         }
 
         if (!current.cohort.getIdentifier().equals(txId)) {
             LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext,
                 current.cohort.getIdentifier(), txId);
             allMetadataCommittedTransaction(txId);
-            return;
+            return false;
         }
 
         finishCommit(current.cohort);
+        return true;
     }
 
     private void allMetadataAbortedTransaction(final TransactionIdentifier txId) {
@@ -530,6 +579,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
+    private void allMetadataSkipTransactions(final SkipTransactionsPayload payload) {
+        final var historyId = payload.getIdentifier();
+        final var txIds = payload.getTransactionIds();
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            m.onTransactionsSkipped(historyId, txIds);
+        }
+    }
+
     /**
      * Create a transaction chain for specified history. Unlike {@link #ensureTransactionChain(LocalHistoryIdentifier)},
      * this method is used for re-establishing state when we are taking over
@@ -538,7 +595,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      * @param closed True if the chain should be created in closed state (i.e. pending purge)
      * @return Transaction chain handle
      */
-    ShardDataTreeTransactionChain recreateTransactionChain(final LocalHistoryIdentifier historyId,
+    final ShardDataTreeTransactionChain recreateTransactionChain(final LocalHistoryIdentifier historyId,
             final boolean closed) {
         final ShardDataTreeTransactionChain ret = new ShardDataTreeTransactionChain(historyId, this);
         final ShardDataTreeTransactionChain existing = transactionChains.putIfAbsent(historyId, ret);
@@ -546,7 +603,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return ret;
     }
 
-    ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId,
+    final ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId,
             final @Nullable Runnable callback) {
         ShardDataTreeTransactionChain chain = transactionChains.get(historyId);
         if (chain == null) {
@@ -561,7 +618,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return chain;
     }
 
-    ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) {
+    final ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) {
         shard.getShardMBean().incrementReadOnlyTransactionCount();
 
         if (txId.getHistoryId().getHistoryId() == 0) {
@@ -571,7 +628,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return ensureTransactionChain(txId.getHistoryId(), null).newReadOnlyTransaction(txId);
     }
 
-    ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
+    final ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
         shard.getShardMBean().incrementReadWriteTransactionCount();
 
         if (txId.getHistoryId().getHistoryId() == 0) {
@@ -583,7 +640,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @VisibleForTesting
-    public void notifyListeners(final DataTreeCandidate candidate) {
+    final void notifyListeners(final DataTreeCandidate candidate) {
         treeChangeListenerPublisher.publishChanges(candidate);
     }
 
@@ -591,7 +648,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      * Immediately purge all state relevant to leader. This includes all transaction chains and any scheduled
      * replication callbacks.
      */
-    void purgeLeaderState() {
+    final void purgeLeaderState() {
         for (ShardDataTreeTransactionChain chain : transactionChains.values()) {
             chain.close();
         }
@@ -606,7 +663,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      * @param id History identifier
      * @param callback Callback to invoke upon completion, may be null
      */
-    void closeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) {
+    final void closeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) {
         if (commonCloseTransactionChain(id, callback)) {
             replicatePayload(id, CloseLocalHistoryPayload.create(id,
                 shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
@@ -618,7 +675,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      *
      * @param id History identifier
      */
-    void closeTransactionChain(final LocalHistoryIdentifier id) {
+    final void closeTransactionChain(final LocalHistoryIdentifier id) {
         commonCloseTransactionChain(id, null);
     }
 
@@ -642,7 +699,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      * @param id History identifier
      * @param callback Callback to invoke upon completion, may be null
      */
-    void purgeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) {
+    final void purgeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) {
         final ShardDataTreeTransactionChain chain = transactionChains.remove(id);
         if (chain == null) {
             LOG.debug("{}: Purging non-existent transaction chain {}", logContext, id);
@@ -656,23 +713,38 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
     }
 
-    Optional<DataTreeCandidate> readCurrentData() {
+    final void skipTransactions(final LocalHistoryIdentifier id, final ImmutableUnsignedLongSet transactionIds,
+            final Runnable callback) {
+        final ShardDataTreeTransactionChain chain = transactionChains.get(id);
+        if (chain == null) {
+            LOG.debug("{}: Skipping on non-existent transaction chain {}", logContext, id);
+            if (callback != null) {
+                callback.run();
+            }
+            return;
+        }
+
+        replicatePayload(id, SkipTransactionsPayload.create(id, transactionIds,
+            shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
+    }
+
+    final Optional<DataTreeCandidate> readCurrentData() {
         return dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty())
                 .map(state -> DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), state));
     }
 
-    public void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
+    final void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
             final Optional<DataTreeCandidate> initialState,
             final Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration) {
         treeChangeListenerPublisher.registerTreeChangeListener(path, listener, initialState, onRegistration);
     }
 
-    int getQueueSize() {
+    final int getQueueSize() {
         return pendingTransactions.size() + pendingCommits.size() + pendingFinishCommits.size();
     }
 
     @Override
-    void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
+    final void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
         final TransactionIdentifier id = transaction.getIdentifier();
         LOG.debug("{}: aborting transaction {}", logContext, id);
         replicatePayload(id, AbortTransactionPayload.create(
@@ -680,13 +752,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @Override
-    void abortFromTransactionActor(final AbstractShardDataTreeTransaction<?> transaction) {
+    final void abortFromTransactionActor(final AbstractShardDataTreeTransaction<?> transaction) {
         // No-op for free-standing transactions
-
     }
 
     @Override
-    ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction,
+    final ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction,
             final Optional<SortedSet<String>> participatingShardNames) {
         final DataTreeModification snapshot = transaction.getSnapshot();
         final TransactionIdentifier id = transaction.getIdentifier();
@@ -697,26 +768,27 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return createReadyCohort(transaction.getIdentifier(), snapshot, participatingShardNames);
     }
 
-    void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
+    final void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
         LOG.debug("{}: purging transaction {}", logContext, id);
         replicatePayload(id, PurgeTransactionPayload.create(
                 id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
     }
 
-    public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
+    @VisibleForTesting
+    public final Optional<NormalizedNode> readNode(final YangInstanceIdentifier path) {
         return dataTree.takeSnapshot().readNode(path);
     }
 
-    DataTreeSnapshot takeSnapshot() {
+    final DataTreeSnapshot takeSnapshot() {
         return dataTree.takeSnapshot();
     }
 
     @VisibleForTesting
-    public DataTreeModification newModification() {
+    final DataTreeModification newModification() {
         return dataTree.takeSnapshot().newModification();
     }
 
-    public Collection<ShardDataTreeCohort> getAndClearPendingTransactions() {
+    final Collection<ShardDataTreeCohort> getAndClearPendingTransactions() {
         Collection<ShardDataTreeCohort> ret = new ArrayList<>(getQueueSize());
 
         for (CommitEntry entry: pendingFinishCommits) {
@@ -741,7 +813,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     /**
      * Called some time after {@link #processNextPendingTransaction()} decides to stop processing.
      */
-    void resumeNextPendingTransaction() {
+    final void resumeNextPendingTransaction() {
         LOG.debug("{}: attempting to resume transaction processing", logContext);
         processNextPending();
     }
@@ -827,6 +899,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return first != null && first.cohort.getState() == State.COMMIT_PENDING;
     }
 
+    // non-final for mocking
     void startCanCommit(final SimpleShardDataTreeCohort cohort) {
         final CommitEntry head = pendingTransactions.peek();
         if (head == null) {
@@ -941,6 +1014,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         processNextPendingTransaction();
     }
 
+    // non-final for mocking
     @SuppressWarnings("checkstyle:IllegalCatch")
     void startPreCommit(final SimpleShardDataTreeCohort cohort) {
         final CommitEntry entry = pendingTransactions.peek();
@@ -961,9 +1035,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
 
-        cohort.userPreCommit(candidate, new FutureCallback<Void>() {
+        cohort.userPreCommit(candidate, new FutureCallback<>() {
             @Override
-            public void onSuccess(final Void noop) {
+            public void onSuccess(final Empty result) {
                 // Set the tip of the data tree.
                 tip = verifyNotNull(candidate);
 
@@ -1025,6 +1099,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         });
     }
 
+    // non-final for mocking
     void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) {
         final CommitEntry entry = pendingCommits.peek();
         checkState(entry != null, "Attempted to start commit of %s when no transactions pending", cohort);
@@ -1079,16 +1154,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         processNextPendingCommit();
     }
 
-    Collection<ActorRef> getCohortActors() {
+    final Collection<ActorRef> getCohortActors() {
         return cohortRegistry.getCohortActors();
     }
 
-    void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
+    final void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
         cohortRegistry.process(sender, message);
     }
 
     @Override
-    ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+    final ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod,
             final Exception failure) {
         final SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, failure);
         pendingTransactions.add(new CommitEntry(cohort, readTime()));
@@ -1096,7 +1171,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @Override
-    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+    final ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
             final Optional<SortedSet<String>> participatingShardNames) {
         SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId,
                 cohortRegistry.createCohort(schemaContext, txId, shard::executeInSelf,
@@ -1107,7 +1182,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     // Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics
     // the newReadWriteTransaction()
-    ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+    final ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
             final Optional<SortedSet<String>> participatingShardNames) {
         if (txId.getHistoryId().getHistoryId() == 0) {
             return createReadyCohort(txId, mod, participatingShardNames);
@@ -1117,7 +1192,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
-    void checkForExpiredTransactions(final long transactionCommitTimeoutMillis,
+    final void checkForExpiredTransactions(final long transactionCommitTimeoutMillis,
             final Function<SimpleShardDataTreeCohort, OptionalLong> accessTimeUpdater) {
         final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
         final long now = readTime();
@@ -1216,6 +1291,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
+    // non-final for mocking
     boolean startAbort(final SimpleShardDataTreeCohort cohort) {
         final Iterator<CommitEntry> it = Iterables.concat(pendingFinishCommits, pendingCommits,
                 pendingTransactions).iterator();
@@ -1244,7 +1320,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return false;
         }
 
-        DataTreeTip newTip = MoreObjects.firstNonNull(first.cohort.getCandidate(), dataTree);
+        DataTreeTip newTip = requireNonNullElse(first.cohort.getCandidate(), dataTree);
         while (it.hasNext()) {
             final CommitEntry e = it.next();
             if (cohort.equals(e.cohort)) {
@@ -1257,7 +1333,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
                 return true;
             } else {
-                newTip = MoreObjects.firstNonNull(e.cohort.getCandidate(), newTip);
+                newTip = requireNonNullElse(e.cohort.getCandidate(), newTip);
             }
         }
 
@@ -1296,7 +1372,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
-    void setRunOnPendingTransactionsComplete(final Runnable operation) {
+    final void setRunOnPendingTransactionsComplete(final Runnable operation) {
         runOnPendingTransactionsComplete = operation;
         maybeRunOperationOnPendingTransactionsComplete();
     }
@@ -1311,16 +1387,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
-    ShardStats getStats() {
+    final ShardStats getStats() {
         return shard.getShardMBean();
     }
 
-    Iterator<SimpleShardDataTreeCohort> cohortIterator() {
+    final Iterator<SimpleShardDataTreeCohort> cohortIterator() {
         return Iterables.transform(Iterables.concat(pendingFinishCommits, pendingCommits, pendingTransactions),
             e -> e.cohort).iterator();
     }
 
-    void removeTransactionChain(final LocalHistoryIdentifier id) {
+    final void removeTransactionChain(final LocalHistoryIdentifier id) {
         if (transactionChains.remove(id) != null) {
             LOG.debug("{}: Removed transaction chain {}", logContext, id);
         }