+ private TestShardManager newTestShardManager() {
+ return newTestShardManager(newShardMgrProps());
+ }
+
+ private TestShardManager newTestShardManager(Props props) {
+ TestActorRef<TestShardManager> shardManagerActor = TestActorRef.create(getSystem(), props);
+ TestShardManager shardManager = shardManagerActor.underlyingActor();
+ shardManager.waitForRecoveryComplete();
+ return shardManager;
+ }
+
+ @Test
+ public void testPerShardDatastoreContext() throws Exception {
+ final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
+ datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
+
+ Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
+ shardElectionTimeoutFactor(6).build()).when(mockFactory).getShardDatastoreContext("default");
+
+ Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
+ shardElectionTimeoutFactor(7).build()).when(mockFactory).getShardDatastoreContext("topology");
+
+ final MockConfiguration mockConfig = new MockConfiguration() {
+ @Override
+ public Collection<String> getMemberShardNames(String memberName) {
+ return Arrays.asList("default", "topology");
+ }
+
+ @Override
+ public Collection<String> getMembersFromShardName(String shardName) {
+ return Arrays.asList("member-1");
+ }
+ };
+
+ final TestActorRef<MessageCollectorActor> defaultShardActor = TestActorRef.create(getSystem(),
+ Props.create(MessageCollectorActor.class), "default");
+ final TestActorRef<MessageCollectorActor> topologyShardActor = TestActorRef.create(getSystem(),
+ Props.create(MessageCollectorActor.class), "topology");
+
+ final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
+ new HashMap<String, Entry<ActorRef, DatastoreContext>>());
+ shardInfoMap.put("default", new AbstractMap.SimpleEntry<ActorRef, DatastoreContext>(defaultShardActor, null));
+ shardInfoMap.put("topology", new AbstractMap.SimpleEntry<ActorRef, DatastoreContext>(topologyShardActor, null));
+
+ final CountDownLatch newShardActorLatch = new CountDownLatch(2);
+ final Creator<ShardManager> creator = new Creator<ShardManager>() {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public ShardManager create() throws Exception {
+ return new ShardManager(ShardManager.builder().cluster(new MockClusterWrapper()).configuration(mockConfig).
+ datastoreContextFactory(mockFactory).waitTillReadyCountdownLatch(ready).
+ primaryShardInfoCache(primaryShardInfoCache)) {
+ @Override
+ protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
+ Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
+ ActorRef ref = null;
+ if(entry != null) {
+ ref = entry.getKey();
+ entry.setValue(info.getDatastoreContext());
+ }
+
+ newShardActorLatch.countDown();
+ return ref;
+ }
+ };
+ }
+ };
+
+ JavaTestKit kit = new JavaTestKit(getSystem());
+
+ final ActorRef shardManager = getSystem().actorOf(Props.create(new DelegatingShardManagerCreator(creator)).
+ withDispatcher(Dispatchers.DefaultDispatcherId()));
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), kit.getRef());
+
+ assertEquals("Shard actors created", true, newShardActorLatch.await(5, TimeUnit.SECONDS));
+ assertEquals("getShardElectionTimeoutFactor", 6, shardInfoMap.get("default").getValue().
+ getShardElectionTimeoutFactor());
+ assertEquals("getShardElectionTimeoutFactor", 7, shardInfoMap.get("topology").getValue().
+ getShardElectionTimeoutFactor());
+
+ DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
+ datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
+ Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
+ shardElectionTimeoutFactor(66).build()).when(newMockFactory).getShardDatastoreContext("default");
+
+ Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
+ shardElectionTimeoutFactor(77).build()).when(newMockFactory).getShardDatastoreContext("topology");
+
+ shardManager.tell(newMockFactory, kit.getRef());
+
+ DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor, DatastoreContext.class);
+ assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
+
+ newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
+ assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
+
+ defaultShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ topologyShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+