From bfc3eb1e379514d0cca655bfa4e737b355dc89c8 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Wed, 30 Jun 2021 23:14:31 +0200 Subject: [PATCH] Lock down controller.cluster.datastore.Shard Since sal-distributed-eos is gone, we do not have any known subclasses, lock down Shard implementation as much as possible. This will aid us in refactoring later. The entire class is now considered an implementation detail, amenable to changes driven by RaftActor evolution. Change-Id: Ic54794b33766459f16a5ebdac6a3faa731c2b49d Signed-off-by: Robert Varga --- .../controller/cluster/datastore/Shard.java | 89 +++++++++---------- .../cluster/datastore/ShardTest.java | 6 +- 2 files changed, 46 insertions(+), 49 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index fb0ef03d18..030ef16dc6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -26,7 +26,6 @@ import akka.serialization.JavaSerializer; import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Ticker; -import com.google.common.base.Verify; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -125,6 +124,7 @@ import scala.concurrent.duration.FiniteDuration; *

* Our Shard uses InMemoryDataTree as it's internal representation and delegates all requests it */ +// FIXME: non-final for testing? public class Shard extends RaftActor { @VisibleForTesting @@ -216,7 +216,7 @@ public class Shard extends RaftActor { private final ActorRef exportActor; - protected Shard(final AbstractBuilder builder) { + Shard(final AbstractBuilder builder) { super(builder.getId().toString(), builder.getPeerAddresses(), Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION); @@ -307,7 +307,7 @@ public class Shard extends RaftActor { } @Override - public void postStop() throws Exception { + public final void postStop() throws Exception { LOG.info("Stopping Shard {}", persistenceId()); super.postStop(); @@ -325,7 +325,7 @@ public class Shard extends RaftActor { } @Override - protected void handleRecover(final Object message) { + protected final void handleRecover(final Object message) { LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(), message.getClass(), getSender()); @@ -355,6 +355,7 @@ public class Shard extends RaftActor { } @Override + // non-final for TestShard protected void handleNonRaftCommand(final Object message) { try (MessageTracker.Context context = appendEntriesReplyTracker.received(message)) { final Optional maybeError = context.error(); @@ -662,31 +663,31 @@ public class Shard extends RaftActor { return getLeaderId() != null; } - public int getPendingTxCommitQueueSize() { + final int getPendingTxCommitQueueSize() { return store.getQueueSize(); } - public int getCohortCacheSize() { + final int getCohortCacheSize() { return commitCoordinator.getCohortCacheSize(); } @Override - protected Optional getRoleChangeNotifier() { + protected final Optional getRoleChangeNotifier() { return roleChangeNotifier; } - String getShardName() { + final String getShardName() { return shardName; } @Override - protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId, + protected final LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId, final short leaderPayloadVersion) { return isLeader() ? new ShardLeaderStateChanged(memberId, leaderId, store.getDataTree(), leaderPayloadVersion) : new ShardLeaderStateChanged(memberId, leaderId, leaderPayloadVersion); } - protected void onDatastoreContext(final DatastoreContext context) { + private void onDatastoreContext(final DatastoreContext context) { datastoreContext = verifyNotNull(context); setTransactionCommitTimeout(); @@ -697,8 +698,9 @@ public class Shard extends RaftActor { } // applyState() will be invoked once consensus is reached on the payload + // non-final for mocking void persistPayload(final Identifier id, final Payload payload, final boolean batchHint) { - boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable(); + final boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable(); if (canSkipPayload) { applyState(self(), id, payload); } else { @@ -743,7 +745,7 @@ public class Shard extends RaftActor { } @SuppressWarnings("checkstyle:IllegalCatch") - protected void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) { + private void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) { askProtocolEncountered(batched.getTransactionId()); try { @@ -867,7 +869,7 @@ public class Shard extends RaftActor { doAbortTransaction(transactionId, getSender()); } - void doAbortTransaction(final Identifier transactionID, final ActorRef sender) { + final void doAbortTransaction(final Identifier transactionID, final ActorRef sender) { commitCoordinator.handleAbort(transactionID, sender, this); } @@ -956,13 +958,12 @@ public class Shard extends RaftActor { } @Override - @VisibleForTesting - public RaftActorSnapshotCohort getRaftActorSnapshotCohort() { + protected final RaftActorSnapshotCohort getRaftActorSnapshotCohort() { return snapshotCohort; } @Override - protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() { + protected final RaftActorRecoveryCohort getRaftActorRecoveryCohort() { if (restoreFromSnapshot == null) { return ShardRecoveryCoordinator.create(store, persistenceId(), LOG); } @@ -971,6 +972,7 @@ public class Shard extends RaftActor { } @Override + // non-final for testing protected void onRecoveryComplete() { restoreFromSnapshot = null; @@ -989,7 +991,7 @@ public class Shard extends RaftActor { } @Override - protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) { + protected final void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) { if (data instanceof Payload) { if (data instanceof DisableTrackingPayload) { disableTracking((DisableTrackingPayload) data); @@ -1007,7 +1009,7 @@ public class Shard extends RaftActor { } @Override - protected void onStateChanged() { + protected final void onStateChanged() { boolean isLeader = isLeader(); boolean hasLeader = hasLeader(); treeChangeSupport.onLeadershipChange(isLeader, hasLeader); @@ -1030,7 +1032,7 @@ public class Shard extends RaftActor { } @Override - protected void onLeaderChanged(final String oldLeader, final String newLeader) { + protected final void onLeaderChanged(final String oldLeader, final String newLeader) { shardMBean.incrementLeadershipChangeCount(); paused = false; @@ -1051,7 +1053,9 @@ public class Shard extends RaftActor { // them to transaction messages and send to the new leader. ActorSelection leader = getLeader(); if (leader != null) { - Collection messagesToForward = convertPendingTransactionsToMessages(); + // Clears all pending transactions and converts them to messages to be forwarded to a new leader. + Collection messagesToForward = commitCoordinator.convertPendingTransactionsToMessages( + datastoreContext.getShardBatchedModificationCount()); if (!messagesToForward.isEmpty()) { LOG.debug("{}: Forwarding {} pending transaction messages to leader {}", persistenceId(), @@ -1069,7 +1073,7 @@ public class Shard extends RaftActor { } } else { // We have become the leader, we need to reconstruct frontend state - knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this)); + knownFrontends = verifyNotNull(frontendMetadata.toLeaderState(this)); LOG.debug("{}: became leader with frontend state for {}", persistenceId(), knownFrontends.keySet()); } @@ -1078,18 +1082,8 @@ public class Shard extends RaftActor { } } - /** - * Clears all pending transactions and converts them to messages to be forwarded to a new leader. - * - * @return the converted messages - */ - public Collection convertPendingTransactionsToMessages() { - return commitCoordinator.convertPendingTransactionsToMessages( - datastoreContext.getShardBatchedModificationCount()); - } - @Override - protected void pauseLeader(final Runnable operation) { + protected final void pauseLeader(final Runnable operation) { LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation); paused = true; @@ -1103,29 +1097,30 @@ public class Shard extends RaftActor { } @Override - protected void unpauseLeader() { + protected final void unpauseLeader() { LOG.debug("{}: In unpauseLeader", persistenceId()); paused = false; store.setRunOnPendingTransactionsComplete(null); // Restore tell-based protocol state as if we were becoming the leader - knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this)); + knownFrontends = verifyNotNull(frontendMetadata.toLeaderState(this)); } @Override - protected OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { - return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors()) - .commitCohortActors(store.getCohortActors()); + protected final OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { + return OnDemandShardState.newBuilder() + .treeChangeListenerActors(treeChangeSupport.getListenerActors()) + .commitCohortActors(store.getCohortActors()); } @Override - public String persistenceId() { + public final String persistenceId() { return this.name; } @Override - public String journalPluginId() { + public final String journalPluginId() { // This method may be invoked from super constructor (wonderful), hence we also need to handle the case of // the field being uninitialized because our constructor is not finished. if (datastoreContext != null && !datastoreContext.isPersistent()) { @@ -1135,20 +1130,22 @@ public class Shard extends RaftActor { } @VisibleForTesting - ShardCommitCoordinator getCommitCoordinator() { + final ShardCommitCoordinator getCommitCoordinator() { return commitCoordinator; } - public DatastoreContext getDatastoreContext() { + // non-final for mocking + DatastoreContext getDatastoreContext() { return datastoreContext; } @VisibleForTesting - public ShardDataTree getDataStore() { + final ShardDataTree getDataStore() { return store; } @VisibleForTesting + // non-final for mocking ShardStats getShardMBean() { return shardMBean; } @@ -1168,12 +1165,12 @@ public class Shard extends RaftActor { private volatile boolean sealed; - protected AbstractBuilder(final Class shardClass) { + AbstractBuilder(final Class shardClass) { this.shardClass = shardClass; } - protected void checkSealed() { - checkState(!sealed, "Builder isalready sealed - further modifications are not allowed"); + final void checkSealed() { + checkState(!sealed, "Builder is already sealed - further modifications are not allowed"); } @SuppressWarnings("unchecked") @@ -1230,7 +1227,7 @@ public class Shard extends RaftActor { } public EffectiveModelContext getSchemaContext() { - return Verify.verifyNotNull(schemaContextProvider.getEffectiveModelContext()); + return verifyNotNull(schemaContextProvider.getEffectiveModelContext()); } public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 34b984f5a7..b9376520a5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -372,7 +372,7 @@ public class ShardTest extends AbstractShardTest { // Add some ModificationPayload entries for (int i = 1; i <= nListEntries; i++) { - listEntryKeys.add(Integer.valueOf(i)); + listEntryKeys.add(i); final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); @@ -1704,9 +1704,9 @@ public class ShardTest extends AbstractShardTest { dataStoreContextBuilder.persistent(persistent); - class TestShard extends Shard { + final class TestShard extends Shard { - protected TestShard(final AbstractBuilder builder) { + TestShard(final AbstractBuilder builder) { super(builder); setPersistence(new TestPersistentDataProvider(super.persistence())); } -- 2.36.6