Do not retain initial SchemaContext 24/57424/1
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 4 May 2017 21:52:57 +0000 (23:52 +0200)
committerRobert Varga <nite@hq.sk>
Fri, 19 May 2017 12:31:10 +0000 (12:31 +0000)
While looking over a memory dump I have noticed that we retain
SchemaContext inside Shard$Builder, which is being retained via
Props (which are used to restart the actor).

This reference is not updated as the SchemaContext is updated, which
means we are wasting memory and are causing Shard to come up with
an ancient SchemaContext after a failure.

Fix this by having an AtomicReference holder for SchemaContext
and have Shard have a Supplier<SchemaContext>.

Change-Id: I73fcae46f249d3679522eb7dbbb059e43c5af6c7
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/AtomicShardContextProvider.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java

index fbd5b6456a79ae7c94a140008fda80db952d1dbc..c8be1bed4d7a60f86d65fe91d55db01a9aa8095f 100644 (file)
@@ -98,6 +98,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailed
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -892,7 +893,7 @@ public class Shard extends RaftActor {
         private ShardIdentifier id;
         private Map<String, String> peerAddresses = Collections.emptyMap();
         private DatastoreContext datastoreContext;
         private ShardIdentifier id;
         private Map<String, String> peerAddresses = Collections.emptyMap();
         private DatastoreContext datastoreContext;
-        private SchemaContext schemaContext;
+        private SchemaContextProvider schemaContextProvider;
         private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
         private TipProducingDataTree dataTree;
         private volatile boolean sealed;
         private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
         private TipProducingDataTree dataTree;
         private volatile boolean sealed;
@@ -928,9 +929,9 @@ public class Shard extends RaftActor {
             return self();
         }
 
             return self();
         }
 
-        public T schemaContext(final SchemaContext newSchemaContext) {
+        public T schemaContextProvider(final SchemaContextProvider schemaContextProvider) {
             checkSealed();
             checkSealed();
-            this.schemaContext = newSchemaContext;
+            this.schemaContextProvider = Preconditions.checkNotNull(schemaContextProvider);
             return self();
         }
 
             return self();
         }
 
@@ -959,7 +960,7 @@ public class Shard extends RaftActor {
         }
 
         public SchemaContext getSchemaContext() {
         }
 
         public SchemaContext getSchemaContext() {
-            return schemaContext;
+            return Verify.verifyNotNull(schemaContextProvider.getSchemaContext());
         }
 
         public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() {
         }
 
         public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() {
@@ -986,7 +987,7 @@ public class Shard extends RaftActor {
             Preconditions.checkNotNull(id, "id should not be null");
             Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
             Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
             Preconditions.checkNotNull(id, "id should not be null");
             Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
             Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
-            Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
+            Preconditions.checkNotNull(schemaContextProvider, "schemaContextProvider should not be null");
         }
 
         public Props props() {
         }
 
         public Props props() {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/AtomicShardContextProvider.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/AtomicShardContextProvider.java
new file mode 100644 (file)
index 0000000..6a4e982
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.shardmanager;
+
+import com.google.common.base.Verify;
+import java.util.concurrent.atomic.AtomicReference;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
+
+final class AtomicShardContextProvider extends AtomicReference<SchemaContext> implements SchemaContextProvider {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public SchemaContext getSchemaContext() {
+        return Verify.verifyNotNull(get());
+    }
+}
\ No newline at end of file
index 0c892295a77888d5c635c380183db1b2fa67d10b..8fc77acf96395f58cfe1d919a1fd3e8a598eba38 100644 (file)
@@ -42,7 +42,13 @@ final class ShardInformation {
     private final ShardPeerAddressResolver addressResolver;
     private final ShardIdentifier shardId;
     private final String shardName;
     private final ShardPeerAddressResolver addressResolver;
     private final ShardIdentifier shardId;
     private final String shardName;
+
+    // This reference indirection is required to have the ability to update the SchemaContext
+    // inside actor props. Otherwise we would be keeping an old SchemaContext there, preventing
+    // it from becoming garbage.
+    private final AtomicShardContextProvider schemaContextProvider = new AtomicShardContextProvider();
     private ActorRef actor;
     private ActorRef actor;
+
     private Optional<DataTree> localShardDataTree;
     private boolean leaderAvailable = false;
 
     private Optional<DataTree> localShardDataTree;
     private boolean leaderAvailable = false;
 
@@ -59,9 +65,9 @@ final class ShardInformation {
     private Shard.AbstractBuilder<?, ?> builder;
     private boolean isActiveMember = true;
 
     private Shard.AbstractBuilder<?, ?> builder;
     private boolean isActiveMember = true;
 
-    ShardInformation(String shardName, ShardIdentifier shardId,
-            Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
-            Shard.AbstractBuilder<?, ?> builder, ShardPeerAddressResolver addressResolver) {
+    ShardInformation(final String shardName, final ShardIdentifier shardId,
+            final Map<String, String> initialPeerAddresses, final DatastoreContext datastoreContext,
+            final Shard.AbstractBuilder<?, ?> builder, final ShardPeerAddressResolver addressResolver) {
         this.shardName = shardName;
         this.shardId = shardId;
         this.initialPeerAddresses = initialPeerAddresses;
         this.shardName = shardName;
         this.shardId = shardId;
         this.initialPeerAddresses = initialPeerAddresses;
@@ -70,10 +76,10 @@ final class ShardInformation {
         this.addressResolver = addressResolver;
     }
 
         this.addressResolver = addressResolver;
     }
 
-    Props newProps(SchemaContext schemaContext) {
+    Props newProps() {
         Preconditions.checkNotNull(builder);
         Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext)
         Preconditions.checkNotNull(builder);
         Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext)
-                .schemaContext(schemaContext).props();
+                .schemaContextProvider(schemaContextProvider).props();
         builder = null;
         return props;
     }
         builder = null;
         return props;
     }
@@ -87,7 +93,7 @@ final class ShardInformation {
         return actor;
     }
 
         return actor;
     }
 
-    void setActor(ActorRef actor) {
+    void setActor(final ActorRef actor) {
         this.actor = actor;
     }
 
         this.actor = actor;
     }
 
@@ -95,7 +101,7 @@ final class ShardInformation {
         return shardId;
     }
 
         return shardId;
     }
 
-    void setLocalDataTree(Optional<DataTree> localShardDataTree) {
+    void setLocalDataTree(final Optional<DataTree> localShardDataTree) {
         this.localShardDataTree = localShardDataTree;
     }
 
         this.localShardDataTree = localShardDataTree;
     }
 
@@ -107,7 +113,7 @@ final class ShardInformation {
         return datastoreContext;
     }
 
         return datastoreContext;
     }
 
-    void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
+    void setDatastoreContext(final DatastoreContext datastoreContext, final ActorRef sender) {
         this.datastoreContext = datastoreContext;
         if (actor != null) {
             LOG.debug("Sending new DatastoreContext to {}", shardId);
         this.datastoreContext = datastoreContext;
         if (actor != null) {
             LOG.debug("Sending new DatastoreContext to {}", shardId);
@@ -115,7 +121,7 @@ final class ShardInformation {
         }
     }
 
         }
     }
 
