BUG-5280: update transaction statistics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardDataTree.java
index 31198b98f2541c1e2cf1684d292e076d2edc90d3..be4aa42669ff77fb653f59082f664dc85deaf7eb 100644 (file)
@@ -36,15 +36,24 @@ import java.util.Queue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
 import java.util.function.UnaryOperator;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
 import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
+import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload;
+import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
+import org.opendaylight.controller.cluster.datastore.persisted.CreateLocalHistoryPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.PurgeLocalHistoryPayload;
+import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
 import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
@@ -68,7 +77,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTreeTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
@@ -102,10 +110,17 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
 
     private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
+
     private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
     private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
     private final Queue<CommitEntry> pendingCommits = new ArrayDeque<>();
     private final Queue<CommitEntry> pendingFinishCommits = new ArrayDeque<>();
+
+    /**
+     * Callbacks that need to be invoked once a payload is replicated.
+     */
+    private final Map<Payload, Runnable> replicationCallbacks = new HashMap<>();
+
     private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher;
     private final ShardDataChangeListenerPublisher dataChangeListenerPublisher;
     private final Collection<ShardDataTreeMetadata<?>> metadata;
@@ -124,7 +139,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     private SchemaContext schemaContext;
 
-    public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
+    ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
             final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
             final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext,
             final ShardDataTreeMetadata<?>... metadata) {
@@ -139,19 +154,19 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         tip = dataTree;
     }
 
-    public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType,
+    ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType,
             final YangInstanceIdentifier root,
             final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
-            final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) {
+            final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext,
+            final ShardDataTreeMetadata<?>... metadata) {
         this(shard, schemaContext, InMemoryDataTreeFactory.getInstance().create(treeType, root),
-                treeChangeListenerPublisher, dataChangeListenerPublisher, logContext);
+                treeChangeListenerPublisher, dataChangeListenerPublisher, logContext, metadata);
     }
 
     @VisibleForTesting
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
         this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY,
-                new DefaultShardDataTreeChangeListenerPublisher(),
-                new DefaultShardDataChangeListenerPublisher(), "");
+                new DefaultShardDataTreeChangeListenerPublisher(), new DefaultShardDataChangeListenerPublisher(), "");
     }
 
     final String logContext() {
@@ -312,6 +327,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                     ((CommitTransactionPayload) payload).getCandidate();
             applyRecoveryCandidate(e.getValue());
             allMetadataCommittedTransaction(e.getKey());
+        } else if (payload instanceof AbortTransactionPayload) {
+            allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier());
+        } else if (payload instanceof PurgeTransactionPayload) {
+            allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier());
+        } else if (payload instanceof CreateLocalHistoryPayload) {
+            allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier());
+        } else if (payload instanceof CloseLocalHistoryPayload) {
+            allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
+        } else if (payload instanceof PurgeLocalHistoryPayload) {
+            allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
         } else if (payload instanceof DataTreeCandidatePayload) {
             applyRecoveryCandidate(((DataTreeCandidatePayload) payload).getCandidate());
         } else {
@@ -367,11 +392,58 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 Verify.verify(identifier instanceof TransactionIdentifier);
                 payloadReplicationComplete((TransactionIdentifier) identifier);
             }
+        } else if (payload instanceof AbortTransactionPayload) {
+            if (identifier != null) {
+                payloadReplicationComplete((AbortTransactionPayload) payload);
+            } else {
+                allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier());
+            }
+        } else if (payload instanceof PurgeTransactionPayload) {
+            if (identifier != null) {
+                payloadReplicationComplete((PurgeTransactionPayload) payload);
+            } else {
+                allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier());
+            }
+        } else if (payload instanceof CloseLocalHistoryPayload) {
+            if (identifier != null) {
+                payloadReplicationComplete((CloseLocalHistoryPayload) payload);
+            } else {
+                allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
+            }
+        } else if (payload instanceof CreateLocalHistoryPayload) {
+            if (identifier != null) {
+                payloadReplicationComplete((CreateLocalHistoryPayload)payload);
+            } else {
+                allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier());
+            }
+        } else if (payload instanceof PurgeLocalHistoryPayload) {
+            if (identifier != null) {
+                payloadReplicationComplete((PurgeLocalHistoryPayload)payload);
+            } else {
+                allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
+            }
         } else {
             LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
         }
     }
 
