Do not retain initial SchemaContext 24/57424/1
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 4 May 2017 21:52:57 +0000 (23:52 +0200)
committerRobert Varga <nite@hq.sk>
Fri, 19 May 2017 12:31:10 +0000 (12:31 +0000)
While looking over a memory dump I have noticed that we retain
SchemaContext inside Shard$Builder, which is being retained via
Props (which are used to restart the actor).

This reference is not updated as the SchemaContext is updated, which
means we are wasting memory and are causing Shard to come up with
an ancient SchemaContext after a failure.

Fix this by having an AtomicReference holder for SchemaContext
and have Shard have a Supplier<SchemaContext>.

Change-Id: I73fcae46f249d3679522eb7dbbb059e43c5af6c7
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/AtomicShardContextProvider.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java

index fbd5b64..c8be1be 100644 (file)
@@ -98,6 +98,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailed
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -892,7 +893,7 @@ public class Shard extends RaftActor {
         private ShardIdentifier id;
         private Map<String, String> peerAddresses = Collections.emptyMap();
         private DatastoreContext datastoreContext;
-        private SchemaContext schemaContext;
+        private SchemaContextProvider schemaContextProvider;
         private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
         private TipProducingDataTree dataTree;
         private volatile boolean sealed;
@@ -928,9 +929,9 @@ public class Shard extends RaftActor {
             return self();
         }
 
-        public T schemaContext(final SchemaContext newSchemaContext) {
+        public T schemaContextProvider(final SchemaContextProvider schemaContextProvider) {
             checkSealed();
-            this.schemaContext = newSchemaContext;
+            this.schemaContextProvider = Preconditions.checkNotNull(schemaContextProvider);
             return self();
         }
 
@@ -959,7 +960,7 @@ public class Shard extends RaftActor {
         }
 
         public SchemaContext getSchemaContext() {
-            return schemaContext;
+            return Verify.verifyNotNull(schemaContextProvider.getSchemaContext());
         }
 
         public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() {
@@ -986,7 +987,7 @@ public class Shard extends RaftActor {
             Preconditions.checkNotNull(id, "id should not be null");
             Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
             Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
-            Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
+            Preconditions.checkNotNull(schemaContextProvider, "schemaContextProvider should not be null");
         }
 
         public Props props() {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/AtomicShardContextProvider.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/AtomicShardContextProvider.java
new file mode 100644 (file)
index 0000000..6a4e982
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.shardmanager;
+
+import com.google.common.base.Verify;
+import java.util.concurrent.atomic.AtomicReference;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
+
+final class AtomicShardContextProvider extends AtomicReference<SchemaContext> implements SchemaContextProvider {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public SchemaContext getSchemaContext() {
+        return Verify.verifyNotNull(get());
+    }
+}
\ No newline at end of file
index 0c89229..8fc77ac 100644 (file)
@@ -42,7 +42,13 @@ final class ShardInformation {
     private final ShardPeerAddressResolver addressResolver;
     private final ShardIdentifier shardId;
     private final String shardName;
+
+    // This reference indirection is required to have the ability to update the SchemaContext
+    // inside actor props. Otherwise we would be keeping an old SchemaContext there, preventing
+    // it from becoming garbage.
+    private final AtomicShardContextProvider schemaContextProvider = new AtomicShardContextProvider();
     private ActorRef actor;
+
     private Optional<DataTree> localShardDataTree;
     private boolean leaderAvailable = false;
 
@@ -59,9 +65,9 @@ final class ShardInformation {
     private Shard.AbstractBuilder<?, ?> builder;
     private boolean isActiveMember = true;
 
-    ShardInformation(String shardName, ShardIdentifier shardId,
-            Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
-            Shard.AbstractBuilder<?, ?> builder, ShardPeerAddressResolver addressResolver) {
+    ShardInformation(final String shardName, final ShardIdentifier shardId,
+            final Map<String, String> initialPeerAddresses, final DatastoreContext datastoreContext,
+            final Shard.AbstractBuilder<?, ?> builder, final ShardPeerAddressResolver addressResolver) {
         this.shardName = shardName;
         this.shardId = shardId;
         this.initialPeerAddresses = initialPeerAddresses;
@@ -70,10 +76,10 @@ final class ShardInformation {
         this.addressResolver = addressResolver;
     }
 
-    Props newProps(SchemaContext schemaContext) {
+    Props newProps() {
         Preconditions.checkNotNull(builder);
         Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext)
-                .schemaContext(schemaContext).props();
+                .schemaContextProvider(schemaContextProvider).props();
         builder = null;
         return props;
     }
@@ -87,7 +93,7 @@ final class ShardInformation {
         return actor;
     }
 
-    void setActor(ActorRef actor) {
+    void setActor(final ActorRef actor) {
         this.actor = actor;
     }
 
@@ -95,7 +101,7 @@ final class ShardInformation {
         return shardId;
     }
 
-    void setLocalDataTree(Optional<DataTree> localShardDataTree) {
+    void setLocalDataTree(final Optional<DataTree> localShardDataTree) {
         this.localShardDataTree = localShardDataTree;
     }
 
@@ -107,7 +113,7 @@ final class ShardInformation {
         return datastoreContext;
     }
 
-    void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
+    void setDatastoreContext(final DatastoreContext datastoreContext, final ActorRef sender) {
         this.datastoreContext = datastoreContext;
         if (actor != null) {
             LOG.debug("Sending new DatastoreContext to {}", shardId);
@@ -115,7 +121,7 @@ final class ShardInformation {
         }
     }
 
-    void updatePeerAddress(String peerId, String peerAddress, ActorRef sender) {
+    void updatePeerAddress(final String peerId, final String peerAddress, final ActorRef sender) {
         LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
 
         if (actor != null) {
@@ -128,13 +134,13 @@ final class ShardInformation {
         notifyOnShardInitializedCallbacks();
     }
 
-    void peerDown(MemberName memberName, String peerId, ActorRef sender) {
+    void peerDown(final MemberName memberName, final String peerId, final ActorRef sender) {
         if (actor != null) {
             actor.tell(new PeerDown(memberName, peerId), sender);
         }
     }
 
-    void peerUp(MemberName memberName, String peerId, ActorRef sender) {
+    void peerUp(final MemberName memberName, final String peerId, final ActorRef sender) {
         if (actor != null) {
             actor.tell(new PeerUp(memberName, peerId), sender);
         }
@@ -195,15 +201,15 @@ final class ShardInformation {
         }
     }
 
-    void addOnShardInitialized(OnShardInitialized onShardInitialized) {
+    void addOnShardInitialized(final OnShardInitialized onShardInitialized) {
         onShardInitializedSet.add(onShardInitialized);
     }
 
-    void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
+    void removeOnShardInitialized(final OnShardInitialized onShardInitialized) {
         onShardInitializedSet.remove(onShardInitialized);
     }
 
-    void setRole(String newRole) {
+    void setRole(final String newRole) {
         this.role = newRole;
 
         notifyOnShardInitializedCallbacks();
@@ -213,7 +219,7 @@ final class ShardInformation {
         return role;
     }
 
-    void setFollowerSyncStatus(boolean syncStatus) {
+    void setFollowerSyncStatus(final boolean syncStatus) {
         this.followerSyncStatus = syncStatus;
     }
 
@@ -227,7 +233,7 @@ final class ShardInformation {
         return false;
     }
 
-    boolean setLeaderId(String leaderId) {
+    boolean setLeaderId(final String leaderId) {
         final boolean changed = !Objects.equals(this.leaderId, leaderId);
         this.leaderId = leaderId;
         if (leaderId != null) {
@@ -242,7 +248,7 @@ final class ShardInformation {
         return leaderId;
     }
 
-    void setLeaderAvailable(boolean leaderAvailable) {
+    void setLeaderAvailable(final boolean leaderAvailable) {
         this.leaderAvailable = leaderAvailable;
 
         if (leaderAvailable) {
@@ -254,7 +260,7 @@ final class ShardInformation {
         return leaderVersion;
     }
 
-    void setLeaderVersion(short leaderVersion) {
+    void setLeaderVersion(final short leaderVersion) {
         this.leaderVersion = leaderVersion;
     }
 
@@ -262,7 +268,15 @@ final class ShardInformation {
         return isActiveMember;
     }
 
-    void setActiveMember(boolean isActiveMember) {
+    void setActiveMember(final boolean isActiveMember) {
         this.isActiveMember = isActiveMember;
     }
+
+    SchemaContext getSchemaContext() {
+        return schemaContextProvider.getSchemaContext();
+    }
+
+    void setSchemaContext(final SchemaContext schemaContext) {
+        schemaContextProvider.set(Preconditions.checkNotNull(schemaContext));
+    }
 }
index 38922df..dadbeeb 100644 (file)
@@ -178,7 +178,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private ListenerRegistration<DOMDataTreeChangeListener> configListenerReg = null;
     private PrefixedShardConfigUpdateHandler configUpdateHandler;
 
-    ShardManager(AbstractShardManagerCreator<?> builder) {
+    ShardManager(final AbstractShardManagerCreator<?> builder) {
         this.cluster = builder.getCluster();
         this.configuration = builder.getConfiguration();
         this.datastoreContextFactory = builder.getDatastoreContextFactory();
@@ -223,7 +223,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     @Override
-    public void handleCommand(Object message) throws Exception {
+    public void handleCommand(final Object message) throws Exception {
         if (message  instanceof FindPrimary) {
             findPrimary((FindPrimary)message);
         } else if (message instanceof FindLocalShard) {
@@ -368,7 +368,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
             @Override
-            public void onComplete(Throwable failure, Iterable<Boolean> results) {
+            public void onComplete(final Throwable failure, final Iterable<Boolean> results) {
                 LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
 
                 self().tell(PoisonPill.getInstance(), self());
@@ -391,15 +391,15 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }, dispatcher);
     }
 
-    private void onWrappedShardResponse(WrappedShardResponse message) {
+    private void onWrappedShardResponse(final WrappedShardResponse message) {
         if (message.getResponse() instanceof RemoveServerReply) {
             onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
                     message.getLeaderPath());
         }
     }
 
-    private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
-            String leaderPath) {
+    private void onRemoveServerReply(final ActorRef originalSender, final ShardIdentifier shardId,
+            final RemoveServerReply replyMsg, final String leaderPath) {
         shardReplicaOperationsInProgress.remove(shardId.getShardName());
 
         LOG.debug("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
@@ -439,7 +439,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
-            public void onComplete(Throwable failure, Object response) {
+            public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
                     shardReplicaOperationsInProgress.remove(shardName);
                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
@@ -457,8 +457,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
-    private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
-            final ActorRef sender) {
+    private void removeShardReplica(final RemoveShardReplica contextMessage, final String shardName,
+            final String primaryPath, final ActorRef sender) {
         if (isShardReplicaOperationInProgress(shardName, sender)) {
             return;
         }
@@ -479,7 +479,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
-            public void onComplete(Throwable failure, Object response) {
+            public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
                     shardReplicaOperationsInProgress.remove(shardName);
                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
@@ -497,7 +497,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
-    private void onShardReplicaRemoved(ServerRemoved message) {
+    private void onShardReplicaRemoved(final ServerRemoved message) {
         removeShard(new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build());
     }
 
@@ -576,7 +576,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private void onCreateShard(CreateShard createShard) {
+    private void onCreateShard(final CreateShard createShard) {
         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
 
         Object reply;
@@ -636,7 +636,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         final ActorRef sender = getSender();
         stopOnComplete.addOnComplete(new OnComplete<Boolean>() {
             @Override
-            public void onComplete(Throwable failure, Boolean result) {
+            public void onComplete(final Throwable failure, final Boolean result) {
                 LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer);
                 self().tell(messageToDefer, sender);
             }
@@ -645,7 +645,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return true;
     }
 
-    private void doCreatePrefixShard(PrefixShardConfiguration config, ShardIdentifier shardId, String shardName) {
+    private void doCreatePrefixShard(final PrefixShardConfiguration config, final ShardIdentifier shardId,
+            final String shardName) {
         configuration.addPrefixShardConfiguration(config);
 
         final Builder builder = newShardDatastoreContextBuilder(shardName);
@@ -665,7 +666,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         localShards.put(info.getShardName(), info);
 
         if (schemaContext != null) {
-            info.setActor(newShardActor(schemaContext, info));
+            info.setSchemaContext(schemaContext);
+            info.setActor(newShardActor(info));
         }
     }
 
@@ -726,16 +728,17 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         localShards.put(info.getShardName(), info);
 
         if (schemaContext != null) {
-            info.setActor(newShardActor(schemaContext, info));
+            info.setSchemaContext(schemaContext);
+            info.setActor(newShardActor(info));
         }
     }
 
-    private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
+    private DatastoreContext.Builder newShardDatastoreContextBuilder(final String shardName) {
         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName))
                 .shardPeerAddressResolver(peerAddressResolver);
     }
 
-    private DatastoreContext newShardDatastoreContext(String shardName) {
+    private DatastoreContext newShardDatastoreContext(final String shardName) {
         return newShardDatastoreContextBuilder(shardName).build();
     }
 
@@ -748,7 +751,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
+    private void onLeaderStateChanged(final ShardLeaderStateChanged leaderStateChanged) {
         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
 
         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
@@ -765,7 +768,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
+    private void onShardNotInitializedTimeout(final ShardNotInitializedTimeout message) {
         ShardInformation shardInfo = message.getShardInfo();
 
         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
@@ -782,7 +785,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
+    private void onFollowerInitialSyncStatus(final FollowerInitialSyncUpStatus status) {
         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
                 status.getName(), status.isInitialSyncDone());
 
@@ -796,7 +799,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     }
 
-    private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
+    private void onRoleChangeNotification(final RoleChangeNotification roleChanged) {
         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
                 roleChanged.getOldRole(), roleChanged.getNewRole());
 
@@ -809,7 +812,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
 
-    private ShardInformation findShardInformation(String memberId) {
+    private ShardInformation findShardInformation(final String memberId) {
         for (ShardInformation info : localShards.values()) {
             if (info.getShardId().toString().equals(memberId)) {
                 return info;
@@ -839,7 +842,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return true;
     }
 
-    private void onActorInitialized(Object message) {
+    private void onActorInitialized(final Object message) {
         final ActorRef sender = getSender();
 
         if (sender == null) {
@@ -860,7 +863,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         markShardAsInitialized(shardId.getShardName());
     }
 
-    private void markShardAsInitialized(String shardName) {
+    private void markShardAsInitialized(final String shardName) {
         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
 
         ShardInformation shardInformation = localShards.get(shardName);
@@ -872,7 +875,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     @Override
-    protected void handleRecover(Object message) throws Exception {
+    protected void handleRecover(final Object message) throws Exception {
         if (message instanceof RecoveryCompleted) {
             onRecoveryCompleted();
         } else if (message instanceof SnapshotOffer) {
@@ -896,8 +899,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         createLocalShards();
     }
 
-    private void sendResponse(ShardInformation shardInformation, boolean doWait,
-            boolean wantShardReady, final Supplier<Object> messageSupplier) {
+    private void sendResponse(final ShardInformation shardInformation, final boolean doWait,
+            final boolean wantShardReady, final Supplier<Object> messageSupplier) {
         if (!shardInformation.isShardInitialized() || wantShardReady && !shardInformation.isShardReadyWithLeaderId()) {
             if (doWait) {
                 final ActorRef sender = getSender();
@@ -945,11 +948,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         getSender().tell(messageSupplier.get(), getSelf());
     }
 
-    private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
+    private static NoShardLeaderException createNoShardLeaderException(final ShardIdentifier shardId) {
         return new NoShardLeaderException(null, shardId.toString());
     }
 
-    private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
+    private static NotInitializedException createNotInitializedException(final ShardIdentifier shardId) {
         return new NotInitializedException(String.format(
                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
     }
@@ -959,7 +962,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return MemberName.forName(member.roles().iterator().next());
     }
 
-    private void memberRemoved(ClusterEvent.MemberRemoved message) {
+    private void memberRemoved(final ClusterEvent.MemberRemoved message) {
         MemberName memberName = memberToName(message.member());
 
         LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
@@ -972,7 +975,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void memberExited(ClusterEvent.MemberExited message) {
+    private void memberExited(final ClusterEvent.MemberExited message) {
         MemberName memberName = memberToName(message.member());
 
         LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
@@ -985,7 +988,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void memberUp(ClusterEvent.MemberUp message) {
+    private void memberUp(final ClusterEvent.MemberUp message) {
         MemberName memberName = memberToName(message.member());
 
         LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
@@ -994,12 +997,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         memberUp(memberName, message.member().address());
     }
 
-    private void memberUp(MemberName memberName, Address address) {
+    private void memberUp(final MemberName memberName, final Address address) {
         addPeerAddress(memberName, address);
         checkReady();
     }
 
-    private void memberWeaklyUp(MemberWeaklyUp message) {
+    private void memberWeaklyUp(final MemberWeaklyUp message) {
         MemberName memberName = memberToName(message.member());
 
         LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
@@ -1008,7 +1011,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         memberUp(memberName, message.member().address());
     }
 
-    private void addPeerAddress(MemberName memberName, Address address) {
+    private void addPeerAddress(final MemberName memberName, final Address address) {
         peerAddressResolver.addPeerAddress(memberName, address);
 
         for (ShardInformation info : localShards.values()) {
@@ -1020,7 +1023,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void memberReachable(ClusterEvent.ReachableMember message) {
+    private void memberReachable(final ClusterEvent.ReachableMember message) {
         MemberName memberName = memberToName(message.member());
         LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
 
@@ -1029,7 +1032,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         markMemberAvailable(memberName);
     }
 
-    private void memberUnreachable(ClusterEvent.UnreachableMember message) {
+    private void memberUnreachable(final ClusterEvent.UnreachableMember message) {
         MemberName memberName = memberToName(message.member());
         LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
 
@@ -1066,7 +1069,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void onDatastoreContextFactory(DatastoreContextFactory factory) {
+    private void onDatastoreContextFactory(final DatastoreContextFactory factory) {
         datastoreContextFactory = factory;
         for (ShardInformation info : localShards.values()) {
             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
@@ -1125,9 +1128,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
 
         for (ShardInformation info : localShards.values()) {
+            info.setSchemaContext(schemaContext);
+
             if (info.getActor() == null) {
                 LOG.debug("Creating Shard {}", info.getShardId());
-                info.setActor(newShardActor(schemaContext, info));
+                info.setActor(newShardActor(info));
             } else {
                 info.getActor().tell(message, getSelf());
             }
@@ -1140,12 +1145,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     @VisibleForTesting
-    protected ActorRef newShardActor(final SchemaContext shardSchemaContext, final ShardInformation info) {
-        return getContext().actorOf(info.newProps(shardSchemaContext).withDispatcher(shardDispatcherPath),
+    protected ActorRef newShardActor(final ShardInformation info) {
+        return getContext().actorOf(info.newProps().withDispatcher(shardDispatcherPath),
                 info.getShardId().toString());
     }
 
-    private void findPrimary(FindPrimary message) {
+    private void findPrimary(final FindPrimary message) {
         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
 
         final String shardName = message.getShardName();
@@ -1202,7 +1207,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
-            public void onComplete(Throwable failure, Object response) {
+            public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
                     handler.onFailure(failure);
                 } else {
@@ -1226,7 +1231,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      * @param shardName the shard name
      * @return a b
      */
-    private ShardIdentifier getShardIdentifier(MemberName memberName, String shardName) {
+    private ShardIdentifier getShardIdentifier(final MemberName memberName, final String shardName) {
         return peerAddressResolver.getShardIdentifier(memberName, shardName);
     }
 
@@ -1263,7 +1268,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      *
      * @param shardName the shard name
      */
-    private Map<String, String> getPeerAddresses(String shardName) {
+    private Map<String, String> getPeerAddresses(final String shardName) {
         final Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
         return getPeerAddresses(shardName, members);
     }
@@ -1332,7 +1337,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
                 getSelf()) {
             @Override
-            public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+            public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
                 final RunnableMessage runnable = (RunnableMessage) () -> addPrefixShard(getShardName(),
                         message.getShardPrefix(), response, getSender());
                 if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
@@ -1341,7 +1346,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             }
 
             @Override
-            public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
+            public void onLocalPrimaryFound(final LocalPrimaryShardFound message) {
                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
             }
         });
@@ -1372,7 +1377,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
                 getSelf()) {
             @Override
-            public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+            public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
                 final RunnableMessage runnable = (RunnableMessage) () ->
                     addShard(getShardName(), response, getSender());
                 if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
@@ -1381,13 +1386,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             }
 
             @Override
-            public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
+            public void onLocalPrimaryFound(final LocalPrimaryShardFound message) {
                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
             }
         });
     }
 
-    private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
+    private void sendLocalReplicaAlreadyExistsReply(final String shardName, final ActorRef sender) {
         String msg = String.format("Local shard %s already exists", shardName);
         LOG.debug("{}: {}", persistenceId(), msg);
         sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
@@ -1416,8 +1421,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
                     Shard.builder(), peerAddressResolver);
             shardInfo.setActiveMember(false);
+            shardInfo.setSchemaContext(schemaContext);
             localShards.put(shardName, shardInfo);
-            shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+            shardInfo.setActor(newShardActor(shardInfo));
         } else {
             removeShardOnFailure = false;
             shardInfo = existingShardInfo;
@@ -1446,8 +1452,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
                     Shard.builder(), peerAddressResolver);
             shardInfo.setActiveMember(false);
+            shardInfo.setSchemaContext(schemaContext);
             localShards.put(shardName, shardInfo);
-            shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+            shardInfo.setActor(newShardActor(shardInfo));
         } else {
             removeShardOnFailure = false;
             shardInfo = existingShardInfo;
@@ -1476,7 +1483,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
-            public void onComplete(Throwable failure, Object addServerResponse) {
+            public void onComplete(final Throwable failure, final Object addServerResponse) {
                 if (failure != null) {
                     LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
                             response.getPrimaryPath(), shardName, failure);
@@ -1492,8 +1499,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
-    private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
-            boolean removeShardOnFailure) {
+    private void onAddServerFailure(final String shardName, final String message, final Throwable failure,
+            final ActorRef sender, final boolean removeShardOnFailure) {
         shardReplicaOperationsInProgress.remove(shardName);
 
         if (removeShardOnFailure) {
@@ -1507,8 +1514,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             new RuntimeException(message, failure)), getSelf());
     }
 
-    private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
-            String leaderPath, boolean removeShardOnFailure) {
+    private void onAddServerReply(final ShardInformation shardInfo, final AddServerReply replyMsg,
+            final ActorRef sender, final String leaderPath, final boolean removeShardOnFailure) {
         String shardName = shardInfo.getShardName();
         shardReplicaOperationsInProgress.remove(shardName);
 
@@ -1536,29 +1543,23 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private static Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
-                                               String leaderPath, ShardIdentifier shardId) {
-        Exception failure;
+    private static Exception getServerChangeException(final Class<?> serverChange,
+            final ServerChangeStatus serverChangeStatus, final String leaderPath, final ShardIdentifier shardId) {
         switch (serverChangeStatus) {
             case TIMEOUT:
-                failure = new TimeoutException(String.format(
+                return new TimeoutException(String.format(
                         "The shard leader %s timed out trying to replicate the initial data to the new shard %s."
                         + "Possible causes - there was a problem replicating the data or shard leadership changed "
                         + "while replicating the shard data", leaderPath, shardId.getShardName()));
-                break;
             case NO_LEADER:
-                failure = createNoShardLeaderException(shardId);
-                break;
+                return createNoShardLeaderException(shardId);
             case NOT_SUPPORTED:
-                failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
+                return new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
                         serverChange.getSimpleName(), shardId.getShardName()));
-                break;
             default :
-                failure = new RuntimeException(String.format(
-                        "%s request to leader %s for shard %s failed with status %s",
+                return new RuntimeException(String.format("%s request to leader %s for shard %s failed with status %s",
                         serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
         }
-        return failure;
     }
 
     private void onRemoveShardReplica(final RemoveShardReplica shardReplicaMsg) {
@@ -1567,12 +1568,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
             @Override
-            public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+            public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
                 doRemoveShardReplicaAsync(response.getPrimaryPath());
             }
 
             @Override
-            public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
+            public void onLocalPrimaryFound(final LocalPrimaryShardFound response) {
                 doRemoveShardReplicaAsync(response.getPrimaryPath());
             }
 
@@ -1593,12 +1594,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(),
                 shardName, persistenceId(), getSelf()) {
             @Override
-            public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+            public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
                 doRemoveShardReplicaAsync(response.getPrimaryPath());
             }
 
             @Override
-            public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
+            public void onLocalPrimaryFound(final LocalPrimaryShardFound response) {
                 doRemoveShardReplicaAsync(response.getPrimaryPath());
             }
 
@@ -1627,7 +1628,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return currentSnapshot;
     }
 
-    private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
+    private void applyShardManagerSnapshot(final ShardManagerSnapshot snapshot) {
         currentSnapshot = snapshot;
 
         LOG.debug("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
@@ -1651,7 +1652,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void onSaveSnapshotSuccess(SaveSnapshotSuccess successMessage) {
+    private void onSaveSnapshotSuccess(final SaveSnapshotSuccess successMessage) {
         LOG.debug("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
             persistenceId());
         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
@@ -1675,7 +1676,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             localShardFound.getPath(), getSender()));
     }
 
-    private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) {
+    private void onFlipShardMembersVotingStatus(final FlipShardMembersVotingStatus flipMembersVotingStatus) {
         LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
 
         ActorRef sender = getSender();
@@ -1686,7 +1687,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
             future.onComplete(new OnComplete<Object>() {
                 @Override
-                public void onComplete(Throwable failure, Object response) {
+                public void onComplete(final Throwable failure, final Object response) {
                     if (failure != null) {
                         sender.tell(new Status.Failure(new RuntimeException(
                                 String.format("Failed to access local shard %s", shardName), failure)), self());
@@ -1710,7 +1711,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     }
 
-    private void findLocalShard(FindLocalShard message) {
+    private void findLocalShard(final FindLocalShard message) {
         LOG.debug("{}: findLocalShard : {}", persistenceId(), message.getShardName());
 
         final ShardInformation shardInformation = localShards.get(message.getShardName());
@@ -1735,7 +1736,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
-            public void onComplete(Throwable failure, Object response) {
+            public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
                     LOG.debug("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName,
                             failure);
@@ -1761,7 +1762,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
-    private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus,
+    private void changeShardMembersVotingStatus(final ChangeServersVotingStatus changeServersVotingStatus,
             final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
         if (isShardReplicaOperationInProgress(shardName, sender)) {
             return;
@@ -1780,7 +1781,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
-            public void onComplete(Throwable failure, Object response) {
+            public void onComplete(final Throwable failure, final Object response) {
                 shardReplicaOperationsInProgress.remove(shardName);
                 if (failure != null) {
                     String msg = String.format("ChangeServersVotingStatus request to local shard %s failed",
@@ -1817,8 +1818,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         String leaderPath;
         boolean removeShardOnFailure;
 
-        ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
-                boolean removeShardOnFailure) {
+        ForwardedAddServerReply(final ShardInformation shardInfo, final AddServerReply addServerReply,
+            final String leaderPath, final boolean removeShardOnFailure) {
             this.shardInfo = shardInfo;
             this.addServerReply = addServerReply;
             this.leaderPath = leaderPath;
@@ -1832,8 +1833,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         Throwable failure;
         boolean removeShardOnFailure;
 
-        ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
-                boolean removeShardOnFailure) {
+        ForwardedAddServerFailure(final String shardName, final String failureMessage, final Throwable failure,
+                final boolean removeShardOnFailure) {
             this.shardName = shardName;
             this.failureMessage = failureMessage;
             this.failure = failure;
@@ -1845,7 +1846,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private final Runnable replyRunnable;
         private Cancellable timeoutSchedule;
 
-        OnShardInitialized(Runnable replyRunnable) {
+        OnShardInitialized(final Runnable replyRunnable) {
             this.replyRunnable = replyRunnable;
         }
 
@@ -1857,13 +1858,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return timeoutSchedule;
         }
 
-        void setTimeoutSchedule(Cancellable timeoutSchedule) {
+        void setTimeoutSchedule(final Cancellable timeoutSchedule) {
             this.timeoutSchedule = timeoutSchedule;
         }
     }
 
     static class OnShardReady extends OnShardInitialized {
-        OnShardReady(Runnable replyRunnable) {
+        OnShardReady(final Runnable replyRunnable) {
             super(replyRunnable);
         }
     }
@@ -1923,8 +1924,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
          * @param persistenceId The persistenceId for the ShardManager
          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
          */
-        protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId,
-                ActorRef shardManagerActor) {
+        protected AutoFindPrimaryFailureResponseHandler(final ActorRef targetActor, final String shardName,
+                final String persistenceId, final ActorRef shardManagerActor) {
             this.targetActor = Preconditions.checkNotNull(targetActor);
             this.shardName = Preconditions.checkNotNull(shardName);
             this.persistenceId = Preconditions.checkNotNull(persistenceId);
@@ -1940,14 +1941,14 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
 
         @Override
-        public void onFailure(Throwable failure) {
+        public void onFailure(final Throwable failure) {
             LOG.debug("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
             targetActor.tell(new Status.Failure(new RuntimeException(
                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
         }
 
         @Override
-        public void onUnknownResponse(Object response) {
+        public void onUnknownResponse(final Object response) {
             String msg = String.format("Failed to find leader for shard %s: received response: %s",
                     shardName, response);
             LOG.debug("{}: {}", persistenceId, msg);
@@ -1964,7 +1965,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private final Object response;
         private final String leaderPath;
 
-        WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
+        WrappedShardResponse(final ShardIdentifier shardId, final Object response, final String leaderPath) {
             this.shardId = shardId;
             this.response = response;
             this.leaderPath = leaderPath;
@@ -1988,7 +1989,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private final ShardInformation shardInfo;
         private final OnShardInitialized onShardInitialized;
 
-        ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
+        ShardNotInitializedTimeout(final ShardInformation shardInfo, final OnShardInitialized onShardInitialized,
+            final ActorRef sender) {
             this.sender = sender;
             this.shardInfo = shardInfo;
             this.onShardInitialized = onShardInitialized;
index 4e29d4d..05d9943 100644 (file)
@@ -125,7 +125,8 @@ public abstract class AbstractShardTest extends AbstractActorTest {
     }
 
     protected Shard.Builder newShardBuilder() {
-        return Shard.builder().id(shardID).datastoreContext(newDatastoreContext()).schemaContext(SCHEMA_CONTEXT);
+        return Shard.builder().id(shardID).datastoreContext(newDatastoreContext())
+            .schemaContextProvider(() -> SCHEMA_CONTEXT);
     }
 
     protected void testRecovery(final Set<Integer> listEntryKeys) throws Exception {
index f566ba8..86a78a7 100644 (file)
@@ -2013,13 +2013,13 @@ public class ShardTest extends AbstractShardTest {
                 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
 
         final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext)
-                .schemaContext(SCHEMA_CONTEXT).props();
+                .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
 
         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder()
                 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
 
         final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext)
-                .schemaContext(SCHEMA_CONTEXT).props();
+                .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
 
         new ShardTestKit(getSystem()) {
             {
@@ -2167,14 +2167,14 @@ public class ShardTest extends AbstractShardTest {
                                 .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
                                 .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
                                         "akka://test/user/" + leaderShardID.toString()))
-                                .schemaContext(SCHEMA_CONTEXT).props()
+                                .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
 
                 final TestActorRef<Shard> leaderShard = actorFactory
                         .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
                                 .peerAddresses(Collections.singletonMap(followerShardID.toString(),
                                         "akka://test/user/" + followerShardID.toString()))
-                                .schemaContext(SCHEMA_CONTEXT).props()
+                                .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
 
                 leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
@@ -2286,14 +2286,14 @@ public class ShardTest extends AbstractShardTest {
                                 .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
                                 .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
                                         "akka://test/user/" + leaderShardID.toString()))
-                                .schemaContext(SCHEMA_CONTEXT).props()
+                                .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
 
                 final TestActorRef<Shard> leaderShard = actorFactory
                         .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
                                 .peerAddresses(Collections.singletonMap(followerShardID.toString(),
                                         "akka://test/user/" + followerShardID.toString()))
-                                .schemaContext(SCHEMA_CONTEXT).props()
+                                .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
 
                 leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
index 233ecb8..881429b 100644 (file)
@@ -51,7 +51,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
     private ActorRef createShard() {
         ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext)
-                .schemaContext(TestModel.createTestContext()).props());
+                .schemaContextProvider(() -> TEST_SCHEMA_CONTEXT).props());
         ShardTestKit.waitUntilLeader(shard);
         return shard;
     }
index f899710..84bc872 100644 (file)
@@ -50,6 +50,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 public class ShardTransactionTest extends AbstractActorTest {
 
@@ -59,6 +60,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
     private static final ShardIdentifier SHARD_IDENTIFIER =
         ShardIdentifier.create("inventory", MEMBER_NAME, "config");
+    private static final SchemaContext TEST_MODEL = TestModel.createTestContext();
 
     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().persistent(false).build();
 
@@ -70,7 +72,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Before
     public void setUp() {
         shard = actorFactory.createTestActor(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext)
-                .schemaContext(TestModel.createTestContext()).props()
+                .schemaContextProvider(() -> TEST_MODEL).props()
                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
         ShardTestKit.waitUntilLeader(shard);
         store = shard.underlyingActor().getDataStore();
index c61bbc1..cfef02c 100644 (file)
@@ -1223,25 +1223,26 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         return newShardProps(newShardId(LOCAL_MEMBER_NAME), Collections.<String,String>emptyMap(), LOCAL_MEMBER_NAME);
     }
 
-    private Props newShardProps(ShardIdentifier shardId, Map<String,String> peers, String memberName) {
+    private Props newShardProps(final ShardIdentifier shardId, final Map<String,String> peers,
+            final String memberName) {
         return newShardProps(shardId, peers, memberName, EntityOwnerSelectionStrategyConfig.newBuilder().build());
     }
 
-    private Props newShardProps(ShardIdentifier shardId, Map<String,String> peers, String memberName,
-                                EntityOwnerSelectionStrategyConfig config) {
+    private Props newShardProps(final ShardIdentifier shardId, final Map<String,String> peers, final String memberName,
+                                final EntityOwnerSelectionStrategyConfig config) {
         return newShardBuilder(shardId, peers, memberName).ownerSelectionStrategyConfig(config).props()
                     .withDispatcher(Dispatchers.DefaultDispatcherId());
     }
 
-    private EntityOwnershipShard.Builder newShardBuilder(ShardIdentifier shardId, Map<String,String> peers,
-            String memberName) {
+    private EntityOwnershipShard.Builder newShardBuilder(final ShardIdentifier shardId, final Map<String, String> peers,
+            final String memberName) {
         return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers).datastoreContext(
-                dataStoreContextBuilder.build()).schemaContext(SCHEMA_CONTEXT).localMemberName(
+                dataStoreContextBuilder.build()).schemaContextProvider(() -> SCHEMA_CONTEXT).localMemberName(
                         MemberName.forName(memberName)).ownerSelectionStrategyConfig(
                                 EntityOwnerSelectionStrategyConfig.newBuilder().build());
     }
 
-    private Map<String, String> peerMap(String... peerIds) {
+    private Map<String, String> peerMap(final String... peerIds) {
         ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder();
         for (String peerId: peerIds) {
             builder.put(peerId, actorFactory.createTestActorPath(peerId)).build();
@@ -1254,14 +1255,14 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         private final TestActorRef<MessageCollectorActor> collectorActor;
         private final Map<Class<?>, Predicate<?>> dropMessagesOfType = new ConcurrentHashMap<>();
 
-        TestEntityOwnershipShard(Builder builder, TestActorRef<MessageCollectorActor> collectorActor) {
+        TestEntityOwnershipShard(final Builder builder, final TestActorRef<MessageCollectorActor> collectorActor) {
             super(builder);
             this.collectorActor = collectorActor;
         }
 
         @SuppressWarnings({ "unchecked", "rawtypes" })
         @Override
-        public void handleCommand(Object message) {
+        public void handleCommand(final Object message) {
             Predicate drop = dropMessagesOfType.get(message.getClass());
             if (drop == null || !drop.test(message)) {
                 super.handleCommand(message);
@@ -1272,15 +1273,15 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
             }
         }
 
-        void startDroppingMessagesOfType(Class<?> msgClass) {
+        void startDroppingMessagesOfType(final Class<?> msgClass) {
             dropMessagesOfType.put(msgClass, msg -> true);
         }
 
-        <T> void startDroppingMessagesOfType(Class<T> msgClass, Predicate<T> filter) {
+        <T> void startDroppingMessagesOfType(final Class<T> msgClass, final Predicate<T> filter) {
             dropMessagesOfType.put(msgClass, filter);
         }
 
-        void stopDroppingMessagesOfType(Class<?> msgClass) {
+        void stopDroppingMessagesOfType(final Class<?> msgClass) {
             dropMessagesOfType.remove(msgClass);
         }
 
@@ -1288,11 +1289,11 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
             return collectorActor;
         }
 
-        static Props props(Builder builder) {
+        static Props props(final Builder builder) {
             return props(builder, null);
         }
 
-        static Props props(Builder builder, TestActorRef<MessageCollectorActor> collectorActor) {
+        static Props props(final Builder builder, final TestActorRef<MessageCollectorActor> collectorActor) {
             return Props.create(TestEntityOwnershipShard.class, builder, collectorActor)
                     .withDispatcher(Dispatchers.DefaultDispatcherId());
         }
index f40374b..b3d6628 100644 (file)
@@ -134,11 +134,11 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
     private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
 
-    private ActorSystem newActorSystem(String config) {
+    private ActorSystem newActorSystem(final String config) {
         return newActorSystem("cluster-test", config);
     }
 
-    private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
+    private ActorRef newMockShardActor(final ActorSystem system, final String shardName, final String memberName) {
         String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
         if (system == getSystem()) {
             return actorFactory.createActor(Props.create(MessageCollectorActor.class), name);
@@ -151,7 +151,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         return newShardMgrProps(new MockConfiguration());
     }
 
-    private static DatastoreContextFactory newDatastoreContextFactory(DatastoreContext datastoreContext) {
+    private static DatastoreContextFactory newDatastoreContextFactory(final DatastoreContext datastoreContext) {
         DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
         Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
         Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString());
@@ -162,7 +162,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
     }
 
-    private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(ActorRef shardActor) {
+    private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(final ActorRef shardActor) {
         return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor)
                 .distributedDataStore(mock(DistributedDataStore.class));
     }
@@ -173,7 +173,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
                 Dispatchers.DefaultDispatcherId());
     }
 
-    private Props newPropsShardMgrWithMockShardActor(ActorRef shardActor) {
+    private Props newPropsShardMgrWithMockShardActor(final ActorRef shardActor) {
         return newTestShardMgrBuilderWithMockShardActor(shardActor).props()
                 .withDispatcher(Dispatchers.DefaultDispatcherId());
     }
@@ -183,14 +183,15 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         return newTestShardManager(newShardMgrProps());
     }
 
-    private TestShardManager newTestShardManager(Props props) {
+    private TestShardManager newTestShardManager(final Props props) {
         TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
         TestShardManager shardManager = shardManagerActor.underlyingActor();
         shardManager.waitForRecoveryComplete();
         return shardManager;
     }
 
-    private static void waitForShardInitialized(ActorRef shardManager, String shardName, JavaTestKit kit) {
+    private static void waitForShardInitialized(final ActorRef shardManager, final String shardName,
+            final JavaTestKit kit) {
         AssertionError last = null;
         Stopwatch sw = Stopwatch.createStarted();
         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
@@ -209,7 +210,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
     }
 
     @SuppressWarnings("unchecked")
-    private static <T> T expectMsgClassOrFailure(Class<T> msgClass, JavaTestKit kit, String msg) {
+    private static <T> T expectMsgClassOrFailure(final Class<T> msgClass, final JavaTestKit kit, final String msg) {
         Object reply = kit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), msgClass, Failure.class);
         if (reply instanceof Failure) {
             throw new AssertionError(msg + " failed", ((Failure)reply).cause());
@@ -234,12 +235,12 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         final MockConfiguration mockConfig = new MockConfiguration() {
             @Override
-            public Collection<String> getMemberShardNames(MemberName memberName) {
+            public Collection<String> getMemberShardNames(final MemberName memberName) {
                 return Arrays.asList("default", "topology");
             }
 
             @Override
-            public Collection<MemberName> getMembersFromShardName(String shardName) {
+            public Collection<MemberName> getMembersFromShardName(final String shardName) {
                 return members("member-1");
             }
         };
@@ -257,12 +258,12 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
         final CountDownLatch newShardActorLatch = new CountDownLatch(2);
         class LocalShardManager extends ShardManager {
-            LocalShardManager(AbstractShardManagerCreator<?> creator) {
+            LocalShardManager(final AbstractShardManagerCreator<?> creator) {
                 super(creator);
             }
 
             @Override
-            protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
+            protected ActorRef newShardActor(final ShardInformation info) {
                 Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
                 ActorRef ref = null;
                 if (entry != null) {
@@ -1120,7 +1121,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
         TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
             @Override
-            public List<String> getMemberShardNames(MemberName memberName) {
+            public List<String> getMemberShardNames(final MemberName memberName) {
                 return Arrays.asList("default", "astronauts");
             }
         }));
@@ -1180,7 +1181,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         };
     }
 
-    private static List<MemberName> members(String... names) {
+    private static List<MemberName> members(final String... names) {
         return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList());
     }
 
@@ -2091,14 +2092,14 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
         private volatile MessageInterceptor messageInterceptor;
 
-        private TestShardManager(Builder builder) {
+        private TestShardManager(final Builder builder) {
             super(builder);
             shardActor = builder.shardActor;
             shardActors = builder.shardActors;
         }
 
         @Override
-        protected void handleRecover(Object message) throws Exception {
+        protected void handleRecover(final Object message) throws Exception {
             try {
                 super.handleRecover(message);
             } finally {
@@ -2108,14 +2109,14 @@ public class ShardManagerTest extends AbstractShardManagerTest {
             }
         }
 
-        private void countDownIfOther(final Member member, CountDownLatch latch) {
+        private void countDownIfOther(final Member member, final CountDownLatch latch) {
             if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
                 latch.countDown();
             }
         }
 
         @Override
-        public void handleCommand(Object message) throws Exception {
+        public void handleCommand(final Object message) throws Exception {
             try {
                 if (messageInterceptor != null && messageInterceptor.canIntercept(message)) {
                     getSender().tell(messageInterceptor.apply(message), getSelf());
@@ -2137,7 +2138,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
             }
         }
 
-        void setMessageInterceptor(MessageInterceptor messageInterceptor) {
+        void setMessageInterceptor(final MessageInterceptor messageInterceptor) {
             this.messageInterceptor = messageInterceptor;
         }
 
@@ -2177,7 +2178,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
             findPrimaryMessageReceived = new CountDownLatch(1);
         }
 
-        public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) {
+        public static Builder builder(final DatastoreContext.Builder datastoreContextBuilder) {
             return new Builder(datastoreContextBuilder);
         }
 
@@ -2185,37 +2186,37 @@ public class ShardManagerTest extends AbstractShardManagerTest {
             private ActorRef shardActor;
             private final Map<String, ActorRef> shardActors = new HashMap<>();
 
-            Builder(DatastoreContext.Builder datastoreContextBuilder) {
+            Builder(final DatastoreContext.Builder datastoreContextBuilder) {
                 super(TestShardManager.class);
                 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
             }
 
-            Builder shardActor(ActorRef newShardActor) {
+            Builder shardActor(final ActorRef newShardActor) {
                 this.shardActor = newShardActor;
                 return this;
             }
 
-            Builder addShardActor(String shardName, ActorRef actorRef) {
+            Builder addShardActor(final String shardName, final ActorRef actorRef) {
                 shardActors.put(shardName, actorRef);
                 return this;
             }
         }
 
         @Override
-        public void saveSnapshot(Object obj) {
+        public void saveSnapshot(final Object obj) {
             snapshot = (ShardManagerSnapshot) obj;
             snapshotPersist.countDown();
             super.saveSnapshot(obj);
         }
 
-        void verifySnapshotPersisted(Set<String> shardList) {
+        void verifySnapshotPersisted(final Set<String> shardList) {
             assertEquals("saveSnapshot invoked", true,
                     Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
             assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
         }
 
         @Override
-        protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
+        protected ActorRef newShardActor(final ShardInformation info) {
             if (shardActors.get(info.getShardName()) != null) {
                 return shardActors.get(info.getShardName());
             }
@@ -2224,7 +2225,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
                 return shardActor;
             }
 
-            return super.newShardActor(schemaContext, info);
+            return super.newShardActor(info);
         }
     }
 
@@ -2232,7 +2233,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
                                                      extends AbstractShardManagerCreator<T> {
         private final Class<C> shardManagerClass;
 
-        AbstractGenericCreator(Class<C> shardManagerClass) {
+        AbstractGenericCreator(final Class<C> shardManagerClass) {
             this.shardManagerClass = shardManagerClass;
             cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).waitTillReadyCountDownLatch(ready)
                     .primaryShardInfoCache(new PrimaryShardInfoFutureCache());
@@ -2246,7 +2247,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
     }
 
     private static class GenericCreator<C extends ShardManager> extends AbstractGenericCreator<GenericCreator<C>, C> {
-        GenericCreator(Class<C> shardManagerClass) {
+        GenericCreator(final Class<C> shardManagerClass) {
             super(shardManagerClass);
         }
     }
@@ -2255,7 +2256,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         private static final long serialVersionUID = 1L;
         private final Creator<ShardManager> delegate;
 
-        DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
+        DelegatingShardManagerCreator(final Creator<ShardManager> delegate) {
             this.delegate = delegate;
         }
 
@@ -2272,12 +2273,12 @@ public class ShardManagerTest extends AbstractShardManagerTest {
     private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
         return new MessageInterceptor() {
             @Override
-            public Object apply(Object message) {
+            public Object apply(final Object message) {
                 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
             }
 
             @Override
-            public boolean canIntercept(Object message) {
+            public boolean canIntercept(final Object message) {
                 return message instanceof FindPrimary;
             }
         };
@@ -2290,13 +2291,13 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         private final Class<?> requestClass;
 
         @SuppressWarnings("unused")
-        MockRespondActor(Class<?> requestClass, Object responseMsg) {
+        MockRespondActor(final Class<?> requestClass, final Object responseMsg) {
             this.requestClass = requestClass;
             this.responseMsg = responseMsg;
         }
 
         @Override
-        public void onReceive(Object message) throws Exception {
+        public void onReceive(final Object message) throws Exception {
             if (message.equals(CLEAR_RESPONSE)) {
                 responseMsg = null;
             } else {

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.