Lock down ShardDataTree 54/96754/3
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 30 Jun 2021 21:32:34 +0000 (23:32 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Thu, 1 Jul 2021 08:33:13 +0000 (10:33 +0200)
Now that our primary user is an implmentation detail, we can lock down a
few aspects of ShardDataTree operation. This will ease refactoring
further down the road.

Change-Id: I91248347889ea0ceecee9f093f54b5d8241fb39c
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java

index bd5d736..428cf84 100644 (file)
@@ -104,6 +104,8 @@ import scala.concurrent.duration.FiniteDuration;
  * <p>
  * This class is not part of the API contract and is subject to change at any time. It is NOT thread-safe.
  */
+@VisibleForTesting
+// non-final for mocking
 public class ShardDataTree extends ShardDataTreeTransactionParent {
     private static final class CommitEntry {
         final SimpleShardDataTreeCohort cohort;
@@ -207,21 +209,22 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return shard.ticker().read();
     }
 
-    public DataTree getDataTree() {
+    final DataTree getDataTree() {
         return dataTree;
     }
 
-    SchemaContext getSchemaContext() {
+    @VisibleForTesting
+    final SchemaContext getSchemaContext() {
         return schemaContext;
     }
 
-    void updateSchemaContext(final @NonNull EffectiveModelContext newSchemaContext) {
+    final void updateSchemaContext(final @NonNull EffectiveModelContext newSchemaContext) {
         dataTree.setEffectiveModelContext(newSchemaContext);
         this.schemaContext = newSchemaContext;
         this.dataSchemaContext = DataSchemaContextTree.from(newSchemaContext);
     }
 
-    void resetTransactionBatch() {
+    final void resetTransactionBatch() {
         currentTransactionBatch = 0;
     }
 
@@ -300,7 +303,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      * @param snapshot Snapshot that needs to be applied
      * @throws DataValidationFailedException when the snapshot fails to apply
      */
-    void applySnapshot(final @NonNull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
+    final void applySnapshot(final @NonNull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
         // TODO: we should be taking ShardSnapshotState here and performing forward-compatibility translation
         applySnapshot(snapshot, UnaryOperator.identity());
     }
@@ -312,7 +315,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      * @param snapshot Snapshot that needs to be applied
      * @throws DataValidationFailedException when the snapshot fails to apply
      */
-    void applyRecoverySnapshot(final @NonNull ShardSnapshotState snapshot) throws DataValidationFailedException {
+    final void applyRecoverySnapshot(final @NonNull ShardSnapshotState snapshot) throws DataValidationFailedException {
         // TODO: we should be able to reuse the pruner, provided we are not reentrant
         final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext(
             dataSchemaContext);
@@ -369,7 +372,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      * @throws IOException when the snapshot fails to deserialize
      * @throws DataValidationFailedException when the snapshot fails to apply
      */
-    void applyRecoveryPayload(final @NonNull Payload payload) throws IOException {
+    final void applyRecoveryPayload(final @NonNull Payload payload) throws IOException {
         if (payload instanceof CommitTransactionPayload) {
             applyRecoveryCandidate((CommitTransactionPayload) payload);
         } else if (payload instanceof AbortTransactionPayload) {
@@ -416,7 +419,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      * @throws IOException when the snapshot fails to deserialize
      * @throws DataValidationFailedException when the snapshot fails to apply
      */
-    void applyReplicatedPayload(final Identifier identifier, final Payload payload) throws IOException,
+    final void applyReplicatedPayload(final Identifier identifier, final Payload payload) throws IOException,
             DataValidationFailedException {
         /*
          * This is a bit more involved than it needs to be due to to the fact we do not want to be touching the payload
@@ -574,7 +577,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      * @param closed True if the chain should be created in closed state (i.e. pending purge)
      * @return Transaction chain handle
      */
-    ShardDataTreeTransactionChain recreateTransactionChain(final LocalHistoryIdentifier historyId,
+    final ShardDataTreeTransactionChain recreateTransactionChain(final LocalHistoryIdentifier historyId,
             final boolean closed) {
         final ShardDataTreeTransactionChain ret = new ShardDataTreeTransactionChain(historyId, this);
         final ShardDataTreeTransactionChain existing = transactionChains.putIfAbsent(historyId, ret);
@@ -582,7 +585,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return ret;
     }
 
-    ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId,
+    final ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId,
             final @Nullable Runnable callback) {
         ShardDataTreeTransactionChain chain = transactionChains.get(historyId);
         if (chain == null) {
@@ -597,7 +600,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return chain;
     }
 
-    ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) {
+    final ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) {
         shard.getShardMBean().incrementReadOnlyTransactionCount();
 
         if (txId.getHistoryId().getHistoryId() == 0) {
@@ -607,7 +610,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return ensureTransactionChain(txId.getHistoryId(), null).newReadOnlyTransaction(txId);
     }
 
-    ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
+    final ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
         shard.getShardMBean().incrementReadWriteTransactionCount();
 
         if (txId.getHistoryId().getHistoryId() == 0) {
@@ -619,7 +622,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @VisibleForTesting
-    public void notifyListeners(final DataTreeCandidate candidate) {
+    final void notifyListeners(final DataTreeCandidate candidate) {
         treeChangeListenerPublisher.publishChanges(candidate);
     }
 
@@ -627,7 +630,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      * Immediately purge all state relevant to leader. This includes all transaction chains and any scheduled
      * replication callbacks.
      */
-    void purgeLeaderState() {
+    final void purgeLeaderState() {
         for (ShardDataTreeTransactionChain chain : transactionChains.values()) {
             chain.close();
         }
@@ -642,7 +645,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      * @param id History identifier
      * @param callback Callback to invoke upon completion, may be null
      */
-    void closeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) {
+    final void closeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) {
         if (commonCloseTransactionChain(id, callback)) {
             replicatePayload(id, CloseLocalHistoryPayload.create(id,
                 shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
@@ -654,7 +657,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      *
      * @param id History identifier
      */
-    void closeTransactionChain(final LocalHistoryIdentifier id) {
+    final void closeTransactionChain(final LocalHistoryIdentifier id) {
         commonCloseTransactionChain(id, null);
     }
 
@@ -678,7 +681,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      * @param id History identifier
      * @param callback Callback to invoke upon completion, may be null
      */
-    void purgeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) {
+    final void purgeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) {
         final ShardDataTreeTransactionChain chain = transactionChains.remove(id);
         if (chain == null) {
             LOG.debug("{}: Purging non-existent transaction chain {}", logContext, id);
@@ -692,23 +695,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
     }
 
-    Optional<DataTreeCandidate> readCurrentData() {
+    final Optional<DataTreeCandidate> readCurrentData() {
         return dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty())
                 .map(state -> DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), state));
     }
 
-    public void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
+    final void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
             final Optional<DataTreeCandidate> initialState,
             final Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration) {
         treeChangeListenerPublisher.registerTreeChangeListener(path, listener, initialState, onRegistration);
     }
 
-    int getQueueSize() {
+    final int getQueueSize() {
         return pendingTransactions.size() + pendingCommits.size() + pendingFinishCommits.size();
     }
 
     @Override
-    void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
+    final void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
         final TransactionIdentifier id = transaction.getIdentifier();
         LOG.debug("{}: aborting transaction {}", logContext, id);
         replicatePayload(id, AbortTransactionPayload.create(
@@ -716,13 +719,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @Override
-    void abortFromTransactionActor(final AbstractShardDataTreeTransaction<?> transaction) {
+    final void abortFromTransactionActor(final AbstractShardDataTreeTransaction<?> transaction) {
         // No-op for free-standing transactions
-
     }
 
     @Override
-    ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction,
+    final ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction,
             final Optional<SortedSet<String>> participatingShardNames) {
         final DataTreeModification snapshot = transaction.getSnapshot();
         final TransactionIdentifier id = transaction.getIdentifier();
@@ -733,26 +735,27 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return createReadyCohort(transaction.getIdentifier(), snapshot, participatingShardNames);
     }
 
-    void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
+    final void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
         LOG.debug("{}: purging transaction {}", logContext, id);
         replicatePayload(id, PurgeTransactionPayload.create(
                 id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
     }
 
-    public Optional<NormalizedNode> readNode(final YangInstanceIdentifier path) {
+    @VisibleForTesting
+    public final Optional<NormalizedNode> readNode(final YangInstanceIdentifier path) {
         return dataTree.takeSnapshot().readNode(path);
     }
 
-    DataTreeSnapshot takeSnapshot() {
+    final DataTreeSnapshot takeSnapshot() {
         return dataTree.takeSnapshot();
     }
 
     @VisibleForTesting
-    public DataTreeModification newModification() {
+    final DataTreeModification newModification() {
         return dataTree.takeSnapshot().newModification();
     }
 
-    public Collection<ShardDataTreeCohort> getAndClearPendingTransactions() {
+    final Collection<ShardDataTreeCohort> getAndClearPendingTransactions() {
         Collection<ShardDataTreeCohort> ret = new ArrayList<>(getQueueSize());
 
         for (CommitEntry entry: pendingFinishCommits) {
@@ -777,7 +780,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     /**
      * Called some time after {@link #processNextPendingTransaction()} decides to stop processing.
      */
-    void resumeNextPendingTransaction() {
+    final void resumeNextPendingTransaction() {
         LOG.debug("{}: attempting to resume transaction processing", logContext);
         processNextPending();
     }
@@ -863,6 +866,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return first != null && first.cohort.getState() == State.COMMIT_PENDING;
     }
 
+    // non-final for mocking
     void startCanCommit(final SimpleShardDataTreeCohort cohort) {
         final CommitEntry head = pendingTransactions.peek();
         if (head == null) {
@@ -977,6 +981,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         processNextPendingTransaction();
     }
 
+    // non-final for mocking
     @SuppressWarnings("checkstyle:IllegalCatch")
     void startPreCommit(final SimpleShardDataTreeCohort cohort) {
         final CommitEntry entry = pendingTransactions.peek();
@@ -1061,6 +1066,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         });
     }
 
+    // non-final for mocking
     void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) {
         final CommitEntry entry = pendingCommits.peek();
         checkState(entry != null, "Attempted to start commit of %s when no transactions pending", cohort);
@@ -1115,16 +1121,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         processNextPendingCommit();
     }
 
-    Collection<ActorRef> getCohortActors() {
+    final Collection<ActorRef> getCohortActors() {
         return cohortRegistry.getCohortActors();
     }
 
-    void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
+    final void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
         cohortRegistry.process(sender, message);
     }
 
     @Override
-    ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+    final ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod,
             final Exception failure) {
         final SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, failure);
         pendingTransactions.add(new CommitEntry(cohort, readTime()));
@@ -1132,7 +1138,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @Override
-    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+    final ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
             final Optional<SortedSet<String>> participatingShardNames) {
         SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId,
                 cohortRegistry.createCohort(schemaContext, txId, shard::executeInSelf,
@@ -1143,7 +1149,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     // Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics
     // the newReadWriteTransaction()
-    ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+    final ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
             final Optional<SortedSet<String>> participatingShardNames) {
         if (txId.getHistoryId().getHistoryId() == 0) {
             return createReadyCohort(txId, mod, participatingShardNames);
@@ -1153,7 +1159,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
-    void checkForExpiredTransactions(final long transactionCommitTimeoutMillis,
+    final void checkForExpiredTransactions(final long transactionCommitTimeoutMillis,
             final Function<SimpleShardDataTreeCohort, OptionalLong> accessTimeUpdater) {
         final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
         final long now = readTime();
@@ -1252,6 +1258,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
+    // non-final for mocking
     boolean startAbort(final SimpleShardDataTreeCohort cohort) {
         final Iterator<CommitEntry> it = Iterables.concat(pendingFinishCommits, pendingCommits,
                 pendingTransactions).iterator();
@@ -1332,7 +1339,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
-    void setRunOnPendingTransactionsComplete(final Runnable operation) {
+    final void setRunOnPendingTransactionsComplete(final Runnable operation) {
         runOnPendingTransactionsComplete = operation;
         maybeRunOperationOnPendingTransactionsComplete();
     }
@@ -1347,16 +1354,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
-    ShardStats getStats() {
+    final ShardStats getStats() {
         return shard.getShardMBean();
     }
 
-    Iterator<SimpleShardDataTreeCohort> cohortIterator() {
+    final Iterator<SimpleShardDataTreeCohort> cohortIterator() {
         return Iterables.transform(Iterables.concat(pendingFinishCommits, pendingCommits, pendingTransactions),
             e -> e.cohort).iterator();
     }
 
-    void removeTransactionChain(final LocalHistoryIdentifier id) {
+    final void removeTransactionChain(final LocalHistoryIdentifier id) {
         if (transactionChains.remove(id) != null) {
             LOG.debug("{}: Removed transaction chain {}", logContext, id);
         }