+    private void replicatePayload(final Identifier id, final Payload payload, @Nullable final Runnable callback) {
+        if (callback != null) {
+            replicationCallbacks.put(payload, callback);
+        }
+        shard.persistPayload(id, payload, true);
+    }
+
+    private void payloadReplicationComplete(final AbstractIdentifiablePayload<?> payload) {
+        final Runnable callback = replicationCallbacks.remove(payload);
+        if (callback != null) {
+            LOG.debug("{}: replication of {} completed, invoking {}", logContext, payload.getIdentifier(), callback);
+            callback.run();
+        } else {
+            LOG.debug("{}: replication of {} has no callback", logContext, payload.getIdentifier());
+        }
+    }
+
     private void payloadReplicationComplete(final TransactionIdentifier txId) {
         final CommitEntry current = pendingFinishCommits.peek();
         if (current == null) {
@@ -380,7 +452,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
 
         if (!current.cohort.getIdentifier().equals(txId)) {
-            LOG.warn("{}: Head of queue is {}, ignoring consensus on transaction {}", logContext,
+            LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext,
                 current.cohort.getIdentifier(), txId);
             return;
         }
@@ -388,17 +460,65 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         finishCommit(current.cohort);
     }
 
+    private void allMetadataAbortedTransaction(final TransactionIdentifier txId) {
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            m.onTransactionAborted(txId);
+        }
+    }
+
     private void allMetadataCommittedTransaction(final TransactionIdentifier txId) {
         for (ShardDataTreeMetadata<?> m : metadata) {
             m.onTransactionCommitted(txId);
         }
     }
 
-    ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) {
-        ShardDataTreeTransactionChain chain = transactionChains.get(localHistoryIdentifier);
+    private void allMetadataPurgedTransaction(final TransactionIdentifier txId) {
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            m.onTransactionPurged(txId);
+        }
+    }
+
+    private void allMetadataCreatedLocalHistory(final LocalHistoryIdentifier historyId) {
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            m.onHistoryCreated(historyId);
+        }
+    }
+
+    private void allMetadataClosedLocalHistory(final LocalHistoryIdentifier historyId) {
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            m.onHistoryClosed(historyId);
+        }
+    }
+
+    private void allMetadataPurgedLocalHistory(final LocalHistoryIdentifier historyId) {
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            m.onHistoryPurged(historyId);
+        }
+    }
+
+    /**
+     * Create a transaction chain for specified history. Unlike {@link #ensureTransactionChain(LocalHistoryIdentifier)},
+     * this method is used for re-establishing state when we are taking over
+     *
+     * @param historyId Local history identifier
+     * @param closed True if the chain should be created in closed state (i.e. pending purge)
+     * @return Transaction chain handle
+     */
+    ShardDataTreeTransactionChain recreateTransactionChain(final LocalHistoryIdentifier historyId,
+            final boolean closed) {
+        final ShardDataTreeTransactionChain ret = new ShardDataTreeTransactionChain(historyId, this);
+        final ShardDataTreeTransactionChain existing = transactionChains.putIfAbsent(historyId, ret);
+        Preconditions.checkState(existing == null, "Attempted to recreate chain %s, but %s already exists", historyId,
+                existing);
+        return ret;
+    }
+
+    ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId) {
+        ShardDataTreeTransactionChain chain = transactionChains.get(historyId);
         if (chain == null) {
-            chain = new ShardDataTreeTransactionChain(localHistoryIdentifier, this);
-            transactionChains.put(localHistoryIdentifier, chain);
+            chain = new ShardDataTreeTransactionChain(historyId, this);
+            transactionChains.put(historyId, chain);
+            shard.persistPayload(historyId, CreateLocalHistoryPayload.create(historyId), true);
         }
 
         return chain;
@@ -406,7 +526,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) {
         if (txId.getHistoryId().getHistoryId() == 0) {
-            return new ReadOnlyShardDataTreeTransaction(txId, dataTree.takeSnapshot());
+            return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot());
         }
 
         return ensureTransactionChain(txId.getHistoryId()).newReadOnlyTransaction(txId);
