Lock down controller.cluster.datastore.Shard 53/96753/1
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 30 Jun 2021 21:14:31 +0000 (23:14 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Wed, 30 Jun 2021 21:16:14 +0000 (23:16 +0200)
Since sal-distributed-eos is gone, we do not have any known subclasses,
lock down Shard implementation as much as possible. This will aid us in
refactoring later. The entire class is now considered an implementation
detail, amenable to changes driven by RaftActor evolution.

Change-Id: Ic54794b33766459f16a5ebdac6a3faa731c2b49d
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index fb0ef03d1844c8e48b1c02fd77b880179306fbb0..030ef16dc605c18a5b64f20bab535181f91d8834 100644 (file)
@@ -26,7 +26,6 @@ import akka.serialization.JavaSerializer;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Ticker;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Ticker;
-import com.google.common.base.Verify;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -125,6 +124,7 @@ import scala.concurrent.duration.FiniteDuration;
  * <p>
  * Our Shard uses InMemoryDataTree as it's internal representation and delegates all requests it
  */
  * <p>
  * Our Shard uses InMemoryDataTree as it's internal representation and delegates all requests it
  */
+// FIXME: non-final for testing?
 public class Shard extends RaftActor {
 
     @VisibleForTesting
 public class Shard extends RaftActor {
 
     @VisibleForTesting
@@ -216,7 +216,7 @@ public class Shard extends RaftActor {
 
     private final ActorRef exportActor;
 
 
     private final ActorRef exportActor;
 
-    protected Shard(final AbstractBuilder<?, ?> builder) {
+    Shard(final AbstractBuilder<?, ?> builder) {
         super(builder.getId().toString(), builder.getPeerAddresses(),
                 Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
 
         super(builder.getId().toString(), builder.getPeerAddresses(),
                 Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
 
@@ -307,7 +307,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
     }
 
     @Override
-    public void postStop() throws Exception {
+    public final void postStop() throws Exception {
         LOG.info("Stopping Shard {}", persistenceId());
 
         super.postStop();
         LOG.info("Stopping Shard {}", persistenceId());
 
         super.postStop();
@@ -325,7 +325,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
     }
 
     @Override
-    protected void handleRecover(final Object message) {
+    protected final void handleRecover(final Object message) {
         LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(), message.getClass(),
             getSender());
 
         LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(), message.getClass(),
             getSender());
 
@@ -355,6 +355,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
     }
 
     @Override
+    // non-final for TestShard
     protected void handleNonRaftCommand(final Object message) {
         try (MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
             final Optional<Error> maybeError = context.error();
     protected void handleNonRaftCommand(final Object message) {
         try (MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
             final Optional<Error> maybeError = context.error();
@@ -662,31 +663,31 @@ public class Shard extends RaftActor {
         return getLeaderId() != null;
     }
 
         return getLeaderId() != null;
     }
 
-    public int getPendingTxCommitQueueSize() {
+    final int getPendingTxCommitQueueSize() {
         return store.getQueueSize();
     }
 
         return store.getQueueSize();
     }
 
-    public int getCohortCacheSize() {
+    final int getCohortCacheSize() {
         return commitCoordinator.getCohortCacheSize();
     }
 
     @Override
         return commitCoordinator.getCohortCacheSize();
     }
 
     @Override
-    protected Optional<ActorRef> getRoleChangeNotifier() {
+    protected final Optional<ActorRef> getRoleChangeNotifier() {
         return roleChangeNotifier;
     }
 
         return roleChangeNotifier;
     }
 
-    String getShardName() {
+    final String getShardName() {
         return shardName;
     }
 
     @Override
         return shardName;
     }
 
     @Override
-    protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId,
+    protected final LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId,
             final short leaderPayloadVersion) {
         return isLeader() ? new ShardLeaderStateChanged(memberId, leaderId, store.getDataTree(), leaderPayloadVersion)
                 : new ShardLeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
     }
 
             final short leaderPayloadVersion) {
         return isLeader() ? new ShardLeaderStateChanged(memberId, leaderId, store.getDataTree(), leaderPayloadVersion)
                 : new ShardLeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
     }
 
-    protected void onDatastoreContext(final DatastoreContext context) {
+    private void onDatastoreContext(final DatastoreContext context) {
         datastoreContext = verifyNotNull(context);
 
         setTransactionCommitTimeout();
         datastoreContext = verifyNotNull(context);
 
         setTransactionCommitTimeout();
@@ -697,8 +698,9 @@ public class Shard extends RaftActor {
     }
 
     // applyState() will be invoked once consensus is reached on the payload
     }
 
     // applyState() will be invoked once consensus is reached on the payload
+    // non-final for mocking
     void persistPayload(final Identifier id, final Payload payload, final boolean batchHint) {
     void persistPayload(final Identifier id, final Payload payload, final boolean batchHint) {
-        boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable();
+        final boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable();
         if (canSkipPayload) {
             applyState(self(), id, payload);
         } else {
         if (canSkipPayload) {
             applyState(self(), id, payload);
         } else {
@@ -743,7 +745,7 @@ public class Shard extends RaftActor {
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    protected void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) {
+    private void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) {
         askProtocolEncountered(batched.getTransactionId());
 
         try {
         askProtocolEncountered(batched.getTransactionId());
 
         try {
@@ -867,7 +869,7 @@ public class Shard extends RaftActor {
         doAbortTransaction(transactionId, getSender());
     }
 
         doAbortTransaction(transactionId, getSender());
     }
 
-    void doAbortTransaction(final Identifier transactionID, final ActorRef sender) {
+    final void doAbortTransaction(final Identifier transactionID, final ActorRef sender) {
         commitCoordinator.handleAbort(transactionID, sender, this);
     }
 
         commitCoordinator.handleAbort(transactionID, sender, this);
     }
 
@@ -956,13 +958,12 @@ public class Shard extends RaftActor {
     }
 
     @Override
     }
 
     @Override
-    @VisibleForTesting
-    public RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+    protected final RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
         return snapshotCohort;
     }
 
     @Override
         return snapshotCohort;
     }
 
     @Override
-    protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
+    protected final RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
         if (restoreFromSnapshot == null) {
             return ShardRecoveryCoordinator.create(store, persistenceId(), LOG);
         }
         if (restoreFromSnapshot == null) {
             return ShardRecoveryCoordinator.create(store, persistenceId(), LOG);
         }
@@ -971,6 +972,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
     }
 
     @Override
+    // non-final for testing
     protected void onRecoveryComplete() {
         restoreFromSnapshot = null;
 
     protected void onRecoveryComplete() {
         restoreFromSnapshot = null;
 
@@ -989,7 +991,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
     }
 
     @Override
-    protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
+    protected final void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
         if (data instanceof Payload) {
             if (data instanceof DisableTrackingPayload) {
                 disableTracking((DisableTrackingPayload) data);
         if (data instanceof Payload) {
             if (data instanceof DisableTrackingPayload) {
                 disableTracking((DisableTrackingPayload) data);
@@ -1007,7 +1009,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
     }
 
     @Override
-    protected void onStateChanged() {
+    protected final void onStateChanged() {
         boolean isLeader = isLeader();
         boolean hasLeader = hasLeader();
         treeChangeSupport.onLeadershipChange(isLeader, hasLeader);
         boolean isLeader = isLeader();
         boolean hasLeader = hasLeader();
         treeChangeSupport.onLeadershipChange(isLeader, hasLeader);
@@ -1030,7 +1032,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
     }
 
     @Override
-    protected void onLeaderChanged(final String oldLeader, final String newLeader) {
+    protected final void onLeaderChanged(final String oldLeader, final String newLeader) {
         shardMBean.incrementLeadershipChangeCount();
         paused = false;
 
         shardMBean.incrementLeadershipChangeCount();
         paused = false;
 
@@ -1051,7 +1053,9 @@ public class Shard extends RaftActor {
             // them to transaction messages and send to the new leader.
             ActorSelection leader = getLeader();
             if (leader != null) {
             // them to transaction messages and send to the new leader.
             ActorSelection leader = getLeader();
             if (leader != null) {
-                Collection<?> messagesToForward = convertPendingTransactionsToMessages();
+                // Clears all pending transactions and converts them to messages to be forwarded to a new leader.
+                Collection<?> messagesToForward = commitCoordinator.convertPendingTransactionsToMessages(
+                    datastoreContext.getShardBatchedModificationCount());
 
                 if (!messagesToForward.isEmpty()) {
                     LOG.debug("{}: Forwarding {} pending transaction messages to leader {}", persistenceId(),
 
                 if (!messagesToForward.isEmpty()) {
                     LOG.debug("{}: Forwarding {} pending transaction messages to leader {}", persistenceId(),
@@ -1069,7 +1073,7 @@ public class Shard extends RaftActor {
             }
         } else {
             // We have become the leader, we need to reconstruct frontend state
             }
         } else {
             // We have become the leader, we need to reconstruct frontend state
-            knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this));
+            knownFrontends = verifyNotNull(frontendMetadata.toLeaderState(this));
             LOG.debug("{}: became leader with frontend state for {}", persistenceId(), knownFrontends.keySet());
         }
 
             LOG.debug("{}: became leader with frontend state for {}", persistenceId(), knownFrontends.keySet());
         }
 
@@ -1078,18 +1082,8 @@ public class Shard extends RaftActor {
         }
     }
 
         }
     }
 
-    /**
-     * Clears all pending transactions and converts them to messages to be forwarded to a new leader.
-     *
-     * @return the converted messages
-     */
-    public Collection<?> convertPendingTransactionsToMessages() {
-        return commitCoordinator.convertPendingTransactionsToMessages(
-                datastoreContext.getShardBatchedModificationCount());
-    }
-
     @Override
     @Override
-    protected void pauseLeader(final Runnable operation) {
+    protected final void pauseLeader(final Runnable operation) {
         LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation);
         paused = true;
 
         LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation);
         paused = true;
 
@@ -1103,29 +1097,30 @@ public class Shard extends RaftActor {
     }
 
     @Override
     }
 
     @Override
-    protected void unpauseLeader() {
+    protected final void unpauseLeader() {
         LOG.debug("{}: In unpauseLeader", persistenceId());
         paused = false;
 
         store.setRunOnPendingTransactionsComplete(null);
 
         // Restore tell-based protocol state as if we were becoming the leader
         LOG.debug("{}: In unpauseLeader", persistenceId());
         paused = false;
 
         store.setRunOnPendingTransactionsComplete(null);
 
         // Restore tell-based protocol state as if we were becoming the leader
-        knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this));
+        knownFrontends = verifyNotNull(frontendMetadata.toLeaderState(this));
     }
 
     @Override
     }
 
     @Override
-    protected OnDemandRaftState.AbstractBuilder<?, ?> newOnDemandRaftStateBuilder() {
-        return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())
-                .commitCohortActors(store.getCohortActors());
+    protected final OnDemandRaftState.AbstractBuilder<?, ?> newOnDemandRaftStateBuilder() {
+        return OnDemandShardState.newBuilder()
+            .treeChangeListenerActors(treeChangeSupport.getListenerActors())
+            .commitCohortActors(store.getCohortActors());
     }
 
     @Override
     }
 
     @Override
-    public String persistenceId() {
+    public final String persistenceId() {
         return this.name;
     }
 
     @Override
         return this.name;
     }
 
     @Override
-    public String journalPluginId() {
+    public final String journalPluginId() {
         // This method may be invoked from super constructor (wonderful), hence we also need to handle the case of
         // the field being uninitialized because our constructor is not finished.
         if (datastoreContext != null && !datastoreContext.isPersistent()) {
         // This method may be invoked from super constructor (wonderful), hence we also need to handle the case of
         // the field being uninitialized because our constructor is not finished.
         if (datastoreContext != null && !datastoreContext.isPersistent()) {
@@ -1135,20 +1130,22 @@ public class Shard extends RaftActor {
     }
 
     @VisibleForTesting
     }
 
     @VisibleForTesting
-    ShardCommitCoordinator getCommitCoordinator() {
+    final ShardCommitCoordinator getCommitCoordinator() {
         return commitCoordinator;
     }
 
         return commitCoordinator;
     }
 
-    public DatastoreContext getDatastoreContext() {
+    // non-final for mocking
+    DatastoreContext getDatastoreContext() {
         return datastoreContext;
     }
 
     @VisibleForTesting
         return datastoreContext;
     }
 
     @VisibleForTesting
-    public ShardDataTree getDataStore() {
+    final ShardDataTree getDataStore() {
         return store;
     }
 
     @VisibleForTesting
         return store;
     }
 
     @VisibleForTesting
+    // non-final for mocking
     ShardStats getShardMBean() {
         return shardMBean;
     }
     ShardStats getShardMBean() {
         return shardMBean;
     }
@@ -1168,12 +1165,12 @@ public class Shard extends RaftActor {
 
         private volatile boolean sealed;
 
 
         private volatile boolean sealed;
 
-        protected AbstractBuilder(final Class<? extends S> shardClass) {
+        AbstractBuilder(final Class<? extends S> shardClass) {
             this.shardClass = shardClass;
         }
 
             this.shardClass = shardClass;
         }
 
-        protected void checkSealed() {
-            checkState(!sealed, "Builder isalready sealed - further modifications are not allowed");
+        final void checkSealed() {
+            checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
         }
 
         @SuppressWarnings("unchecked")
         }
 
         @SuppressWarnings("unchecked")
@@ -1230,7 +1227,7 @@ public class Shard extends RaftActor {
         }
 
         public EffectiveModelContext getSchemaContext() {
         }
 
         public EffectiveModelContext getSchemaContext() {
-            return Verify.verifyNotNull(schemaContextProvider.getEffectiveModelContext());
+            return verifyNotNull(schemaContextProvider.getEffectiveModelContext());
         }
 
         public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() {
         }
 
         public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() {
index 34b984f5a748e2fa2a529d0a64f89c2a06633108..b9376520a5134814e821872843fe3b11d4b97f8d 100644 (file)
@@ -372,7 +372,7 @@ public class ShardTest extends AbstractShardTest {
 
         // Add some ModificationPayload entries
         for (int i = 1; i <= nListEntries; i++) {
 
         // Add some ModificationPayload entries
         for (int i = 1; i <= nListEntries; i++) {
-            listEntryKeys.add(Integer.valueOf(i));
+            listEntryKeys.add(i);
 
             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
 
             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
@@ -1704,9 +1704,9 @@ public class ShardTest extends AbstractShardTest {
 
         dataStoreContextBuilder.persistent(persistent);
 
 
         dataStoreContextBuilder.persistent(persistent);
 
-        class TestShard extends Shard {
+        final class TestShard extends Shard {
 
 
-            protected TestShard(final AbstractBuilder<?, ?> builder) {
+            TestShard(final AbstractBuilder<?, ?> builder) {
                 super(builder);
                 setPersistence(new TestPersistentDataProvider(super.persistence()));
             }
                 super(builder);
                 setPersistence(new TestPersistentDataProvider(super.persistence()));
             }