-    void updatePeerAddress(String peerId, String peerAddress, ActorRef sender) {
+    void updatePeerAddress(final String peerId, final String peerAddress, final ActorRef sender) {
         LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
 
         if (actor != null) {
         LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
 
         if (actor != null) {
@@ -128,13 +134,13 @@ final class ShardInformation {
         notifyOnShardInitializedCallbacks();
     }
 
         notifyOnShardInitializedCallbacks();
     }
 
-    void peerDown(MemberName memberName, String peerId, ActorRef sender) {
+    void peerDown(final MemberName memberName, final String peerId, final ActorRef sender) {
         if (actor != null) {
             actor.tell(new PeerDown(memberName, peerId), sender);
         }
     }
 
         if (actor != null) {
             actor.tell(new PeerDown(memberName, peerId), sender);
         }
     }
 
-    void peerUp(MemberName memberName, String peerId, ActorRef sender) {
+    void peerUp(final MemberName memberName, final String peerId, final ActorRef sender) {
         if (actor != null) {
             actor.tell(new PeerUp(memberName, peerId), sender);
         }
         if (actor != null) {
             actor.tell(new PeerUp(memberName, peerId), sender);
         }
@@ -195,15 +201,15 @@ final class ShardInformation {
         }
     }
 
         }
     }
 
-    void addOnShardInitialized(OnShardInitialized onShardInitialized) {
+    void addOnShardInitialized(final OnShardInitialized onShardInitialized) {
         onShardInitializedSet.add(onShardInitialized);
     }
 
         onShardInitializedSet.add(onShardInitialized);
     }
 
-    void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
+    void removeOnShardInitialized(final OnShardInitialized onShardInitialized) {
         onShardInitializedSet.remove(onShardInitialized);
     }
 
         onShardInitializedSet.remove(onShardInitialized);
     }
 
-    void setRole(String newRole) {
+    void setRole(final String newRole) {
         this.role = newRole;
 
         notifyOnShardInitializedCallbacks();
         this.role = newRole;
 
         notifyOnShardInitializedCallbacks();
@@ -213,7 +219,7 @@ final class ShardInformation {
         return role;
     }
 
         return role;
     }
 
-    void setFollowerSyncStatus(boolean syncStatus) {
+    void setFollowerSyncStatus(final boolean syncStatus) {
         this.followerSyncStatus = syncStatus;
     }
 
         this.followerSyncStatus = syncStatus;
     }
 
@@ -227,7 +233,7 @@ final class ShardInformation {
         return false;
     }
 
         return false;
     }
 
-    boolean setLeaderId(String leaderId) {
+    boolean setLeaderId(final String leaderId) {
         final boolean changed = !Objects.equals(this.leaderId, leaderId);
         this.leaderId = leaderId;
         if (leaderId != null) {
         final boolean changed = !Objects.equals(this.leaderId, leaderId);
         this.leaderId = leaderId;
         if (leaderId != null) {
@@ -242,7 +248,7 @@ final class ShardInformation {
         return leaderId;
     }
 
         return leaderId;
     }
 
-    void setLeaderAvailable(boolean leaderAvailable) {
+    void setLeaderAvailable(final boolean leaderAvailable) {
         this.leaderAvailable = leaderAvailable;
 
         if (leaderAvailable) {
         this.leaderAvailable = leaderAvailable;
 
         if (leaderAvailable) {
@@ -254,7 +260,7 @@ final class ShardInformation {
         return leaderVersion;
     }
 
         return leaderVersion;
     }
 
-    void setLeaderVersion(short leaderVersion) {
+    void setLeaderVersion(final short leaderVersion) {
         this.leaderVersion = leaderVersion;
     }
 
         this.leaderVersion = leaderVersion;
     }
 
@@ -262,7 +268,15 @@ final class ShardInformation {
         return isActiveMember;
     }
 
         return isActiveMember;
     }
 
-    void setActiveMember(boolean isActiveMember) {
+    void setActiveMember(final boolean isActiveMember) {
         this.isActiveMember = isActiveMember;
     }
         this.isActiveMember = isActiveMember;
     }
+
+    SchemaContext getSchemaContext() {
+        return schemaContextProvider.getSchemaContext();
+    }
+
+    void setSchemaContext(final SchemaContext schemaContext) {
+        schemaContextProvider.set(Preconditions.checkNotNull(schemaContext));
+    }
 }
 }
index 38922df30c0235d8357edb2c91c44ce3b4d222b8..dadbeeb4309703f80a823ca8ac3774e9b0e7a7e7 100644 (file)
@@ -178,7 +178,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private ListenerRegistration<DOMDataTreeChangeListener> configListenerReg = null;
     private PrefixedShardConfigUpdateHandler configUpdateHandler;
 
     private ListenerRegistration<DOMDataTreeChangeListener> configListenerReg = null;
     private PrefixedShardConfigUpdateHandler configUpdateHandler;
 
-    ShardManager(AbstractShardManagerCreator<?> builder) {
+    ShardManager(final AbstractShardManagerCreator<?> builder) {
         this.cluster = builder.getCluster();
         this.configuration = builder.getConfiguration();
         this.datastoreContextFactory = builder.getDatastoreContextFactory();
         this.cluster = builder.getCluster();
         this.configuration = builder.getConfiguration();
         this.datastoreContextFactory = builder.getDatastoreContextFactory();
@@ -223,7 +223,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     @Override
     }
 
     @Override
-    public void handleCommand(Object message) throws Exception {
+    public void handleCommand(final Object message) throws Exception {
         if (message  instanceof FindPrimary) {
             findPrimary((FindPrimary)message);
         } else if (message instanceof FindLocalShard) {
         if (message  instanceof FindPrimary) {
             findPrimary((FindPrimary)message);
         } else if (message instanceof FindLocalShard) {
@@ -368,7 +368,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
             @Override
 
         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
             @Override
-            public void onComplete(Throwable failure, Iterable<Boolean> results) {
+            public void onComplete(final Throwable failure, final Iterable<Boolean> results) {
                 LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
 
                 self().tell(PoisonPill.getInstance(), self());
                 LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
 
                 self().tell(PoisonPill.getInstance(), self());
@@ -391,15 +391,15 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }, dispatcher);
     }
 
         }, dispatcher);
     }
 
-    private void onWrappedShardResponse(WrappedShardResponse message) {
+    private void onWrappedShardResponse(final WrappedShardResponse message) {
         if (message.getResponse() instanceof RemoveServerReply) {
             onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
                     message.getLeaderPath());
         }
     }
 
         if (message.getResponse() instanceof RemoveServerReply) {
             onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
                     message.getLeaderPath());
         }
     }
 
-    private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
-            String leaderPath) {
+    private void onRemoveServerReply(final ActorRef originalSender, final ShardIdentifier shardId,
+            final RemoveServerReply replyMsg, final String leaderPath) {
         shardReplicaOperationsInProgress.remove(shardId.getShardName());
 
         LOG.debug("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
         shardReplicaOperationsInProgress.remove(shardId.getShardName());
 
         LOG.debug("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
@@ -439,7 +439,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
 
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
-            public void onComplete(Throwable failure, Object response) {
+            public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
                     shardReplicaOperationsInProgress.remove(shardName);
                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
                 if (failure != null) {
                     shardReplicaOperationsInProgress.remove(shardName);
                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
@@ -457,8 +457,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
-    private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
-            final ActorRef sender) {
+    private void removeShardReplica(final RemoveShardReplica contextMessage, final String shardName,
+            final String primaryPath, final ActorRef sender) {
         if (isShardReplicaOperationInProgress(shardName, sender)) {
             return;
         }
         if (isShardReplicaOperationInProgress(shardName, sender)) {
             return;
         }
@@ -479,7 +479,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
 
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
-            public void onComplete(Throwable failure, Object response) {
+            public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
                     shardReplicaOperationsInProgress.remove(shardName);
                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
                 if (failure != null) {
                     shardReplicaOperationsInProgress.remove(shardName);
                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
@@ -497,7 +497,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
-    private void onShardReplicaRemoved(ServerRemoved message) {
+    private void onShardReplicaRemoved(final ServerRemoved message) {
         removeShard(new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build());
     }
 
         removeShard(new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build());
     }
 
@@ -576,7 +576,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private void onCreateShard(CreateShard createShard) {
+    private void onCreateShard(final CreateShard createShard) {
         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
 
         Object reply;
         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
 
         Object reply;
@@ -636,7 +636,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         final ActorRef sender = getSender();
         stopOnComplete.addOnComplete(new OnComplete<Boolean>() {
             @Override
         final ActorRef sender = getSender();
         stopOnComplete.addOnComplete(new OnComplete<Boolean>() {
             @Override
-            public void onComplete(Throwable failure, Boolean result) {
+            public void onComplete(final Throwable failure, final Boolean result) {
                 LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer);
                 self().tell(messageToDefer, sender);
             }
                 LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer);
                 self().tell(messageToDefer, sender);
             }
@@ -645,7 +645,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return true;
     }
 
         return true;
     }
 
-    private void doCreatePrefixShard(PrefixShardConfiguration config, ShardIdentifier shardId, String shardName) {
+    private void doCreatePrefixShard(final PrefixShardConfiguration config, final ShardIdentifier shardId,
+            final String shardName) {
         configuration.addPrefixShardConfiguration(config);
 
         final Builder builder = newShardDatastoreContextBuilder(shardName);
         configuration.addPrefixShardConfiguration(config);
 
         final Builder builder = newShardDatastoreContextBuilder(shardName);
@@ -665,7 +666,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         localShards.put(info.getShardName(), info);
 
         if (schemaContext != null) {
         localShards.put(info.getShardName(), info);
 
         if (schemaContext != null) {
-            info.setActor(newShardActor(schemaContext, info));
+            info.setSchemaContext(schemaContext);
+            info.setActor(newShardActor(info));
         }
     }
 
         }
     }
 
@@ -726,16 +728,17 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         localShards.put(info.getShardName(), info);
 
         if (schemaContext != null) {
         localShards.put(info.getShardName(), info);
 
         if (schemaContext != null) {
-            info.setActor(newShardActor(schemaContext, info));
+            info.setSchemaContext(schemaContext);
+            info.setActor(newShardActor(info));
         }
     }
 
         }
     }
 
-    private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
+    private DatastoreContext.Builder newShardDatastoreContextBuilder(final String shardName) {
         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName))
                 .shardPeerAddressResolver(peerAddressResolver);
     }
 
         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName))
                 .shardPeerAddressResolver(peerAddressResolver);
     }
 
-    private DatastoreContext newShardDatastoreContext(String shardName) {
+    private DatastoreContext newShardDatastoreContext(final String shardName) {
         return newShardDatastoreContextBuilder(shardName).build();
     }
 
         return newShardDatastoreContextBuilder(shardName).build();
     }
 
@@ -748,7 +751,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
         }
     }
 
-    private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
+    private void onLeaderStateChanged(final ShardLeaderStateChanged leaderStateChanged) {
         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
 
         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
 
         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
@@ -765,7 +768,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
         }
     }
 
-    private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
+    private void onShardNotInitializedTimeout(final ShardNotInitializedTimeout message) {
         ShardInformation shardInfo = message.getShardInfo();
 
         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
         ShardInformation shardInfo = message.getShardInfo();
 
         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
@@ -782,7 +785,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
         }
     }
 
-    private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
+    private void onFollowerInitialSyncStatus(final FollowerInitialSyncUpStatus status) {
         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
                 status.getName(), status.isInitialSyncDone());
 
         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
                 status.getName(), status.isInitialSyncDone());
 
@@ -796,7 +799,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     }
 
 
     }
 