@@ -446,21 +566,56 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
-    void closeAllTransactionChains() {
+    /**
+     * Immediately purge all state relevant to leader. This includes all transaction chains and any scheduled
+     * replication callbacks.
+     */
+    void purgeLeaderState() {
         for (ShardDataTreeTransactionChain chain : transactionChains.values()) {
             chain.close();
         }
 
         transactionChains.clear();
+        replicationCallbacks.clear();
     }
 
-    void closeTransactionChain(final LocalHistoryIdentifier transactionChainId) {
-        final ShardDataTreeTransactionChain chain = transactionChains.remove(transactionChainId);
-        if (chain != null) {
-            chain.close();
-        } else {
-            LOG.debug("{}: Closing non-existent transaction chain {}", logContext, transactionChainId);
+    /**
+     * Close a single transaction chain.
+     *
+     * @param id History identifier
+     * @param callback Callback to invoke upon completion, may be null
+     */
+    void closeTransactionChain(final LocalHistoryIdentifier id, @Nullable final Runnable callback) {
+        final ShardDataTreeTransactionChain chain = transactionChains.get(id);
+        if (chain == null) {
+            LOG.debug("{}: Closing non-existent transaction chain {}", logContext, id);
+            if (callback != null) {
+                callback.run();
+            }
+            return;
+        }
+
+        chain.close();
+        replicatePayload(id, CloseLocalHistoryPayload.create(id), callback);
+    }
+
+    /**
+     * Purge a single transaction chain.
+     *
+     * @param id History identifier
+     * @param callback Callback to invoke upon completion, may be null
+     */
+    void purgeTransactionChain(final LocalHistoryIdentifier id, @Nullable final Runnable callback) {
+        final ShardDataTreeTransactionChain chain = transactionChains.remove(id);
+        if (chain == null) {
+            LOG.debug("{}: Purging non-existent transaction chain {}", logContext, id);
+            if (callback != null) {
+                callback.run();
+            }
+            return;
         }
+
+        replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback);
     }
 
     Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
@@ -493,8 +648,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @Override
-    void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction) {
-        // Intentional no-op
+    void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
+        final TransactionIdentifier id = transaction.getIdentifier();
+        LOG.debug("{}: aborting transaction {}", logContext, id);
+        replicatePayload(id, AbortTransactionPayload.create(id), callback);
+    }
+
+    @Override
+    void abortFromTransactionActor(final AbstractShardDataTreeTransaction<?> transaction) {
+        // No-op for free-standing transactions
+
     }
 
     @Override
@@ -505,6 +668,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return createReadyCohort(transaction.getIdentifier(), snapshot);
     }
 
+    void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
+        LOG.debug("{}: purging transaction {}", logContext, id);
+        replicatePayload(id, PurgeTransactionPayload.create(id), callback);
+    }
+
     public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
         return dataTree.takeSnapshot().readNode(path);
     }
