Since sal-distributed-eos is gone, we do not have any known subclasses,
lock down Shard implementation as much as possible. This will aid us in
refactoring later. The entire class is now considered an implementation
detail, amenable to changes driven by RaftActor evolution.
Change-Id: Ic54794b33766459f16a5ebdac6a3faa731c2b49d
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Ticker;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Ticker;
-import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
* <p>
* Our Shard uses InMemoryDataTree as it's internal representation and delegates all requests it
*/
* <p>
* Our Shard uses InMemoryDataTree as it's internal representation and delegates all requests it
*/
+// FIXME: non-final for testing?
public class Shard extends RaftActor {
@VisibleForTesting
public class Shard extends RaftActor {
@VisibleForTesting
private final ActorRef exportActor;
private final ActorRef exportActor;
- protected Shard(final AbstractBuilder<?, ?> builder) {
+ Shard(final AbstractBuilder<?, ?> builder) {
super(builder.getId().toString(), builder.getPeerAddresses(),
Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
super(builder.getId().toString(), builder.getPeerAddresses(),
Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
- public void postStop() throws Exception {
+ public final void postStop() throws Exception {
LOG.info("Stopping Shard {}", persistenceId());
super.postStop();
LOG.info("Stopping Shard {}", persistenceId());
super.postStop();
- protected void handleRecover(final Object message) {
+ protected final void handleRecover(final Object message) {
LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(), message.getClass(),
getSender());
LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(), message.getClass(),
getSender());
+ // non-final for TestShard
protected void handleNonRaftCommand(final Object message) {
try (MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
final Optional<Error> maybeError = context.error();
protected void handleNonRaftCommand(final Object message) {
try (MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
final Optional<Error> maybeError = context.error();
return getLeaderId() != null;
}
return getLeaderId() != null;
}
- public int getPendingTxCommitQueueSize() {
+ final int getPendingTxCommitQueueSize() {
return store.getQueueSize();
}
return store.getQueueSize();
}
- public int getCohortCacheSize() {
+ final int getCohortCacheSize() {
return commitCoordinator.getCohortCacheSize();
}
@Override
return commitCoordinator.getCohortCacheSize();
}
@Override
- protected Optional<ActorRef> getRoleChangeNotifier() {
+ protected final Optional<ActorRef> getRoleChangeNotifier() {
return roleChangeNotifier;
}
return roleChangeNotifier;
}
- String getShardName() {
+ final String getShardName() {
return shardName;
}
@Override
return shardName;
}
@Override
- protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId,
+ protected final LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId,
final short leaderPayloadVersion) {
return isLeader() ? new ShardLeaderStateChanged(memberId, leaderId, store.getDataTree(), leaderPayloadVersion)
: new ShardLeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
}
final short leaderPayloadVersion) {
return isLeader() ? new ShardLeaderStateChanged(memberId, leaderId, store.getDataTree(), leaderPayloadVersion)
: new ShardLeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
}
- protected void onDatastoreContext(final DatastoreContext context) {
+ private void onDatastoreContext(final DatastoreContext context) {
datastoreContext = verifyNotNull(context);
setTransactionCommitTimeout();
datastoreContext = verifyNotNull(context);
setTransactionCommitTimeout();
}
// applyState() will be invoked once consensus is reached on the payload
}
// applyState() will be invoked once consensus is reached on the payload
+ // non-final for mocking
void persistPayload(final Identifier id, final Payload payload, final boolean batchHint) {
void persistPayload(final Identifier id, final Payload payload, final boolean batchHint) {
- boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable();
+ final boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable();
if (canSkipPayload) {
applyState(self(), id, payload);
} else {
if (canSkipPayload) {
applyState(self(), id, payload);
} else {
}
@SuppressWarnings("checkstyle:IllegalCatch")
}
@SuppressWarnings("checkstyle:IllegalCatch")
- protected void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) {
+ private void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) {
askProtocolEncountered(batched.getTransactionId());
try {
askProtocolEncountered(batched.getTransactionId());
try {
doAbortTransaction(transactionId, getSender());
}
doAbortTransaction(transactionId, getSender());
}
- void doAbortTransaction(final Identifier transactionID, final ActorRef sender) {
+ final void doAbortTransaction(final Identifier transactionID, final ActorRef sender) {
commitCoordinator.handleAbort(transactionID, sender, this);
}
commitCoordinator.handleAbort(transactionID, sender, this);
}
- @VisibleForTesting
- public RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+ protected final RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
return snapshotCohort;
}
@Override
return snapshotCohort;
}
@Override
- protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
+ protected final RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
if (restoreFromSnapshot == null) {
return ShardRecoveryCoordinator.create(store, persistenceId(), LOG);
}
if (restoreFromSnapshot == null) {
return ShardRecoveryCoordinator.create(store, persistenceId(), LOG);
}
+ // non-final for testing
protected void onRecoveryComplete() {
restoreFromSnapshot = null;
protected void onRecoveryComplete() {
restoreFromSnapshot = null;
- protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
+ protected final void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
if (data instanceof Payload) {
if (data instanceof DisableTrackingPayload) {
disableTracking((DisableTrackingPayload) data);
if (data instanceof Payload) {
if (data instanceof DisableTrackingPayload) {
disableTracking((DisableTrackingPayload) data);
- protected void onStateChanged() {
+ protected final void onStateChanged() {
boolean isLeader = isLeader();
boolean hasLeader = hasLeader();
treeChangeSupport.onLeadershipChange(isLeader, hasLeader);
boolean isLeader = isLeader();
boolean hasLeader = hasLeader();
treeChangeSupport.onLeadershipChange(isLeader, hasLeader);
- protected void onLeaderChanged(final String oldLeader, final String newLeader) {
+ protected final void onLeaderChanged(final String oldLeader, final String newLeader) {
shardMBean.incrementLeadershipChangeCount();
paused = false;
shardMBean.incrementLeadershipChangeCount();
paused = false;
// them to transaction messages and send to the new leader.
ActorSelection leader = getLeader();
if (leader != null) {
// them to transaction messages and send to the new leader.
ActorSelection leader = getLeader();
if (leader != null) {
- Collection<?> messagesToForward = convertPendingTransactionsToMessages();
+ // Clears all pending transactions and converts them to messages to be forwarded to a new leader.
+ Collection<?> messagesToForward = commitCoordinator.convertPendingTransactionsToMessages(
+ datastoreContext.getShardBatchedModificationCount());
if (!messagesToForward.isEmpty()) {
LOG.debug("{}: Forwarding {} pending transaction messages to leader {}", persistenceId(),
if (!messagesToForward.isEmpty()) {
LOG.debug("{}: Forwarding {} pending transaction messages to leader {}", persistenceId(),
}
} else {
// We have become the leader, we need to reconstruct frontend state
}
} else {
// We have become the leader, we need to reconstruct frontend state
- knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this));
+ knownFrontends = verifyNotNull(frontendMetadata.toLeaderState(this));
LOG.debug("{}: became leader with frontend state for {}", persistenceId(), knownFrontends.keySet());
}
LOG.debug("{}: became leader with frontend state for {}", persistenceId(), knownFrontends.keySet());
}
- /**
- * Clears all pending transactions and converts them to messages to be forwarded to a new leader.
- *
- * @return the converted messages
- */
- public Collection<?> convertPendingTransactionsToMessages() {
- return commitCoordinator.convertPendingTransactionsToMessages(
- datastoreContext.getShardBatchedModificationCount());
- }
-
- protected void pauseLeader(final Runnable operation) {
+ protected final void pauseLeader(final Runnable operation) {
LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation);
paused = true;
LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation);
paused = true;
- protected void unpauseLeader() {
+ protected final void unpauseLeader() {
LOG.debug("{}: In unpauseLeader", persistenceId());
paused = false;
store.setRunOnPendingTransactionsComplete(null);
// Restore tell-based protocol state as if we were becoming the leader
LOG.debug("{}: In unpauseLeader", persistenceId());
paused = false;
store.setRunOnPendingTransactionsComplete(null);
// Restore tell-based protocol state as if we were becoming the leader
- knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this));
+ knownFrontends = verifyNotNull(frontendMetadata.toLeaderState(this));
- protected OnDemandRaftState.AbstractBuilder<?, ?> newOnDemandRaftStateBuilder() {
- return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())
- .commitCohortActors(store.getCohortActors());
+ protected final OnDemandRaftState.AbstractBuilder<?, ?> newOnDemandRaftStateBuilder() {
+ return OnDemandShardState.newBuilder()
+ .treeChangeListenerActors(treeChangeSupport.getListenerActors())
+ .commitCohortActors(store.getCohortActors());
- public String persistenceId() {
+ public final String persistenceId() {
return this.name;
}
@Override
return this.name;
}
@Override
- public String journalPluginId() {
+ public final String journalPluginId() {
// This method may be invoked from super constructor (wonderful), hence we also need to handle the case of
// the field being uninitialized because our constructor is not finished.
if (datastoreContext != null && !datastoreContext.isPersistent()) {
// This method may be invoked from super constructor (wonderful), hence we also need to handle the case of
// the field being uninitialized because our constructor is not finished.
if (datastoreContext != null && !datastoreContext.isPersistent()) {
- ShardCommitCoordinator getCommitCoordinator() {
+ final ShardCommitCoordinator getCommitCoordinator() {
return commitCoordinator;
}
return commitCoordinator;
}
- public DatastoreContext getDatastoreContext() {
+ // non-final for mocking
+ DatastoreContext getDatastoreContext() {
return datastoreContext;
}
@VisibleForTesting
return datastoreContext;
}
@VisibleForTesting
- public ShardDataTree getDataStore() {
+ final ShardDataTree getDataStore() {
return store;
}
@VisibleForTesting
return store;
}
@VisibleForTesting
+ // non-final for mocking
ShardStats getShardMBean() {
return shardMBean;
}
ShardStats getShardMBean() {
return shardMBean;
}
private volatile boolean sealed;
private volatile boolean sealed;
- protected AbstractBuilder(final Class<? extends S> shardClass) {
+ AbstractBuilder(final Class<? extends S> shardClass) {
this.shardClass = shardClass;
}
this.shardClass = shardClass;
}
- protected void checkSealed() {
- checkState(!sealed, "Builder isalready sealed - further modifications are not allowed");
+ final void checkSealed() {
+ checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
}
@SuppressWarnings("unchecked")
}
@SuppressWarnings("unchecked")
}
public EffectiveModelContext getSchemaContext() {
}
public EffectiveModelContext getSchemaContext() {
- return Verify.verifyNotNull(schemaContextProvider.getEffectiveModelContext());
+ return verifyNotNull(schemaContextProvider.getEffectiveModelContext());
}
public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() {
}
public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() {
// Add some ModificationPayload entries
for (int i = 1; i <= nListEntries; i++) {
// Add some ModificationPayload entries
for (int i = 1; i <= nListEntries; i++) {
- listEntryKeys.add(Integer.valueOf(i));
final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
.nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
.nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
dataStoreContextBuilder.persistent(persistent);
dataStoreContextBuilder.persistent(persistent);
- class TestShard extends Shard {
+ final class TestShard extends Shard {
- protected TestShard(final AbstractBuilder<?, ?> builder) {
+ TestShard(final AbstractBuilder<?, ?> builder) {
super(builder);
setPersistence(new TestPersistentDataProvider(super.persistence()));
}
super(builder);
setPersistence(new TestPersistentDataProvider(super.persistence()));
}