-    private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
+    private void onRoleChangeNotification(final RoleChangeNotification roleChanged) {
         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
                 roleChanged.getOldRole(), roleChanged.getNewRole());
 
         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
                 roleChanged.getOldRole(), roleChanged.getNewRole());
 
@@ -809,7 +812,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
 
     }
 
 
-    private ShardInformation findShardInformation(String memberId) {
+    private ShardInformation findShardInformation(final String memberId) {
         for (ShardInformation info : localShards.values()) {
             if (info.getShardId().toString().equals(memberId)) {
                 return info;
         for (ShardInformation info : localShards.values()) {
             if (info.getShardId().toString().equals(memberId)) {
                 return info;
@@ -839,7 +842,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return true;
     }
 
         return true;
     }
 
-    private void onActorInitialized(Object message) {
+    private void onActorInitialized(final Object message) {
         final ActorRef sender = getSender();
 
         if (sender == null) {
         final ActorRef sender = getSender();
 
         if (sender == null) {
@@ -860,7 +863,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         markShardAsInitialized(shardId.getShardName());
     }
 
         markShardAsInitialized(shardId.getShardName());
     }
 
-    private void markShardAsInitialized(String shardName) {
+    private void markShardAsInitialized(final String shardName) {
         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
 
         ShardInformation shardInformation = localShards.get(shardName);
         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
 
         ShardInformation shardInformation = localShards.get(shardName);
@@ -872,7 +875,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     @Override
     }
 
     @Override
-    protected void handleRecover(Object message) throws Exception {
+    protected void handleRecover(final Object message) throws Exception {
         if (message instanceof RecoveryCompleted) {
             onRecoveryCompleted();
         } else if (message instanceof SnapshotOffer) {
         if (message instanceof RecoveryCompleted) {
             onRecoveryCompleted();
         } else if (message instanceof SnapshotOffer) {
@@ -896,8 +899,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         createLocalShards();
     }
 
         createLocalShards();
     }
 
-    private void sendResponse(ShardInformation shardInformation, boolean doWait,
-            boolean wantShardReady, final Supplier<Object> messageSupplier) {
+    private void sendResponse(final ShardInformation shardInformation, final boolean doWait,
+            final boolean wantShardReady, final Supplier<Object> messageSupplier) {
         if (!shardInformation.isShardInitialized() || wantShardReady && !shardInformation.isShardReadyWithLeaderId()) {
             if (doWait) {
                 final ActorRef sender = getSender();
         if (!shardInformation.isShardInitialized() || wantShardReady && !shardInformation.isShardReadyWithLeaderId()) {
             if (doWait) {
                 final ActorRef sender = getSender();
@@ -945,11 +948,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         getSender().tell(messageSupplier.get(), getSelf());
     }
 
         getSender().tell(messageSupplier.get(), getSelf());
     }
 
-    private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
+    private static NoShardLeaderException createNoShardLeaderException(final ShardIdentifier shardId) {
         return new NoShardLeaderException(null, shardId.toString());
     }
 
         return new NoShardLeaderException(null, shardId.toString());
     }
 
-    private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
+    private static NotInitializedException createNotInitializedException(final ShardIdentifier shardId) {
         return new NotInitializedException(String.format(
                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
     }
         return new NotInitializedException(String.format(
                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
     }
@@ -959,7 +962,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return MemberName.forName(member.roles().iterator().next());
     }
 
         return MemberName.forName(member.roles().iterator().next());
     }
 
-    private void memberRemoved(ClusterEvent.MemberRemoved message) {
+    private void memberRemoved(final ClusterEvent.MemberRemoved message) {
         MemberName memberName = memberToName(message.member());
 
         LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
         MemberName memberName = memberToName(message.member());
 
         LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
@@ -972,7 +975,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
         }
     }
 
-    private void memberExited(ClusterEvent.MemberExited message) {
+    private void memberExited(final ClusterEvent.MemberExited message) {
         MemberName memberName = memberToName(message.member());
 
         LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
         MemberName memberName = memberToName(message.member());
 
         LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
@@ -985,7 +988,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
         }
     }
 
-    private void memberUp(ClusterEvent.MemberUp message) {
+    private void memberUp(final ClusterEvent.MemberUp message) {
         MemberName memberName = memberToName(message.member());
 
         LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
         MemberName memberName = memberToName(message.member());
 
         LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
@@ -994,12 +997,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         memberUp(memberName, message.member().address());
     }
 
         memberUp(memberName, message.member().address());
     }
 
-    private void memberUp(MemberName memberName, Address address) {
+    private void memberUp(final MemberName memberName, final Address address) {
         addPeerAddress(memberName, address);
         checkReady();
     }
 
         addPeerAddress(memberName, address);
         checkReady();
     }
 
-    private void memberWeaklyUp(MemberWeaklyUp message) {
+    private void memberWeaklyUp(final MemberWeaklyUp message) {
         MemberName memberName = memberToName(message.member());
 
         LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
         MemberName memberName = memberToName(message.member());
 
         LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
@@ -1008,7 +1011,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         memberUp(memberName, message.member().address());
     }
 
         memberUp(memberName, message.member().address());
     }
 
-    private void addPeerAddress(MemberName memberName, Address address) {
+    private void addPeerAddress(final MemberName memberName, final Address address) {
         peerAddressResolver.addPeerAddress(memberName, address);
 
         for (ShardInformation info : localShards.values()) {
         peerAddressResolver.addPeerAddress(memberName, address);
 
         for (ShardInformation info : localShards.values()) {
@@ -1020,7 +1023,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
         }
     }
 
-    private void memberReachable(ClusterEvent.ReachableMember message) {
+    private void memberReachable(final ClusterEvent.ReachableMember message) {
         MemberName memberName = memberToName(message.member());
         LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
 
         MemberName memberName = memberToName(message.member());
         LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
 
@@ -1029,7 +1032,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         markMemberAvailable(memberName);
     }
 
         markMemberAvailable(memberName);
     }
 
-    private void memberUnreachable(ClusterEvent.UnreachableMember message) {
+    private void memberUnreachable(final ClusterEvent.UnreachableMember message) {
         MemberName memberName = memberToName(message.member());
         LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
 
         MemberName memberName = memberToName(message.member());
         LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
 
@@ -1066,7 +1069,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
         }
     }
 
-    private void onDatastoreContextFactory(DatastoreContextFactory factory) {
+    private void onDatastoreContextFactory(final DatastoreContextFactory factory) {
         datastoreContextFactory = factory;
         for (ShardInformation info : localShards.values()) {
             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
         datastoreContextFactory = factory;
         for (ShardInformation info : localShards.values()) {
             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
@@ -1125,9 +1128,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
 
         for (ShardInformation info : localShards.values()) {
         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
 
         for (ShardInformation info : localShards.values()) {
+            info.setSchemaContext(schemaContext);
+
             if (info.getActor() == null) {
                 LOG.debug("Creating Shard {}", info.getShardId());
             if (info.getActor() == null) {
                 LOG.debug("Creating Shard {}", info.getShardId());
-                info.setActor(newShardActor(schemaContext, info));
+                info.setActor(newShardActor(info));
             } else {
                 info.getActor().tell(message, getSelf());
             }
             } else {
                 info.getActor().tell(message, getSelf());
             }
@@ -1140,12 +1145,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     @VisibleForTesting
     }
 
     @VisibleForTesting
-    protected ActorRef newShardActor(final SchemaContext shardSchemaContext, final ShardInformation info) {
-        return getContext().actorOf(info.newProps(shardSchemaContext).withDispatcher(shardDispatcherPath),
+    protected ActorRef newShardActor(final ShardInformation info) {
+        return getContext().actorOf(info.newProps().withDispatcher(shardDispatcherPath),
                 info.getShardId().toString());
     }
 
                 info.getShardId().toString());
     }
 
-    private void findPrimary(FindPrimary message) {
+    private void findPrimary(final FindPrimary message) {
         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
 
         final String shardName = message.getShardName();
         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
 
         final String shardName = message.getShardName();
@@ -1202,7 +1207,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
-            public void onComplete(Throwable failure, Object response) {
+            public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
                     handler.onFailure(failure);
                 } else {
                 if (failure != null) {
                     handler.onFailure(failure);
                 } else {
@@ -1226,7 +1231,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      * @param shardName the shard name
      * @return a b
      */
      * @param shardName the shard name
      * @return a b
      */
-    private ShardIdentifier getShardIdentifier(MemberName memberName, String shardName) {
+    private ShardIdentifier getShardIdentifier(final MemberName memberName, final String shardName) {
         return peerAddressResolver.getShardIdentifier(memberName, shardName);
     }
 
         return peerAddressResolver.getShardIdentifier(memberName, shardName);
     }
 
@@ -1263,7 +1268,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      *
      * @param shardName the shard name
      */
      *
      * @param shardName the shard name
      */
-    private Map<String, String> getPeerAddresses(String shardName) {
+    private Map<String, String> getPeerAddresses(final String shardName) {
         final Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
         return getPeerAddresses(shardName, members);
     }
         final Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
         return getPeerAddresses(shardName, members);
     }
@@ -1332,7 +1337,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
                 getSelf()) {
             @Override
         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
                 getSelf()) {
             @Override
-            public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+            public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
                 final RunnableMessage runnable = (RunnableMessage) () -> addPrefixShard(getShardName(),
                         message.getShardPrefix(), response, getSender());
                 if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
                 final RunnableMessage runnable = (RunnableMessage) () -> addPrefixShard(getShardName(),
                         message.getShardPrefix(), response, getSender());
                 if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
@@ -1341,7 +1346,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             }
 
             @Override
             }
 
             @Override
-            public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
+            public void onLocalPrimaryFound(final LocalPrimaryShardFound message) {
                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
             }
         });
                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
             }
         });
@@ -1372,7 +1377,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
                 getSelf()) {
             @Override
         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
                 getSelf()) {
             @Override
-            public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+            public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
                 final RunnableMessage runnable = (RunnableMessage) () ->
                     addShard(getShardName(), response, getSender());
                 if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
                 final RunnableMessage runnable = (RunnableMessage) () ->
                     addShard(getShardName(), response, getSender());
                 if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
@@ -1381,13 +1386,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             }
 
             @Override
             }
 
             @Override
-            public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
+            public void onLocalPrimaryFound(final LocalPrimaryShardFound message) {
                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
             }
         });
     }
 
                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
             }
         });
     }
 
-    private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
+    private void sendLocalReplicaAlreadyExistsReply(final String shardName, final ActorRef sender) {
         String msg = String.format("Local shard %s already exists", shardName);
         LOG.debug("{}: {}", persistenceId(), msg);
         sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
         String msg = String.format("Local shard %s already exists", shardName);
         LOG.debug("{}: {}", persistenceId(), msg);
         sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
@@ -1416,8 +1421,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
                     Shard.builder(), peerAddressResolver);
             shardInfo.setActiveMember(false);
             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
                     Shard.builder(), peerAddressResolver);
             shardInfo.setActiveMember(false);
+            shardInfo.setSchemaContext(schemaContext);
             localShards.put(shardName, shardInfo);
             localShards.put(shardName, shardInfo);
-            shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+            shardInfo.setActor(newShardActor(shardInfo));
         } else {
             removeShardOnFailure = false;
             shardInfo = existingShardInfo;
         } else {
             removeShardOnFailure = false;
             shardInfo = existingShardInfo;
@@ -1446,8 +1452,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
                     Shard.builder(), peerAddressResolver);
             shardInfo.setActiveMember(false);
             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
                     Shard.builder(), peerAddressResolver);
             shardInfo.setActiveMember(false);
+            shardInfo.setSchemaContext(schemaContext);
             localShards.put(shardName, shardInfo);
             localShards.put(shardName, shardInfo);
-            shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+            shardInfo.setActor(newShardActor(shardInfo));
         } else {
             removeShardOnFailure = false;
             shardInfo = existingShardInfo;
         } else {
             removeShardOnFailure = false;
             shardInfo = existingShardInfo;
@@ -1476,7 +1483,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
 
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
-            public void onComplete(Throwable failure, Object addServerResponse) {
+            public void onComplete(final Throwable failure, final Object addServerResponse) {
                 if (failure != null) {
                     LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
                             response.getPrimaryPath(), shardName, failure);
                 if (failure != null) {
                     LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
                             response.getPrimaryPath(), shardName, failure);
@@ -1492,8 +1499,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
-    private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
-            boolean removeShardOnFailure) {
+    private void onAddServerFailure(final String shardName, final String message, final Throwable failure,
+            final ActorRef sender, final boolean removeShardOnFailure) {
         shardReplicaOperationsInProgress.remove(shardName);
 
         if (removeShardOnFailure) {
         shardReplicaOperationsInProgress.remove(shardName);
 
         if (removeShardOnFailure) {
@@ -1507,8 +1514,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             new RuntimeException(message, failure)), getSelf());
     }
 
             new RuntimeException(message, failure)), getSelf());
     }
 
-    private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
-            String leaderPath, boolean removeShardOnFailure) {
+    private void onAddServerReply(final ShardInformation shardInfo, final AddServerReply replyMsg,
+            final ActorRef sender, final String leaderPath, final boolean removeShardOnFailure) {
         String shardName = shardInfo.getShardName();
         shardReplicaOperationsInProgress.remove(shardName);
 
         String shardName = shardInfo.getShardName();
         shardReplicaOperationsInProgress.remove(shardName);
 
@@ -1536,29 +1543,23 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
         }
     }
 
-    private static Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
-                                               String leaderPath, ShardIdentifier shardId) {
-        Exception failure;
+    private static Exception getServerChangeException(final Class<?> serverChange,
+            final ServerChangeStatus serverChangeStatus, final String leaderPath, final ShardIdentifier shardId) {
         switch (serverChangeStatus) {
             case TIMEOUT:
         switch (serverChangeStatus) {
             case TIMEOUT:
-                failure = new TimeoutException(String.format(
+                return new TimeoutException(String.format(
                         "The shard leader %s timed out trying to replicate the initial data to the new shard %s."
                         + "Possible causes - there was a problem replicating the data or shard leadership changed "
                         + "while replicating the shard data", leaderPath, shardId.getShardName()));
                         "The shard leader %s timed out trying to replicate the initial data to the new shard %s."
                         + "Possible causes - there was a problem replicating the data or shard leadership changed "
                         + "while replicating the shard data", leaderPath, shardId.getShardName()));
-                break;
             case NO_LEADER:
             case NO_LEADER:
-                failure = createNoShardLeaderException(shardId);
-                break;
+                return createNoShardLeaderException(shardId);
             case NOT_SUPPORTED:
             case NOT_SUPPORTED:
-                failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
+                return new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
                         serverChange.getSimpleName(), shardId.getShardName()));
                         serverChange.getSimpleName(), shardId.getShardName()));
-                break;
             default :
             default :
-                failure = new RuntimeException(String.format(
-                        "%s request to leader %s for shard %s failed with status %s",
+                return new RuntimeException(String.format("%s request to leader %s for shard %s failed with status %s",
                         serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
         }
                         serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
         }
-        return failure;
     }
 
     private void onRemoveShardReplica(final RemoveShardReplica shardReplicaMsg) {
     }
 
     private void onRemoveShardReplica(final RemoveShardReplica shardReplicaMsg) {
@@ -1567,12 +1568,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
             @Override
         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
             @Override
-            public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+            public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
                 doRemoveShardReplicaAsync(response.getPrimaryPath());
             }
 
             @Override
                 doRemoveShardReplicaAsync(response.getPrimaryPath());
             }
 
             @Override
-            public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
+            public void onLocalPrimaryFound(final LocalPrimaryShardFound response) {
                 doRemoveShardReplicaAsync(response.getPrimaryPath());
             }
 
                 doRemoveShardReplicaAsync(response.getPrimaryPath());
             }
 
@@ -1593,12 +1594,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(),
                 shardName, persistenceId(), getSelf()) {
             @Override
         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(),
                 shardName, persistenceId(), getSelf()) {
             @Override
-            public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+            public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
                 doRemoveShardReplicaAsync(response.getPrimaryPath());
             }
 
             @Override
                 doRemoveShardReplicaAsync(response.getPrimaryPath());
             }
 
             @Override