@@ -560,28 +728,19 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void processNextPendingTransaction() {
-        while (!pendingTransactions.isEmpty()) {
-            final CommitEntry entry = pendingTransactions.peek();
+        processNextPending(pendingTransactions, State.CAN_COMMIT_PENDING, entry -> {
             final SimpleShardDataTreeCohort cohort = entry.cohort;
             final DataTreeModification modification = cohort.getDataTreeModification();
 
-            if (cohort.isFailed()) {
-                LOG.debug("{}: Removing failed transaction {}", logContext, cohort.getIdentifier());
-                pendingTransactions.remove();
-                continue;
-            }
-
-            if (cohort.getState() != State.CAN_COMMIT_PENDING) {
-                break;
-            }
-
             LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
             Exception cause;
             try {
+                cohort.throwCanCommitFailure();
+
                 tip.validate(modification);
                 LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
                 cohort.successfulCanCommit();
-                entry.lastAccess = shard.ticker().read();
+                entry.lastAccess = ticker().read();
                 return;
             } catch (ConflictingModificationAppliedException e) {
                 LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(),
@@ -603,24 +762,28 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
             // Failure path: propagate the failure, remove the transaction from the queue and loop to the next one
             pendingTransactions.poll().cohort.failedCanCommit(cause);
-        }
+        });
+    }
 
-        maybeRunOperationOnPendingTransactionsComplete();
+    private void processNextPending() {
+        processNextPendingCommit();
+        processNextPendingTransaction();
     }
 
-    private void processNextPendingCommit() {
-        while (!pendingCommits.isEmpty()) {
-            final CommitEntry entry = pendingCommits.peek();
+    private void processNextPending(final Queue<CommitEntry> queue, final State allowedState,
+            final Consumer<CommitEntry> processor) {
+        while (!queue.isEmpty()) {
+            final CommitEntry entry = queue.peek();
             final SimpleShardDataTreeCohort cohort = entry.cohort;
 
             if (cohort.isFailed()) {
                 LOG.debug("{}: Removing failed transaction {}", logContext, cohort.getIdentifier());
-                pendingCommits.remove();
+                queue.remove();
                 continue;
             }
 
-            if (cohort.getState() == State.COMMIT_PENDING) {
-                startCommit(cohort, cohort.getCandidate());
+            if (cohort.getState() == allowedState) {
+                processor.accept(entry);
             }
 
             break;
@@ -629,16 +792,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         maybeRunOperationOnPendingTransactionsComplete();
     }
 
+    private void processNextPendingCommit() {
+        processNextPending(pendingCommits, State.COMMIT_PENDING,
+            entry -> startCommit(entry.cohort, entry.cohort.getCandidate()));
+    }
+
     private boolean peekNextPendingCommit() {
         final CommitEntry first = pendingCommits.peek();
         return first != null && first.cohort.getState() == State.COMMIT_PENDING;
     }
 
-    private void processNextPending() {
-        processNextPendingCommit();
-        processNextPendingTransaction();
-    }
-
     void startCanCommit(final SimpleShardDataTreeCohort cohort) {
         final SimpleShardDataTreeCohort current = pendingTransactions.peek().cohort;
         if (!cohort.equals(current)) {
@@ -677,7 +840,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         // Set the tip of the data tree.
         tip = Verify.verifyNotNull(candidate);
 
-        entry.lastAccess = shard.ticker().read();
+        entry.lastAccess = ticker().read();
 
         pendingTransactions.remove();
         pendingCommits.add(entry);
@@ -739,14 +902,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
         LOG.debug("{}: Starting commit for transaction {}", logContext, current.getIdentifier());
 
-        if (shard.canSkipPayload() || candidate.getRootNode().getModificationType() == ModificationType.UNMODIFIED) {
-            LOG.debug("{}: No replication required, proceeding to finish commit", logContext);
-            pendingCommits.remove();
-            pendingFinishCommits.add(entry);
-            finishCommit(cohort);
-            return;
-        }
-
         final TransactionIdentifier txId = cohort.getIdentifier();
         final Payload payload;
         try {
@@ -788,23 +943,35 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         processNextPendingCommit();
     }
 
+    Collection<ActorRef> getCohortActors() {
+        return cohortRegistry.getCohortActors();
+    }
+
     void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
         cohortRegistry.process(sender, message);
     }
 
+
     @Override
-    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId,
-            final DataTreeModification modification) {
-        SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId,
+    ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+            final Exception failure) {
+        SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.DeadOnArrival(this, mod, txId, failure);
+        pendingTransactions.add(new CommitEntry(cohort, ticker().read()));
+        return cohort;
+    }
+
+    @Override
+    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
+        SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.Normal(this, mod, txId,
                 cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT));
-        pendingTransactions.add(new CommitEntry(cohort, shard.ticker().read()));
+        pendingTransactions.add(new CommitEntry(cohort, ticker().read()));
         return cohort;
     }
 
     @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
     void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
         final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
-        final long now = shard.ticker().read();
+        final long now = ticker().read();
 
         final Queue<CommitEntry> currentQueue = !pendingFinishCommits.isEmpty() ? pendingFinishCommits :
             !pendingCommits.isEmpty() ? pendingCommits : pendingTransactions;
@@ -916,7 +1083,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private void rebaseTransactions(Iterator<CommitEntry> iter, @Nonnull TipProducingDataTreeTip newTip) {
+    private void rebaseTransactions(final Iterator<CommitEntry> iter, @Nonnull final TipProducingDataTreeTip newTip) {
         tip = Preconditions.checkNotNull(newTip);
         while (iter.hasNext()) {
             final SimpleShardDataTreeCohort cohort = iter.next().cohort;
@@ -961,4 +1128,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             runOnPendingTransactionsComplete = null;
         }
     }
+
+    ShardStats getStats() {
+        return shard.getShardMBean();
+    }
 }