Lock down controller.cluster.datastore.Shard 53/96753/1
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 30 Jun 2021 21:14:31 +0000 (23:14 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Wed, 30 Jun 2021 21:16:14 +0000 (23:16 +0200)
Since sal-distributed-eos is gone, we do not have any known subclasses,
lock down Shard implementation as much as possible. This will aid us in
refactoring later. The entire class is now considered an implementation
detail, amenable to changes driven by RaftActor evolution.

Change-Id: Ic54794b33766459f16a5ebdac6a3faa731c2b49d
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index fb0ef03..030ef16 100644 (file)
@@ -26,7 +26,6 @@ import akka.serialization.JavaSerializer;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Ticker;
-import com.google.common.base.Verify;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -125,6 +124,7 @@ import scala.concurrent.duration.FiniteDuration;
  * <p>
  * Our Shard uses InMemoryDataTree as it's internal representation and delegates all requests it
  */
+// FIXME: non-final for testing?
 public class Shard extends RaftActor {
 
     @VisibleForTesting
@@ -216,7 +216,7 @@ public class Shard extends RaftActor {
 
     private final ActorRef exportActor;
 
-    protected Shard(final AbstractBuilder<?, ?> builder) {
+    Shard(final AbstractBuilder<?, ?> builder) {
         super(builder.getId().toString(), builder.getPeerAddresses(),
                 Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
 
@@ -307,7 +307,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    public void postStop() throws Exception {
+    public final void postStop() throws Exception {
         LOG.info("Stopping Shard {}", persistenceId());
 
         super.postStop();
@@ -325,7 +325,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected void handleRecover(final Object message) {
+    protected final void handleRecover(final Object message) {
         LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(), message.getClass(),
             getSender());
 
@@ -355,6 +355,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
+    // non-final for TestShard
     protected void handleNonRaftCommand(final Object message) {
         try (MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
             final Optional<Error> maybeError = context.error();
@@ -662,31 +663,31 @@ public class Shard extends RaftActor {
         return getLeaderId() != null;
     }
 
-    public int getPendingTxCommitQueueSize() {
+    final int getPendingTxCommitQueueSize() {
         return store.getQueueSize();
     }
 
-    public int getCohortCacheSize() {
+    final int getCohortCacheSize() {
         return commitCoordinator.getCohortCacheSize();
     }
 
     @Override
-    protected Optional<ActorRef> getRoleChangeNotifier() {
+    protected final Optional<ActorRef> getRoleChangeNotifier() {
         return roleChangeNotifier;
     }
 
-    String getShardName() {
+    final String getShardName() {
         return shardName;
     }
 
     @Override
-    protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId,
+    protected final LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId,
             final short leaderPayloadVersion) {
         return isLeader() ? new ShardLeaderStateChanged(memberId, leaderId, store.getDataTree(), leaderPayloadVersion)
                 : new ShardLeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
     }
 
-    protected void onDatastoreContext(final DatastoreContext context) {
+    private void onDatastoreContext(final DatastoreContext context) {
         datastoreContext = verifyNotNull(context);
 
         setTransactionCommitTimeout();
@@ -697,8 +698,9 @@ public class Shard extends RaftActor {
     }
 
     // applyState() will be invoked once consensus is reached on the payload
+    // non-final for mocking
     void persistPayload(final Identifier id, final Payload payload, final boolean batchHint) {
-        boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable();
+        final boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable();
         if (canSkipPayload) {
             applyState(self(), id, payload);
         } else {
@@ -743,7 +745,7 @@ public class Shard extends RaftActor {
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    protected void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) {
+    private void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) {
         askProtocolEncountered(batched.getTransactionId());
 
         try {
@@ -867,7 +869,7 @@ public class Shard extends RaftActor {
         doAbortTransaction(transactionId, getSender());
     }
 
-    void doAbortTransaction(final Identifier transactionID, final ActorRef sender) {
+    final void doAbortTransaction(final Identifier transactionID, final ActorRef sender) {
         commitCoordinator.handleAbort(transactionID, sender, this);
     }
 
@@ -956,13 +958,12 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    @VisibleForTesting
-    public RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+    protected final RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
         return snapshotCohort;
     }
 
     @Override
-    protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
+    protected final RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
         if (restoreFromSnapshot == null) {
             return ShardRecoveryCoordinator.create(store, persistenceId(), LOG);
         }
@@ -971,6 +972,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
+    // non-final for testing
     protected void onRecoveryComplete() {
         restoreFromSnapshot = null;
 
@@ -989,7 +991,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
+    protected final void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
         if (data instanceof Payload) {
             if (data instanceof DisableTrackingPayload) {
                 disableTracking((DisableTrackingPayload) data);
@@ -1007,7 +1009,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected void onStateChanged() {
+    protected final void onStateChanged() {
         boolean isLeader = isLeader();
         boolean hasLeader = hasLeader();
         treeChangeSupport.onLeadershipChange(isLeader, hasLeader);
@@ -1030,7 +1032,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected void onLeaderChanged(final String oldLeader, final String newLeader) {
+    protected final void onLeaderChanged(final String oldLeader, final String newLeader) {
         shardMBean.incrementLeadershipChangeCount();
         paused = false;
 
@@ -1051,7 +1053,9 @@ public class Shard extends RaftActor {
             // them to transaction messages and send to the new leader.
             ActorSelection leader = getLeader();
             if (leader != null) {
-                Collection<?> messagesToForward = convertPendingTransactionsToMessages();
+                // Clears all pending transactions and converts them to messages to be forwarded to a new leader.
+                Collection<?> messagesToForward = commitCoordinator.convertPendingTransactionsToMessages(
+                    datastoreContext.getShardBatchedModificationCount());
 
                 if (!messagesToForward.isEmpty()) {
                     LOG.debug("{}: Forwarding {} pending transaction messages to leader {}", persistenceId(),
@@ -1069,7 +1073,7 @@ public class Shard extends RaftActor {
             }
         } else {
             // We have become the leader, we need to reconstruct frontend state
-            knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this));
+            knownFrontends = verifyNotNull(frontendMetadata.toLeaderState(this));
             LOG.debug("{}: became leader with frontend state for {}", persistenceId(), knownFrontends.keySet());
         }
 
@@ -1078,18 +1082,8 @@ public class Shard extends RaftActor {
         }
     }
 
-    /**
-     * Clears all pending transactions and converts them to messages to be forwarded to a new leader.
-     *
-     * @return the converted messages
-     */
-    public Collection<?> convertPendingTransactionsToMessages() {
-        return commitCoordinator.convertPendingTransactionsToMessages(
-                datastoreContext.getShardBatchedModificationCount());
-    }
-
     @Override
-    protected void pauseLeader(final Runnable operation) {
+    protected final void pauseLeader(final Runnable operation) {
         LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation);
         paused = true;
 
@@ -1103,29 +1097,30 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected void unpauseLeader() {
+    protected final void unpauseLeader() {
         LOG.debug("{}: In unpauseLeader", persistenceId());
         paused = false;
 
         store.setRunOnPendingTransactionsComplete(null);
 
         // Restore tell-based protocol state as if we were becoming the leader
-        knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this));
+        knownFrontends = verifyNotNull(frontendMetadata.toLeaderState(this));
     }
 
     @Override
-    protected OnDemandRaftState.AbstractBuilder<?, ?> newOnDemandRaftStateBuilder() {
-        return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())
-                .commitCohortActors(store.getCohortActors());
+    protected final OnDemandRaftState.AbstractBuilder<?, ?> newOnDemandRaftStateBuilder() {
+        return OnDemandShardState.newBuilder()
+            .treeChangeListenerActors(treeChangeSupport.getListenerActors())
+            .commitCohortActors(store.getCohortActors());
     }
 
     @Override
-    public String persistenceId() {
+    public final String persistenceId() {
         return this.name;
     }
 
     @Override
-    public String journalPluginId() {
+    public final String journalPluginId() {
         // This method may be invoked from super constructor (wonderful), hence we also need to handle the case of
         // the field being uninitialized because our constructor is not finished.
         if (datastoreContext != null && !datastoreContext.isPersistent()) {
@@ -1135,20 +1130,22 @@ public class Shard extends RaftActor {
     }
 
     @VisibleForTesting
-    ShardCommitCoordinator getCommitCoordinator() {
+    final ShardCommitCoordinator getCommitCoordinator() {
         return commitCoordinator;
     }
 
-    public DatastoreContext getDatastoreContext() {
+    // non-final for mocking
+    DatastoreContext getDatastoreContext() {
         return datastoreContext;
     }
 
     @VisibleForTesting
-    public ShardDataTree getDataStore() {
+    final ShardDataTree getDataStore() {
         return store;
     }
 
     @VisibleForTesting
+    // non-final for mocking
     ShardStats getShardMBean() {
         return shardMBean;
     }
@@ -1168,12 +1165,12 @@ public class Shard extends RaftActor {
 
         private volatile boolean sealed;
 
-        protected AbstractBuilder(final Class<? extends S> shardClass) {
+        AbstractBuilder(final Class<? extends S> shardClass) {
             this.shardClass = shardClass;
         }
 
-        protected void checkSealed() {
-            checkState(!sealed, "Builder isalready sealed - further modifications are not allowed");
+        final void checkSealed() {
+            checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
         }
 
         @SuppressWarnings("unchecked")
@@ -1230,7 +1227,7 @@ public class Shard extends RaftActor {
         }
 
         public EffectiveModelContext getSchemaContext() {
-            return Verify.verifyNotNull(schemaContextProvider.getEffectiveModelContext());
+            return verifyNotNull(schemaContextProvider.getEffectiveModelContext());
         }
 
         public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() {
index 34b984f..b937652 100644 (file)
@@ -372,7 +372,7 @@ public class ShardTest extends AbstractShardTest {
 
         // Add some ModificationPayload entries
         for (int i = 1; i <= nListEntries; i++) {
-            listEntryKeys.add(Integer.valueOf(i));
+            listEntryKeys.add(i);
 
             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
@@ -1704,9 +1704,9 @@ public class ShardTest extends AbstractShardTest {
 
         dataStoreContextBuilder.persistent(persistent);
 
-        class TestShard extends Shard {
+        final class TestShard extends Shard {
 
-            protected TestShard(final AbstractBuilder<?, ?> builder) {
+            TestShard(final AbstractBuilder<?, ?> builder) {
                 super(builder);
                 setPersistence(new TestPersistentDataProvider(super.persistence()));
             }