-            public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
+            public void onLocalPrimaryFound(final LocalPrimaryShardFound response) {
                 doRemoveShardReplicaAsync(response.getPrimaryPath());
             }
 
                 doRemoveShardReplicaAsync(response.getPrimaryPath());
             }
 
@@ -1627,7 +1628,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return currentSnapshot;
     }
 
         return currentSnapshot;
     }
 
-    private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
+    private void applyShardManagerSnapshot(final ShardManagerSnapshot snapshot) {
         currentSnapshot = snapshot;
 
         LOG.debug("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
         currentSnapshot = snapshot;
 
         LOG.debug("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
@@ -1651,7 +1652,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
         }
     }
 
-    private void onSaveSnapshotSuccess(SaveSnapshotSuccess successMessage) {
+    private void onSaveSnapshotSuccess(final SaveSnapshotSuccess successMessage) {
         LOG.debug("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
             persistenceId());
         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
         LOG.debug("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
             persistenceId());
         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
@@ -1675,7 +1676,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             localShardFound.getPath(), getSender()));
     }
 
             localShardFound.getPath(), getSender()));
     }
 
-    private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) {
+    private void onFlipShardMembersVotingStatus(final FlipShardMembersVotingStatus flipMembersVotingStatus) {
         LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
 
         ActorRef sender = getSender();
         LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
 
         ActorRef sender = getSender();
@@ -1686,7 +1687,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
             future.onComplete(new OnComplete<Object>() {
                 @Override
 
             future.onComplete(new OnComplete<Object>() {
                 @Override
-                public void onComplete(Throwable failure, Object response) {
+                public void onComplete(final Throwable failure, final Object response) {
                     if (failure != null) {
                         sender.tell(new Status.Failure(new RuntimeException(
                                 String.format("Failed to access local shard %s", shardName), failure)), self());
                     if (failure != null) {
                         sender.tell(new Status.Failure(new RuntimeException(
                                 String.format("Failed to access local shard %s", shardName), failure)), self());
@@ -1710,7 +1711,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     }
 
 
     }
 
-    private void findLocalShard(FindLocalShard message) {
+    private void findLocalShard(final FindLocalShard message) {
         LOG.debug("{}: findLocalShard : {}", persistenceId(), message.getShardName());
 
         final ShardInformation shardInformation = localShards.get(message.getShardName());
         LOG.debug("{}: findLocalShard : {}", persistenceId(), message.getShardName());
 
         final ShardInformation shardInformation = localShards.get(message.getShardName());
@@ -1735,7 +1736,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
         Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
-            public void onComplete(Throwable failure, Object response) {
+            public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
                     LOG.debug("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName,
                             failure);
                 if (failure != null) {
                     LOG.debug("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName,
                             failure);
@@ -1761,7 +1762,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
-    private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus,
+    private void changeShardMembersVotingStatus(final ChangeServersVotingStatus changeServersVotingStatus,
             final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
         if (isShardReplicaOperationInProgress(shardName, sender)) {
             return;
             final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
         if (isShardReplicaOperationInProgress(shardName, sender)) {
             return;
@@ -1780,7 +1781,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
 
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
-            public void onComplete(Throwable failure, Object response) {
+            public void onComplete(final Throwable failure, final Object response) {
                 shardReplicaOperationsInProgress.remove(shardName);
                 if (failure != null) {
                     String msg = String.format("ChangeServersVotingStatus request to local shard %s failed",
                 shardReplicaOperationsInProgress.remove(shardName);
                 if (failure != null) {
                     String msg = String.format("ChangeServersVotingStatus request to local shard %s failed",
@@ -1817,8 +1818,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         String leaderPath;
         boolean removeShardOnFailure;
 
         String leaderPath;
         boolean removeShardOnFailure;
 
-        ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
-                boolean removeShardOnFailure) {
+        ForwardedAddServerReply(final ShardInformation shardInfo, final AddServerReply addServerReply,
+            final String leaderPath, final boolean removeShardOnFailure) {
             this.shardInfo = shardInfo;
             this.addServerReply = addServerReply;
             this.leaderPath = leaderPath;
             this.shardInfo = shardInfo;
             this.addServerReply = addServerReply;
             this.leaderPath = leaderPath;
@@ -1832,8 +1833,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         Throwable failure;
         boolean removeShardOnFailure;
 
         Throwable failure;
         boolean removeShardOnFailure;
 
-        ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
-                boolean removeShardOnFailure) {
+        ForwardedAddServerFailure(final String shardName, final String failureMessage, final Throwable failure,
+                final boolean removeShardOnFailure) {
             this.shardName = shardName;
             this.failureMessage = failureMessage;
             this.failure = failure;
             this.shardName = shardName;
             this.failureMessage = failureMessage;
             this.failure = failure;
@@ -1845,7 +1846,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private final Runnable replyRunnable;
         private Cancellable timeoutSchedule;
 
         private final Runnable replyRunnable;
         private Cancellable timeoutSchedule;
 
-        OnShardInitialized(Runnable replyRunnable) {
+        OnShardInitialized(final Runnable replyRunnable) {
             this.replyRunnable = replyRunnable;
         }
 
             this.replyRunnable = replyRunnable;
         }
 
@@ -1857,13 +1858,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return timeoutSchedule;
         }
 
             return timeoutSchedule;
         }
 
-        void setTimeoutSchedule(Cancellable timeoutSchedule) {
+        void setTimeoutSchedule(final Cancellable timeoutSchedule) {
             this.timeoutSchedule = timeoutSchedule;
         }
     }
 
     static class OnShardReady extends OnShardInitialized {
             this.timeoutSchedule = timeoutSchedule;
         }
     }
 
     static class OnShardReady extends OnShardInitialized {
-        OnShardReady(Runnable replyRunnable) {
+        OnShardReady(final Runnable replyRunnable) {
             super(replyRunnable);
         }
     }
             super(replyRunnable);
         }
     }
@@ -1923,8 +1924,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
          * @param persistenceId The persistenceId for the ShardManager
          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
          */
          * @param persistenceId The persistenceId for the ShardManager
          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
          */
-        protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId,
-                ActorRef shardManagerActor) {
+        protected AutoFindPrimaryFailureResponseHandler(final ActorRef targetActor, final String shardName,
+                final String persistenceId, final ActorRef shardManagerActor) {
             this.targetActor = Preconditions.checkNotNull(targetActor);
             this.shardName = Preconditions.checkNotNull(shardName);
             this.persistenceId = Preconditions.checkNotNull(persistenceId);
             this.targetActor = Preconditions.checkNotNull(targetActor);
             this.shardName = Preconditions.checkNotNull(shardName);
             this.persistenceId = Preconditions.checkNotNull(persistenceId);
@@ -1940,14 +1941,14 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
 
         @Override
         }
 
         @Override
-        public void onFailure(Throwable failure) {
+        public void onFailure(final Throwable failure) {
             LOG.debug("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
             targetActor.tell(new Status.Failure(new RuntimeException(
                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
         }
 
         @Override
             LOG.debug("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
             targetActor.tell(new Status.Failure(new RuntimeException(
                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
         }
 
         @Override
-        public void onUnknownResponse(Object response) {
+        public void onUnknownResponse(final Object response) {
             String msg = String.format("Failed to find leader for shard %s: received response: %s",
                     shardName, response);
             LOG.debug("{}: {}", persistenceId, msg);
             String msg = String.format("Failed to find leader for shard %s: received response: %s",
                     shardName, response);
             LOG.debug("{}: {}", persistenceId, msg);
@@ -1964,7 +1965,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private final Object response;
         private final String leaderPath;
 
         private final Object response;
         private final String leaderPath;
 
-        WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
+        WrappedShardResponse(final ShardIdentifier shardId, final Object response, final String leaderPath) {
             this.shardId = shardId;
             this.response = response;
             this.leaderPath = leaderPath;
             this.shardId = shardId;
             this.response = response;
             this.leaderPath = leaderPath;
@@ -1988,7 +1989,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private final ShardInformation shardInfo;
         private final OnShardInitialized onShardInitialized;
 
         private final ShardInformation shardInfo;
         private final OnShardInitialized onShardInitialized;
 
-        ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
+        ShardNotInitializedTimeout(final ShardInformation shardInfo, final OnShardInitialized onShardInitialized,
+            final ActorRef sender) {
             this.sender = sender;
             this.shardInfo = shardInfo;
             this.onShardInitialized = onShardInitialized;
             this.sender = sender;
             this.shardInfo = shardInfo;
             this.onShardInitialized = onShardInitialized;
index 4e29d4d5bb47ab1f4e3687303e2a92e74d697305..05d9943622301212c4c1f222a834ef7eb181a81b 100644 (file)
@@ -125,7 +125,8 @@ public abstract class AbstractShardTest extends AbstractActorTest {
     }
 
     protected Shard.Builder newShardBuilder() {
     }
 
     protected Shard.Builder newShardBuilder() {
-        return Shard.builder().id(shardID).datastoreContext(newDatastoreContext()).schemaContext(SCHEMA_CONTEXT);
+        return Shard.builder().id(shardID).datastoreContext(newDatastoreContext())
+            .schemaContextProvider(() -> SCHEMA_CONTEXT);
     }
 
     protected void testRecovery(final Set<Integer> listEntryKeys) throws Exception {
     }
 
     protected void testRecovery(final Set<Integer> listEntryKeys) throws Exception {
index f566ba8d07181b6497f4ec5893d6a0c9c479313a..86a78a701f2e50d0ccc0fd112fd631fc3e6f6bc9 100644 (file)
@@ -2013,13 +2013,13 @@ public class ShardTest extends AbstractShardTest {
                 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
 
         final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext)
                 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
 
         final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext)
-                .schemaContext(SCHEMA_CONTEXT).props();
+                .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
 
         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder()
                 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
 
         final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext)
 
         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder()
                 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
 
         final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext)
-                .schemaContext(SCHEMA_CONTEXT).props();
+                .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
 
         new ShardTestKit(getSystem()) {
             {
 
         new ShardTestKit(getSystem()) {
             {
@@ -2167,14 +2167,14 @@ public class ShardTest extends AbstractShardTest {
                                 .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
                                 .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
                                         "akka://test/user/" + leaderShardID.toString()))
                                 .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
                                 .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
                                         "akka://test/user/" + leaderShardID.toString()))
-                                .schemaContext(SCHEMA_CONTEXT).props()
+                                .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
 
                 final TestActorRef<Shard> leaderShard = actorFactory
                         .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
                                 .peerAddresses(Collections.singletonMap(followerShardID.toString(),
                                         "akka://test/user/" + followerShardID.toString()))
                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
 
                 final TestActorRef<Shard> leaderShard = actorFactory
                         .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
                                 .peerAddresses(Collections.singletonMap(followerShardID.toString(),
                                         "akka://test/user/" + followerShardID.toString()))
-                                .schemaContext(SCHEMA_CONTEXT).props()
+                                .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
 
                 leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
 
                 leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
@@ -2286,14 +2286,14 @@ public class ShardTest extends AbstractShardTest {
                                 .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
                                 .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
                                         "akka://test/user/" + leaderShardID.toString()))
                                 .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
                                 .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
                                         "akka://test/user/" + leaderShardID.toString()))
-                                .schemaContext(SCHEMA_CONTEXT).props()
+                                .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
 
                 final TestActorRef<Shard> leaderShard = actorFactory
                         .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
                                 .peerAddresses(Collections.singletonMap(followerShardID.toString(),
                                         "akka://test/user/" + followerShardID.toString()))
                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
 
                 final TestActorRef<Shard> leaderShard = actorFactory
                         .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
                                 .peerAddresses(Collections.singletonMap(followerShardID.toString(),
                                         "akka://test/user/" + followerShardID.toString()))
-                                .schemaContext(SCHEMA_CONTEXT).props()
+                                .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
 
                 leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
 
                 leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
index 233ecb8cf344ed6f97583aea213b4169c9c7d3fc..881429bee6d00cdde3f397890ea199ff0bee5baf 100644 (file)
@@ -51,7 +51,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
     private ActorRef createShard() {
         ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext)
 
     private ActorRef createShard() {
         ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext)
-                .schemaContext(TestModel.createTestContext()).props());
+                .schemaContextProvider(() -> TEST_SCHEMA_CONTEXT).props());
         ShardTestKit.waitUntilLeader(shard);
         return shard;
     }
         ShardTestKit.waitUntilLeader(shard);
         return shard;
     }
index f8997106f62e6124f0aedfcf0bd6abd4b09966e6..84bc87238a70a9745d510eda4f6c17c8b488d66d 100644 (file)
@@ -50,6 +50,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 public class ShardTransactionTest extends AbstractActorTest {
 
 
 public class ShardTransactionTest extends AbstractActorTest {
 
@@ -59,6 +60,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
     private static final ShardIdentifier SHARD_IDENTIFIER =
         ShardIdentifier.create("inventory", MEMBER_NAME, "config");
 
     private static final ShardIdentifier SHARD_IDENTIFIER =
         ShardIdentifier.create("inventory", MEMBER_NAME, "config");
+    private static final SchemaContext TEST_MODEL = TestModel.createTestContext();
 
     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().persistent(false).build();
 
 
     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().persistent(false).build();
 
@@ -70,7 +72,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Before
     public void setUp() {
         shard = actorFactory.createTestActor(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext)
     @Before
     public void setUp() {
         shard = actorFactory.createTestActor(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext)
-                .schemaContext(TestModel.createTestContext()).props()
+                .schemaContextProvider(() -> TEST_MODEL).props()
                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
         ShardTestKit.waitUntilLeader(shard);
         store = shard.underlyingActor().getDataStore();
                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
         ShardTestKit.waitUntilLeader(shard);
         store = shard.underlyingActor().getDataStore();
index c61bbc10d3fb17a169bf744c615fa3acac4e7c1a..cfef02c509a106cc1e4ea6f25d4c058440f8f80b 100644 (file)
@@ -1223,25 +1223,26 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         return newShardProps(newShardId(LOCAL_MEMBER_NAME), Collections.<String,String>emptyMap(), LOCAL_MEMBER_NAME);
     }
 
         return newShardProps(newShardId(LOCAL_MEMBER_NAME), Collections.<String,String>emptyMap(), LOCAL_MEMBER_NAME);
     }
 
-    private Props newShardProps(ShardIdentifier shardId, Map<String,String> peers, String memberName) {
+    private Props newShardProps(final ShardIdentifier shardId, final Map<String,String> peers,
+            final String memberName) {
         return newShardProps(shardId, peers, memberName, EntityOwnerSelectionStrategyConfig.newBuilder().build());
     }
 
         return newShardProps(shardId, peers, memberName, EntityOwnerSelectionStrategyConfig.newBuilder().build());
     }
 
-    private Props newShardProps(ShardIdentifier shardId, Map<String,String> peers, String memberName,
-                                EntityOwnerSelectionStrategyConfig config) {
+    private Props newShardProps(final ShardIdentifier shardId, final Map<String,String> peers, final String memberName,
+                                final EntityOwnerSelectionStrategyConfig config) {
         return newShardBuilder(shardId, peers, memberName).ownerSelectionStrategyConfig(config).props()
                     .withDispatcher(Dispatchers.DefaultDispatcherId());
     }
 
         return newShardBuilder(shardId, peers, memberName).ownerSelectionStrategyConfig(config).props()
                     .withDispatcher(Dispatchers.DefaultDispatcherId());
     }
 
-    private EntityOwnershipShard.Builder newShardBuilder(ShardIdentifier shardId, Map<String,String> peers,
-            String memberName) {
+    private EntityOwnershipShard.Builder newShardBuilder(final ShardIdentifier shardId, final Map<String, String> peers,
+            final String memberName) {
         return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers).datastoreContext(
         return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers).datastoreContext(
-                dataStoreContextBuilder.build()).schemaContext(SCHEMA_CONTEXT).localMemberName(
+                dataStoreContextBuilder.build()).schemaContextProvider(() -> SCHEMA_CONTEXT).localMemberName(
                         MemberName.forName(memberName)).ownerSelectionStrategyConfig(
                                 EntityOwnerSelectionStrategyConfig.newBuilder().build());
     }
 
                         MemberName.forName(memberName)).ownerSelectionStrategyConfig(
                                 EntityOwnerSelectionStrategyConfig.newBuilder().build());
     }
 
-    private Map<String, String> peerMap(String... peerIds) {
+    private Map<String, String> peerMap(final String... peerIds) {
         ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder();
         for (String peerId: peerIds) {
             builder.put(peerId, actorFactory.createTestActorPath(peerId)).build();
         ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder();
         for (String peerId: peerIds) {
             builder.put(peerId, actorFactory.createTestActorPath(peerId)).build();
@@ -1254,14 +1255,14 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         private final TestActorRef<MessageCollectorActor> collectorActor;
         private final Map<Class<?>, Predicate<?>> dropMessagesOfType = new ConcurrentHashMap<>();
 
         private final TestActorRef<MessageCollectorActor> collectorActor;
         private final Map<Class<?>, Predicate<?>> dropMessagesOfType = new ConcurrentHashMap<>();
 
-        TestEntityOwnershipShard(Builder builder, TestActorRef<MessageCollectorActor> collectorActor) {
+        TestEntityOwnershipShard(final Builder builder, final TestActorRef<MessageCollectorActor> collectorActor) {
             super(builder);
             this.collectorActor = collectorActor;
         }
 
         @SuppressWarnings({ "unchecked", "rawtypes" })
         @Override
             super(builder);
             this.collectorActor = collectorActor;
         }
 
         @SuppressWarnings({ "unchecked", "rawtypes" })
         @Override
-        public void handleCommand(Object message) {
+        public void handleCommand(final Object message) {
             Predicate drop = dropMessagesOfType.get(message.getClass());
             if (drop == null || !drop.test(message)) {
                 super.handleCommand(message);
             Predicate drop = dropMessagesOfType.get(message.getClass());
             if (drop == null || !drop.test(message)) {
                 super.handleCommand(message);
@@ -1272,15 +1273,15 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
             }
         }
 
             }
         }
 
-        void startDroppingMessagesOfType(Class<?> msgClass) {
+        void startDroppingMessagesOfType(final Class<?> msgClass) {
             dropMessagesOfType.put(msgClass, msg -> true);
         }
 
             dropMessagesOfType.put(msgClass, msg -> true);
         }
 
-        <T> void startDroppingMessagesOfType(Class<T> msgClass, Predicate<T> filter) {
+        <T> void startDroppingMessagesOfType(final Class<T> msgClass, final Predicate<T> filter) {
             dropMessagesOfType.put(msgClass, filter);
         }
 
             dropMessagesOfType.put(msgClass, filter);
         }
 
-        void stopDroppingMessagesOfType(Class<?> msgClass) {
+        void stopDroppingMessagesOfType(final Class<?> msgClass) {
             dropMessagesOfType.remove(msgClass);
         }
 
             dropMessagesOfType.remove(msgClass);
         }
 
@@ -1288,11 +1289,11 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
             return collectorActor;
         }
 
             return collectorActor;
         }
 
-        static Props props(Builder builder) {
+        static Props props(final Builder builder) {
             return props(builder, null);
         }
 
             return props(builder, null);
         }
 
-        static Props props(Builder builder, TestActorRef<MessageCollectorActor> collectorActor) {
+        static Props props(final Builder builder, final TestActorRef<MessageCollectorActor> collectorActor) {
             return Props.create(TestEntityOwnershipShard.class, builder, collectorActor)
                     .withDispatcher(Dispatchers.DefaultDispatcherId());
         }
             return Props.create(TestEntityOwnershipShard.class, builder, collectorActor)
                     .withDispatcher(Dispatchers.DefaultDispatcherId());
         }
index f40374b924f68ab84d571a6ef87194d88f01edb1..b3d6628375487200be0e378dbd032d830af6f4a6 100644 (file)
@@ -134,11 +134,11 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
     private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
 
 
     private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
 
-    private ActorSystem newActorSystem(String config) {
+    private ActorSystem newActorSystem(final String config) {
         return newActorSystem("cluster-test", config);
     }
 
         return newActorSystem("cluster-test", config);
     }
 
-    private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
+    private ActorRef newMockShardActor(final ActorSystem system, final String shardName, final String memberName) {
         String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
         if (system == getSystem()) {
             return actorFactory.createActor(Props.create(MessageCollectorActor.class), name);
         String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
         if (system == getSystem()) {
             return actorFactory.createActor(Props.create(MessageCollectorActor.class), name);
@@ -151,7 +151,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         return newShardMgrProps(new MockConfiguration());
     }
 
         return newShardMgrProps(new MockConfiguration());
     }
 
-    private static DatastoreContextFactory newDatastoreContextFactory(DatastoreContext datastoreContext) {
+    private static DatastoreContextFactory newDatastoreContextFactory(final DatastoreContext datastoreContext) {
         DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
         Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
         Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString());
         DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
         Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
         Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString());
@@ -162,7 +162,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
     }
 
         return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
     }
 
-    private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(ActorRef shardActor) {
+    private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(final ActorRef shardActor) {
         return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor)
                 .distributedDataStore(mock(DistributedDataStore.class));
     }
         return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor)
                 .distributedDataStore(mock(DistributedDataStore.class));
     }
@@ -173,7 +173,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
                 Dispatchers.DefaultDispatcherId());
     }
 
                 Dispatchers.DefaultDispatcherId());
     }
 
-    private Props newPropsShardMgrWithMockShardActor(ActorRef shardActor) {
+    private Props newPropsShardMgrWithMockShardActor(final ActorRef shardActor) {
         return newTestShardMgrBuilderWithMockShardActor(shardActor).props()
                 .withDispatcher(Dispatchers.DefaultDispatcherId());
     }
         return newTestShardMgrBuilderWithMockShardActor(shardActor).props()
                 .withDispatcher(Dispatchers.DefaultDispatcherId());
     }
@@ -183,14 +183,15 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         return newTestShardManager(newShardMgrProps());
     }
 
         return newTestShardManager(newShardMgrProps());
     }
 
-    private TestShardManager newTestShardManager(Props props) {
+    private TestShardManager newTestShardManager(final Props props) {
         TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
         TestShardManager shardManager = shardManagerActor.underlyingActor();
         shardManager.waitForRecoveryComplete();
         return shardManager;
     }
 
         TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
         TestShardManager shardManager = shardManagerActor.underlyingActor();
         shardManager.waitForRecoveryComplete();
         return shardManager;
     }
 
-    private static void waitForShardInitialized(ActorRef shardManager, String shardName, JavaTestKit kit) {
+    private static void waitForShardInitialized(final ActorRef shardManager, final String shardName,
+            final JavaTestKit kit) {
         AssertionError last = null;
         Stopwatch sw = Stopwatch.createStarted();
         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
         AssertionError last = null;
         Stopwatch sw = Stopwatch.createStarted();
         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
@@ -209,7 +210,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
     }
 
     @SuppressWarnings("unchecked")
     }
 
     @SuppressWarnings("unchecked")
-    private static <T> T expectMsgClassOrFailure(Class<T> msgClass, JavaTestKit kit, String msg) {
+    private static <T> T expectMsgClassOrFailure(final Class<T> msgClass, final JavaTestKit kit, final String msg) {
         Object reply = kit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), msgClass, Failure.class);
         if (reply instanceof Failure) {
             throw new AssertionError(msg + " failed", ((Failure)reply).cause());
         Object reply = kit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), msgClass, Failure.class);
         if (reply instanceof Failure) {
             throw new AssertionError(msg + " failed", ((Failure)reply).cause());
@@ -234,12 +235,12 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         final MockConfiguration mockConfig = new MockConfiguration() {
             @Override
 
         final MockConfiguration mockConfig = new MockConfiguration() {
             @Override
-            public Collection<String> getMemberShardNames(MemberName memberName) {
+            public Collection<String> getMemberShardNames(final MemberName memberName) {
                 return Arrays.asList("default", "topology");
             }
 
             @Override
                 return Arrays.asList("default", "topology");
             }
 
             @Override
-            public Collection<MemberName> getMembersFromShardName(String shardName) {
+            public Collection<MemberName> getMembersFromShardName(final String shardName) {
                 return members("member-1");
             }
         };
                 return members("member-1");
             }
         };
@@ -257,12 +258,12 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
         final CountDownLatch newShardActorLatch = new CountDownLatch(2);
         class LocalShardManager extends ShardManager {
         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
         final CountDownLatch newShardActorLatch = new CountDownLatch(2);
         class LocalShardManager extends ShardManager {
-            LocalShardManager(AbstractShardManagerCreator<?> creator) {
+            LocalShardManager(final AbstractShardManagerCreator<?> creator) {
                 super(creator);
             }
 
             @Override
                 super(creator);
             }
 
             @Override
-            protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
+            protected ActorRef newShardActor(final ShardInformation info) {
                 Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
                 ActorRef ref = null;
                 if (entry != null) {
                 Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
                 ActorRef ref = null;
                 if (entry != null) {
@@ -1120,7 +1121,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
         TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
             @Override
         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
         TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
             @Override
-            public List<String> getMemberShardNames(MemberName memberName) {
+            public List<String> getMemberShardNames(final MemberName memberName) {
                 return Arrays.asList("default", "astronauts");
             }
         }));
                 return Arrays.asList("default", "astronauts");
             }
         }));
@@ -1180,7 +1181,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         };
     }
 
         };
     }
 
-    private static List<MemberName> members(String... names) {
+    private static List<MemberName> members(final String... names) {
         return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList());
     }
 
         return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList());
     }
 
@@ -2091,14 +2092,14 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
         private volatile MessageInterceptor messageInterceptor;
 
         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
         private volatile MessageInterceptor messageInterceptor;
 
-        private TestShardManager(Builder builder) {
+        private TestShardManager(final Builder builder) {
             super(builder);
             shardActor = builder.shardActor;
             shardActors = builder.shardActors;
         }
 
         @Override
             super(builder);
             shardActor = builder.shardActor;
             shardActors = builder.shardActors;
         }
 
         @Override
-        protected void handleRecover(Object message) throws Exception {
+        protected void handleRecover(final Object message) throws Exception {
             try {
                 super.handleRecover(message);
             } finally {
             try {
                 super.handleRecover(message);
             } finally {
@@ -2108,14 +2109,14 @@ public class ShardManagerTest extends AbstractShardManagerTest {
             }
         }
 
             }
         }
 
-        private void countDownIfOther(final Member member, CountDownLatch latch) {
+        private void countDownIfOther(final Member member, final CountDownLatch latch) {
             if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
                 latch.countDown();
             }
         }
 
         @Override
             if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
                 latch.countDown();
             }
         }
 
         @Override
-        public void handleCommand(Object message) throws Exception {
+        public void handleCommand(final Object message) throws Exception {
             try {
                 if (messageInterceptor != null && messageInterceptor.canIntercept(message)) {
                     getSender().tell(messageInterceptor.apply(message), getSelf());
             try {
                 if (messageInterceptor != null && messageInterceptor.canIntercept(message)) {
                     getSender().tell(messageInterceptor.apply(message), getSelf());
@@ -2137,7 +2138,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
             }
         }
 
             }
         }
 
-        void setMessageInterceptor(MessageInterceptor messageInterceptor) {
+        void setMessageInterceptor(final MessageInterceptor messageInterceptor) {
             this.messageInterceptor = messageInterceptor;
         }
 
             this.messageInterceptor = messageInterceptor;
         }
 
@@ -2177,7 +2178,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
             findPrimaryMessageReceived = new CountDownLatch(1);
         }
 
             findPrimaryMessageReceived = new CountDownLatch(1);
         }
 
-        public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) {
+        public static Builder builder(final DatastoreContext.Builder datastoreContextBuilder) {
             return new Builder(datastoreContextBuilder);
         }
 
             return new Builder(datastoreContextBuilder);
         }
 
@@ -2185,37 +2186,37 @@ public class ShardManagerTest extends AbstractShardManagerTest {
             private ActorRef shardActor;
             private final Map<String, ActorRef> shardActors = new HashMap<>();
 
             private ActorRef shardActor;
             private final Map<String, ActorRef> shardActors = new HashMap<>();
 
-            Builder(DatastoreContext.Builder datastoreContextBuilder) {
+            Builder(final DatastoreContext.Builder datastoreContextBuilder) {
                 super(TestShardManager.class);
                 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
             }
 
                 super(TestShardManager.class);
                 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
             }
 
-            Builder shardActor(ActorRef newShardActor) {
+            Builder shardActor(final ActorRef newShardActor) {
                 this.shardActor = newShardActor;
                 return this;
             }
 
                 this.shardActor = newShardActor;
                 return this;
             }
 
-            Builder addShardActor(String shardName, ActorRef actorRef) {
+            Builder addShardActor(final String shardName, final ActorRef actorRef) {
                 shardActors.put(shardName, actorRef);
                 return this;
             }
         }
 
         @Override
                 shardActors.put(shardName, actorRef);
                 return this;
             }
         }
 
         @Override
-        public void saveSnapshot(Object obj) {
+        public void saveSnapshot(final Object obj) {
             snapshot = (ShardManagerSnapshot) obj;
             snapshotPersist.countDown();
             super.saveSnapshot(obj);
         }
 
             snapshot = (ShardManagerSnapshot) obj;
             snapshotPersist.countDown();
             super.saveSnapshot(obj);
         }
 
-        void verifySnapshotPersisted(Set<String> shardList) {
+        void verifySnapshotPersisted(final Set<String> shardList) {
             assertEquals("saveSnapshot invoked", true,
                     Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
             assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
         }
 
         @Override
             assertEquals("saveSnapshot invoked", true,
                     Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
             assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
         }
 
         @Override
-        protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
+        protected ActorRef newShardActor(final ShardInformation info) {
             if (shardActors.get(info.getShardName()) != null) {
                 return shardActors.get(info.getShardName());
             }
             if (shardActors.get(info.getShardName()) != null) {
                 return shardActors.get(info.getShardName());
             }
@@ -2224,7 +2225,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
                 return shardActor;
             }
 
                 return shardActor;
             }
 
-            return super.newShardActor(schemaContext, info);
+            return super.newShardActor(info);
         }
     }
 
         }
     }
 
@@ -2232,7 +2233,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
                                                      extends AbstractShardManagerCreator<T> {
         private final Class<C> shardManagerClass;
 
                                                      extends AbstractShardManagerCreator<T> {
         private final Class<C> shardManagerClass;
 
-        AbstractGenericCreator(Class<C> shardManagerClass) {
+        AbstractGenericCreator(final Class<C> shardManagerClass) {
             this.shardManagerClass = shardManagerClass;
             cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).waitTillReadyCountDownLatch(ready)
                     .primaryShardInfoCache(new PrimaryShardInfoFutureCache());
             this.shardManagerClass = shardManagerClass;
             cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).waitTillReadyCountDownLatch(ready)
                     .primaryShardInfoCache(new PrimaryShardInfoFutureCache());
@@ -2246,7 +2247,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
     }
 
     private static class GenericCreator<C extends ShardManager> extends AbstractGenericCreator<GenericCreator<C>, C> {
     }
 
     private static class GenericCreator<C extends ShardManager> extends AbstractGenericCreator<GenericCreator<C>, C> {
-        GenericCreator(Class<C> shardManagerClass) {
+        GenericCreator(final Class<C> shardManagerClass) {
             super(shardManagerClass);
         }
     }
             super(shardManagerClass);
         }
     }
@@ -2255,7 +2256,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         private static final long serialVersionUID = 1L;
         private final Creator<ShardManager> delegate;
 
         private static final long serialVersionUID = 1L;
         private final Creator<ShardManager> delegate;
 
-        DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
+        DelegatingShardManagerCreator(final Creator<ShardManager> delegate) {
             this.delegate = delegate;
         }
 
             this.delegate = delegate;
         }
 
@@ -2272,12 +2273,12 @@ public class ShardManagerTest extends AbstractShardManagerTest {
     private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
         return new MessageInterceptor() {
             @Override
     private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
         return new MessageInterceptor() {
             @Override
-            public Object apply(Object message) {
+            public Object apply(final Object message) {
                 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
             }
 
             @Override
                 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
             }
 
             @Override
-            public boolean canIntercept(Object message) {
+            public boolean canIntercept(final Object message) {
                 return message instanceof FindPrimary;
             }
         };
                 return message instanceof FindPrimary;
             }
         };
@@ -2290,13 +2291,13 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         private final Class<?> requestClass;
 
         @SuppressWarnings("unused")
         private final Class<?> requestClass;
 
         @SuppressWarnings("unused")
-        MockRespondActor(Class<?> requestClass, Object responseMsg) {
+        MockRespondActor(final Class<?> requestClass, final Object responseMsg) {
             this.requestClass = requestClass;
             this.responseMsg = responseMsg;
         }
 
         @Override
             this.requestClass = requestClass;
             this.responseMsg = responseMsg;
         }
 
         @Override
-        public void onReceive(Object message) throws Exception {
+        public void onReceive(final Object message) throws Exception {
             if (message.equals(CLEAR_RESPONSE)) {
                 responseMsg = null;
             } else {
             if (message.equals(CLEAR_RESPONSE)) {
                 responseMsg = null;
             } else {