X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=357df4ab8d819fbb7c5d46a0ea80ec025aa71200;hb=824dce54df4b23120461e112574d2ff2effafcf6;hp=fb0ef03d1844c8e48b1c02fd77b880179306fbb0;hpb=805e9821a737d305f7f591ae51055e475e26fcdc;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index fb0ef03d18..357df4ab8d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -26,7 +26,6 @@ import akka.serialization.JavaSerializer; import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Ticker; -import com.google.common.base.Verify; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -78,6 +77,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionCh import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.DataTreeChangedReply; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.GetKnownClients; import org.opendaylight.controller.cluster.datastore.messages.GetKnownClientsReply; @@ -125,6 +125,7 @@ import scala.concurrent.duration.FiniteDuration; *

* Our Shard uses InMemoryDataTree as it's internal representation and delegates all requests it */ +// FIXME: non-final for testing? public class Shard extends RaftActor { @VisibleForTesting @@ -216,7 +217,7 @@ public class Shard extends RaftActor { private final ActorRef exportActor; - protected Shard(final AbstractBuilder builder) { + Shard(final AbstractBuilder builder) { super(builder.getId().toString(), builder.getPeerAddresses(), Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION); @@ -307,7 +308,7 @@ public class Shard extends RaftActor { } @Override - public void postStop() throws Exception { + public final void postStop() throws Exception { LOG.info("Stopping Shard {}", persistenceId()); super.postStop(); @@ -325,7 +326,7 @@ public class Shard extends RaftActor { } @Override - protected void handleRecover(final Object message) { + protected final void handleRecover(final Object message) { LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(), message.getClass(), getSender()); @@ -355,6 +356,7 @@ public class Shard extends RaftActor { } @Override + // non-final for TestShard protected void handleNonRaftCommand(final Object message) { try (MessageTracker.Context context = appendEntriesReplyTracker.received(message)) { final Optional maybeError = context.error(); @@ -387,6 +389,8 @@ public class Shard extends RaftActor { handleAbortTransaction(AbortTransaction.fromSerializable(message)); } else if (CloseTransactionChain.isSerializedType(message)) { closeTransactionChain(CloseTransactionChain.fromSerializable(message)); + } else if (message instanceof DataTreeChangedReply) { + // Ignore reply } else if (message instanceof RegisterDataTreeChangeListener) { treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader(), hasLeader()); } else if (message instanceof UpdateSchemaContext) { @@ -662,31 +666,31 @@ public class Shard extends RaftActor { return getLeaderId() != null; } - public int getPendingTxCommitQueueSize() { + final int getPendingTxCommitQueueSize() { return store.getQueueSize(); } - public int getCohortCacheSize() { + final int getCohortCacheSize() { return commitCoordinator.getCohortCacheSize(); } @Override - protected Optional getRoleChangeNotifier() { + protected final Optional getRoleChangeNotifier() { return roleChangeNotifier; } - String getShardName() { + final String getShardName() { return shardName; } @Override - protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId, + protected final LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId, final short leaderPayloadVersion) { return isLeader() ? new ShardLeaderStateChanged(memberId, leaderId, store.getDataTree(), leaderPayloadVersion) : new ShardLeaderStateChanged(memberId, leaderId, leaderPayloadVersion); } - protected void onDatastoreContext(final DatastoreContext context) { + private void onDatastoreContext(final DatastoreContext context) { datastoreContext = verifyNotNull(context); setTransactionCommitTimeout(); @@ -697,8 +701,9 @@ public class Shard extends RaftActor { } // applyState() will be invoked once consensus is reached on the payload + // non-final for mocking void persistPayload(final Identifier id, final Payload payload, final boolean batchHint) { - boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable(); + final boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable(); if (canSkipPayload) { applyState(self(), id, payload); } else { @@ -743,7 +748,7 @@ public class Shard extends RaftActor { } @SuppressWarnings("checkstyle:IllegalCatch") - protected void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) { + private void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) { askProtocolEncountered(batched.getTransactionId()); try { @@ -867,7 +872,7 @@ public class Shard extends RaftActor { doAbortTransaction(transactionId, getSender()); } - void doAbortTransaction(final Identifier transactionID, final ActorRef sender) { + final void doAbortTransaction(final Identifier transactionID, final ActorRef sender) { commitCoordinator.handleAbort(transactionID, sender, this); } @@ -956,13 +961,12 @@ public class Shard extends RaftActor { } @Override - @VisibleForTesting - public RaftActorSnapshotCohort getRaftActorSnapshotCohort() { + protected final RaftActorSnapshotCohort getRaftActorSnapshotCohort() { return snapshotCohort; } @Override - protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() { + protected final RaftActorRecoveryCohort getRaftActorRecoveryCohort() { if (restoreFromSnapshot == null) { return ShardRecoveryCoordinator.create(store, persistenceId(), LOG); } @@ -971,6 +975,7 @@ public class Shard extends RaftActor { } @Override + // non-final for testing protected void onRecoveryComplete() { restoreFromSnapshot = null; @@ -989,7 +994,7 @@ public class Shard extends RaftActor { } @Override - protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) { + protected final void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) { if (data instanceof Payload) { if (data instanceof DisableTrackingPayload) { disableTracking((DisableTrackingPayload) data); @@ -1007,7 +1012,7 @@ public class Shard extends RaftActor { } @Override - protected void onStateChanged() { + protected final void onStateChanged() { boolean isLeader = isLeader(); boolean hasLeader = hasLeader(); treeChangeSupport.onLeadershipChange(isLeader, hasLeader); @@ -1030,7 +1035,7 @@ public class Shard extends RaftActor { } @Override - protected void onLeaderChanged(final String oldLeader, final String newLeader) { + protected final void onLeaderChanged(final String oldLeader, final String newLeader) { shardMBean.incrementLeadershipChangeCount(); paused = false; @@ -1051,7 +1056,9 @@ public class Shard extends RaftActor { // them to transaction messages and send to the new leader. ActorSelection leader = getLeader(); if (leader != null) { - Collection messagesToForward = convertPendingTransactionsToMessages(); + // Clears all pending transactions and converts them to messages to be forwarded to a new leader. + Collection messagesToForward = commitCoordinator.convertPendingTransactionsToMessages( + datastoreContext.getShardBatchedModificationCount()); if (!messagesToForward.isEmpty()) { LOG.debug("{}: Forwarding {} pending transaction messages to leader {}", persistenceId(), @@ -1069,7 +1076,7 @@ public class Shard extends RaftActor { } } else { // We have become the leader, we need to reconstruct frontend state - knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this)); + knownFrontends = verifyNotNull(frontendMetadata.toLeaderState(this)); LOG.debug("{}: became leader with frontend state for {}", persistenceId(), knownFrontends.keySet()); } @@ -1078,18 +1085,8 @@ public class Shard extends RaftActor { } } - /** - * Clears all pending transactions and converts them to messages to be forwarded to a new leader. - * - * @return the converted messages - */ - public Collection convertPendingTransactionsToMessages() { - return commitCoordinator.convertPendingTransactionsToMessages( - datastoreContext.getShardBatchedModificationCount()); - } - @Override - protected void pauseLeader(final Runnable operation) { + protected final void pauseLeader(final Runnable operation) { LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation); paused = true; @@ -1103,29 +1100,30 @@ public class Shard extends RaftActor { } @Override - protected void unpauseLeader() { + protected final void unpauseLeader() { LOG.debug("{}: In unpauseLeader", persistenceId()); paused = false; store.setRunOnPendingTransactionsComplete(null); // Restore tell-based protocol state as if we were becoming the leader - knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this)); + knownFrontends = verifyNotNull(frontendMetadata.toLeaderState(this)); } @Override - protected OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { - return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors()) - .commitCohortActors(store.getCohortActors()); + protected final OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { + return OnDemandShardState.newBuilder() + .treeChangeListenerActors(treeChangeSupport.getListenerActors()) + .commitCohortActors(store.getCohortActors()); } @Override - public String persistenceId() { + public final String persistenceId() { return this.name; } @Override - public String journalPluginId() { + public final String journalPluginId() { // This method may be invoked from super constructor (wonderful), hence we also need to handle the case of // the field being uninitialized because our constructor is not finished. if (datastoreContext != null && !datastoreContext.isPersistent()) { @@ -1135,20 +1133,22 @@ public class Shard extends RaftActor { } @VisibleForTesting - ShardCommitCoordinator getCommitCoordinator() { + final ShardCommitCoordinator getCommitCoordinator() { return commitCoordinator; } - public DatastoreContext getDatastoreContext() { + // non-final for mocking + DatastoreContext getDatastoreContext() { return datastoreContext; } @VisibleForTesting - public ShardDataTree getDataStore() { + final ShardDataTree getDataStore() { return store; } @VisibleForTesting + // non-final for mocking ShardStats getShardMBean() { return shardMBean; } @@ -1168,12 +1168,12 @@ public class Shard extends RaftActor { private volatile boolean sealed; - protected AbstractBuilder(final Class shardClass) { + AbstractBuilder(final Class shardClass) { this.shardClass = shardClass; } - protected void checkSealed() { - checkState(!sealed, "Builder isalready sealed - further modifications are not allowed"); + final void checkSealed() { + checkState(!sealed, "Builder is already sealed - further modifications are not allowed"); } @SuppressWarnings("unchecked") @@ -1230,7 +1230,7 @@ public class Shard extends RaftActor { } public EffectiveModelContext getSchemaContext() { - return Verify.verifyNotNull(schemaContextProvider.getEffectiveModelContext()); + return verifyNotNull(schemaContextProvider.getEffectiveModelContext()); } public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() {