X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=7ef6e040a9f3f0d94bf2fdc47790377d505184b3;hp=789d51a19f88942e3a35ceb7fc4d69cb20c8abcb;hb=9f19da6634f18ab4d55ca1def76566ca63416ff0;hpb=007324841fd870fc3d3491f300da6383d8874d6a diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 789d51a19f..7ef6e040a9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -27,9 +27,19 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; +import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.CommonConfig; import org.opendaylight.controller.cluster.common.actor.MeteringBehavior; import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry; +import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory; @@ -55,9 +65,11 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; +import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; @@ -75,14 +87,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; -import javax.annotation.Nonnull; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - /** * A Shard represents a portion of the logical data tree
*

@@ -103,10 +107,6 @@ public class Shard extends RaftActor { private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this); - // By default persistent will be true and can be turned off using the system - // property shard.persistent - private final boolean persistent; - /// The name of this shard private final ShardIdentifier name; @@ -119,6 +119,8 @@ public class Shard extends RaftActor { private final DatastoreContext datastoreContext; + private final DataPersistenceProvider dataPersistenceProvider; + private SchemaContext schemaContext; private ActorRef createSnapshotTransaction; @@ -131,6 +133,8 @@ public class Shard extends RaftActor { private Cancellable txCommitTimeoutCheckSchedule; + private final Optional roleChangeNotifier; + /** * Coordinates persistence recovery on startup. */ @@ -139,20 +143,17 @@ public class Shard extends RaftActor { private final Map transactionChains = new HashMap<>(); - protected Shard(ShardIdentifier name, Map peerAddresses, - DatastoreContext datastoreContext, SchemaContext schemaContext) { + protected Shard(final ShardIdentifier name, final Map peerAddresses, + final DatastoreContext datastoreContext, final SchemaContext schemaContext) { super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig())); this.name = name; this.datastoreContext = datastoreContext; this.schemaContext = schemaContext; + this.dataPersistenceProvider = (datastoreContext.isPersistent()) ? new PersistentDataProvider() : new NonPersistentRaftDataProvider(); - String setting = System.getProperty("shard.persistent"); - - this.persistent = !"false".equals(setting); - - LOG.info("Shard created : {} persistent : {}", name, persistent); + LOG.info("Shard created : {} persistent : {}", name, datastoreContext.isPersistent()); store = InMemoryDOMDataStoreFactory.create(name.toString(), null, datastoreContext.getDataStoreProperties()); @@ -163,7 +164,6 @@ public class Shard extends RaftActor { shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(), datastoreContext.getDataStoreMXBeanType()); - shardMBean.setDataStoreExecutor(store.getDomStoreExecutor()); shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager()); if (isMetricsCaptureEnabled()) { @@ -175,10 +175,13 @@ public class Shard extends RaftActor { transactionCommitTimeout = TimeUnit.MILLISECONDS.convert( datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS); + + // create a notifier actor for each cluster member + roleChangeNotifier = createRoleChangeNotifier(name.toString()); } private static Map mapPeerAddresses( - Map peerAddresses) { + final Map peerAddresses) { Map map = new HashMap<>(); for (Map.Entry entry : peerAddresses @@ -191,7 +194,7 @@ public class Shard extends RaftActor { public static Props props(final ShardIdentifier name, final Map peerAddresses, - DatastoreContext datastoreContext, SchemaContext schemaContext) { + final DatastoreContext datastoreContext, final SchemaContext schemaContext) { Preconditions.checkNotNull(name, "name should not be null"); Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null"); Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null"); @@ -200,6 +203,12 @@ public class Shard extends RaftActor { return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext)); } + private Optional createRoleChangeNotifier(String shardId) { + ActorRef shardRoleChangeNotifier = this.getContext().actorOf( + RoleChangeNotifier.getProps(shardId), shardId + "-notifier"); + return Optional.of(shardRoleChangeNotifier); + } + @Override public void postStop() { super.postStop(); @@ -210,7 +219,7 @@ public class Shard extends RaftActor { } @Override - public void onReceiveRecover(Object message) { + public void onReceiveRecover(final Object message) throws Exception { if(LOG.isDebugEnabled()) { LOG.debug("onReceiveRecover: Received message {} from {}", message.getClass().toString(), @@ -229,7 +238,7 @@ public class Shard extends RaftActor { } @Override - public void onReceiveCommand(Object message) { + public void onReceiveCommand(final Object message) throws Exception { if(LOG.isDebugEnabled()) { LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender()); } @@ -263,6 +272,11 @@ public class Shard extends RaftActor { } } + @Override + protected Optional getRoleChangeNotifier() { + return roleChangeNotifier; + } + private void handleTransactionCommitTimeoutCheck() { CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry(); if(cohortEntry != null) { @@ -276,7 +290,7 @@ public class Shard extends RaftActor { } } - private void handleCommitTransaction(CommitTransaction commit) { + private void handleCommitTransaction(final CommitTransaction commit) { final String transactionID = commit.getTransactionID(); LOG.debug("Committing transaction {}", transactionID); @@ -307,11 +321,13 @@ public class Shard extends RaftActor { // currently uses a same thread executor anyway. cohortEntry.getCohort().preCommit().get(); - if(persistent) { - Shard.this.persistData(getSender(), transactionID, - new CompositeModificationPayload(cohortEntry.getModification().toSerializable())); + // If we do not have any followers and we are not using persistence we can + // apply modification to the state immediately + if(!hasFollowers() && !persistence().isRecoveryApplicable()){ + applyModificationToState(getSender(), transactionID, cohortEntry.getModification()); } else { - Shard.this.finishCommit(getSender(), transactionID); + Shard.this.persistData(getSender(), transactionID, + new CompositeModificationByteStringPayload(cohortEntry.getModification().toSerializable())); } } catch (InterruptedException | ExecutionException e) { LOG.error(e, "An exception occurred while preCommitting transaction {}", @@ -375,13 +391,14 @@ public class Shard extends RaftActor { commitCoordinator.currentTransactionComplete(transactionID, true); } - private void handleCanCommitTransaction(CanCommitTransaction canCommit) { + private void handleCanCommitTransaction(final CanCommitTransaction canCommit) { LOG.debug("Can committing transaction {}", canCommit.getTransactionID()); commitCoordinator.handleCanCommit(canCommit, getSender(), self()); } private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) { - LOG.debug("Readying transaction {}", ready.getTransactionID()); + LOG.debug("Readying transaction {}, client version {}", ready.getTransactionID(), + ready.getTxnClientVersion()); // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the // commitCoordinator in preparation for the subsequent three phase commit initiated by @@ -389,19 +406,29 @@ public class Shard extends RaftActor { commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(), ready.getModification()); - // Return our actor path as we'll handle the three phase commit. - ReadyTransactionReply readyTransactionReply = - new ReadyTransactionReply(Serialization.serializedActorPath(self())); - getSender().tell( - ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : readyTransactionReply, - getSelf()); + // Return our actor path as we'll handle the three phase commit, except if the Tx client + // version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version + // node. In that case, the subsequent 3-phase commit messages won't contain the + // transactionId so to maintain backwards compatibility, we create a separate cohort actor + // to provide the compatible behavior. + ActorRef replyActorPath = self(); + if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) { + LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort"); + replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props( + ready.getTransactionID())); + } + + ReadyTransactionReply readyTransactionReply = new ReadyTransactionReply( + Serialization.serializedActorPath(replyActorPath)); + getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : + readyTransactionReply, getSelf()); } - private void handleAbortTransaction(AbortTransaction abort) { + private void handleAbortTransaction(final AbortTransaction abort) { doAbortTransaction(abort.getTransactionID(), getSender()); } - private void doAbortTransaction(String transactionID, final ActorRef sender) { + void doAbortTransaction(final String transactionID, final ActorRef sender) { final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID); if(cohortEntry != null) { LOG.debug("Aborting transaction {}", transactionID); @@ -416,7 +443,7 @@ public class Shard extends RaftActor { Futures.addCallback(future, new FutureCallback() { @Override - public void onSuccess(Void v) { + public void onSuccess(final Void v) { shardMBean.incrementAbortTransactionsCount(); if(sender != null) { @@ -425,7 +452,7 @@ public class Shard extends RaftActor { } @Override - public void onFailure(Throwable t) { + public void onFailure(final Throwable t) { LOG.error(t, "An exception happened during abort"); if(sender != null) { @@ -436,20 +463,20 @@ public class Shard extends RaftActor { } } - private void handleCreateTransaction(Object message) { + private void handleCreateTransaction(final Object message) { if (isLeader()) { createTransaction(CreateTransaction.fromSerializable(message)); } else if (getLeader() != null) { getLeader().forward(message, getContext()); } else { - getSender().tell(new akka.actor.Status.Failure(new IllegalStateException( + getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException( "Could not find shard leader so transaction cannot be created. This typically happens" + - " when system is coming up or recovering and a leader is being elected. Try again" + + " when the system is coming up or recovering and a leader is being elected. Try again" + " later.")), getSelf()); } } - private void handleReadDataReply(Object message) { + private void handleReadDataReply(final Object message) { // This must be for install snapshot. Don't want to open this up and trigger // deSerialization @@ -463,7 +490,7 @@ public class Shard extends RaftActor { getSender().tell(PoisonPill.getInstance(), self()); } - private void closeTransactionChain(CloseTransactionChain closeTransactionChain) { + private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) { DOMStoreTransactionChain chain = transactionChains.remove(closeTransactionChain.getTransactionChainId()); @@ -472,10 +499,8 @@ public class Shard extends RaftActor { } } - private ActorRef createTypedTransactionActor( - int transactionType, - ShardTransactionIdentifier transactionId, - String transactionChainId ) { + private ActorRef createTypedTransactionActor(int transactionType, + ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) { DOMStoreTransactionFactory factory = store; @@ -488,8 +513,8 @@ public class Shard extends RaftActor { } } - if(this.schemaContext == null){ - throw new NullPointerException("schemaContext should not be null"); + if(this.schemaContext == null) { + throw new IllegalStateException("SchemaContext is not set"); } if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) { @@ -499,7 +524,8 @@ public class Shard extends RaftActor { return getContext().actorOf( ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(), schemaContext,datastoreContext, shardMBean, - transactionId.getRemoteTransactionId()), transactionId.toString()); + transactionId.getRemoteTransactionId(), clientVersion), + transactionId.toString()); } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) { @@ -508,7 +534,8 @@ public class Shard extends RaftActor { return getContext().actorOf( ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(), schemaContext, datastoreContext, shardMBean, - transactionId.getRemoteTransactionId()), transactionId.toString()); + transactionId.getRemoteTransactionId(), clientVersion), + transactionId.toString()); } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) { @@ -518,7 +545,8 @@ public class Shard extends RaftActor { return getContext().actorOf( ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(), schemaContext, datastoreContext, shardMBean, - transactionId.getRemoteTransactionId()), transactionId.toString()); + transactionId.getRemoteTransactionId(), clientVersion), + transactionId.toString()); } else { throw new IllegalArgumentException( "Shard="+name + ":CreateTransaction message has unidentified transaction type=" @@ -527,39 +555,44 @@ public class Shard extends RaftActor { } private void createTransaction(CreateTransaction createTransaction) { - createTransaction(createTransaction.getTransactionType(), - createTransaction.getTransactionId(), createTransaction.getTransactionChainId()); + try { + ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(), + createTransaction.getTransactionId(), createTransaction.getTransactionChainId(), + createTransaction.getVersion()); + + getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor), + createTransaction.getTransactionId()).toSerializable(), getSelf()); + } catch (Exception e) { + getSender().tell(new akka.actor.Status.Failure(e), getSelf()); + } } - private ActorRef createTransaction(int transactionType, String remoteTransactionId, String transactionChainId) { + private ActorRef createTransaction(int transactionType, String remoteTransactionId, + String transactionChainId, int clientVersion) { ShardTransactionIdentifier transactionId = ShardTransactionIdentifier.builder() .remoteTransactionId(remoteTransactionId) .build(); + if(LOG.isDebugEnabled()) { LOG.debug("Creating transaction : {} ", transactionId); } - ActorRef transactionActor = - createTypedTransactionActor(transactionType, transactionId, transactionChainId); - getSender() - .tell(new CreateTransactionReply( - Serialization.serializedActorPath(transactionActor), - remoteTransactionId).toSerializable(), - getSelf()); + ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId, + transactionChainId, clientVersion); return transactionActor; } - private void syncCommitTransaction(DOMStoreWriteTransaction transaction) + private void syncCommitTransaction(final DOMStoreWriteTransaction transaction) throws ExecutionException, InterruptedException { DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready(); commitCohort.preCommit().get(); commitCohort.commit().get(); } - private void commitWithNewTransaction(Modification modification) { + private void commitWithNewTransaction(final Modification modification) { DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction(); modification.apply(tx); try { @@ -572,18 +605,18 @@ public class Shard extends RaftActor { } } - private void updateSchemaContext(UpdateSchemaContext message) { + private void updateSchemaContext(final UpdateSchemaContext message) { this.schemaContext = message.getSchemaContext(); updateSchemaContext(message.getSchemaContext()); store.onGlobalContextUpdated(message.getSchemaContext()); } @VisibleForTesting - void updateSchemaContext(SchemaContext schemaContext) { + void updateSchemaContext(final SchemaContext schemaContext) { store.onGlobalContextUpdated(schemaContext); } - private void registerChangeListener(RegisterChangeListener registerChangeListener) { + private void registerChangeListener(final RegisterChangeListener registerChangeListener) { LOG.debug("registerDataChangeListener for {}", registerChangeListener.getPath()); @@ -606,12 +639,12 @@ public class Shard extends RaftActor { LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ", listenerRegistration.path()); - getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()),getSelf()); + getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf()); } private ListenerRegistration>> doChangeListenerRegistration( - RegisterChangeListener registerChangeListener) { + final RegisterChangeListener registerChangeListener) { ActorSelection dataChangeListenerPath = getContext().system().actorSelection( registerChangeListener.getDataChangeListenerPath()); @@ -641,7 +674,7 @@ public class Shard extends RaftActor { @Override protected - void startLogRecoveryBatch(int maxBatchSize) { + void startLogRecoveryBatch(final int maxBatchSize) { currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize); if(LOG.isDebugEnabled()) { @@ -650,16 +683,18 @@ public class Shard extends RaftActor { } @Override - protected void appendRecoveredLogEntry(Payload data) { + protected void appendRecoveredLogEntry(final Payload data) { if (data instanceof CompositeModificationPayload) { currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification()); + } else if (data instanceof CompositeModificationByteStringPayload) { + currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification()); } else { LOG.error("Unknown state received {} during recovery", data); } } @Override - protected void applyRecoverySnapshot(ByteString snapshot) { + protected void applyRecoverySnapshot(final ByteString snapshot) { if(recoveryCoordinator == null) { recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext); } @@ -724,24 +759,17 @@ public class Shard extends RaftActor { } @Override - protected void applyState(ActorRef clientActor, String identifier, Object data) { + protected void applyState(final ActorRef clientActor, final String identifier, final Object data) { if (data instanceof CompositeModificationPayload) { Object modification = ((CompositeModificationPayload) data).getModification(); - if(modification == null) { - LOG.error( - "modification is null - this is very unexpected, clientActor = {}, identifier = {}", - identifier, clientActor != null ? clientActor.path().toString() : null); - } else if(clientActor == null) { - // There's no clientActor to which to send a commit reply so we must be applying - // replicated state from the leader. - commitWithNewTransaction(MutableCompositeModification.fromSerializable( - modification, schemaContext)); - } else { - // This must be the OK to commit after replication consensus. - finishCommit(clientActor, identifier); - } + applyModificationToState(clientActor, identifier, modification); + } else if(data instanceof CompositeModificationByteStringPayload ){ + Object modification = ((CompositeModificationByteStringPayload) data).getModification(); + + applyModificationToState(clientActor, identifier, modification); + } else { LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}", data, data.getClass().getClassLoader(), @@ -752,6 +780,22 @@ public class Shard extends RaftActor { } + private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) { + if(modification == null) { + LOG.error( + "modification is null - this is very unexpected, clientActor = {}, identifier = {}", + identifier, clientActor != null ? clientActor.path().toString() : null); + } else if(clientActor == null) { + // There's no clientActor to which to send a commit reply so we must be applying + // replicated state from the leader. + commitWithNewTransaction(MutableCompositeModification.fromSerializable( + modification, schemaContext)); + } else { + // This must be the OK to commit after replication consensus. + finishCommit(clientActor, identifier); + } + } + private void updateJournalStats() { ReplicatedLogEntry lastLogEntry = getLastLogEntry(); @@ -762,6 +806,7 @@ public class Shard extends RaftActor { shardMBean.setCommitIndex(getCommitIndex()); shardMBean.setLastApplied(getLastApplied()); + shardMBean.setInMemoryJournalDataSize(getRaftActorContext().getReplicatedLog().dataSize()); } @Override @@ -772,7 +817,8 @@ public class Shard extends RaftActor { // so that this actor does not get block building the snapshot createSnapshotTransaction = createTransaction( TransactionProxy.TransactionType.READ_ONLY.ordinal(), - "createSnapshot" + ++createSnapshotTransactionCounter, ""); + "createSnapshot" + ++createSnapshotTransactionCounter, "", + CreateTransaction.CURRENT_VERSION); createSnapshotTransaction.tell( new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self()); @@ -782,7 +828,7 @@ public class Shard extends RaftActor { @VisibleForTesting @Override - protected void applySnapshot(ByteString snapshot) { + protected void applySnapshot(final ByteString snapshot) { // Since this will be done only on Recovery or when this actor is a Follower // we can safely commit everything in here. We not need to worry about event notifications // as they would have already been disabled on the follower @@ -842,7 +888,12 @@ public class Shard extends RaftActor { } } - @Override protected void onLeaderChanged(String oldLeader, String newLeader) { + @Override + protected DataPersistenceProvider persistence() { + return dataPersistenceProvider; + } + + @Override protected void onLeaderChanged(final String oldLeader, final String newLeader) { shardMBean.setLeader(newLeader); } @@ -850,6 +901,11 @@ public class Shard extends RaftActor { return this.name.toString(); } + @VisibleForTesting + DataPersistenceProvider getDataPersistenceProvider() { + return dataPersistenceProvider; + } + private static class ShardCreator implements Creator { private static final long serialVersionUID = 1L; @@ -859,8 +915,8 @@ public class Shard extends RaftActor { final DatastoreContext datastoreContext; final SchemaContext schemaContext; - ShardCreator(ShardIdentifier name, Map peerAddresses, - DatastoreContext datastoreContext, SchemaContext schemaContext) { + ShardCreator(final ShardIdentifier name, final Map peerAddresses, + final DatastoreContext datastoreContext, final SchemaContext schemaContext) { this.name = name; this.peerAddresses = peerAddresses; this.datastoreContext = datastoreContext; @@ -893,11 +949,11 @@ public class Shard extends RaftActor { private volatile ListenerRegistration>> delegate; - DelayedListenerRegistration(RegisterChangeListener registerChangeListener) { + DelayedListenerRegistration(final RegisterChangeListener registerChangeListener) { this.registerChangeListener = registerChangeListener; } - void setDelegate( ListenerRegistration>> registration) { this.delegate = registration; }