+ public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
+ TestShardManager shardManager = newTestShardManager();
+
+ String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
+ RaftState.Candidate.name(), RaftState.Follower.name()));
+
+ // Initially will be false
+ assertEquals(false, shardManager.getMBean().getSyncStatus());
+
+ // Send status true will make sync status true
+ shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
+
+ assertEquals(true, shardManager.getMBean().getSyncStatus());
+
+ // Send status false will make sync status false
+ shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
+
+ assertEquals(false, shardManager.getMBean().getSyncStatus());
+
+ }
+
+ @Test
+ public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
+ LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
+ TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
+ @Override
+ public List<String> getMemberShardNames(String memberName) {
+ return Arrays.asList("default", "astronauts");
+ }
+ }));
+
+ // Initially will be false
+ assertEquals(false, shardManager.getMBean().getSyncStatus());
+
+ // Make default shard leader
+ String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId,
+ RaftState.Follower.name(), RaftState.Leader.name()));
+
+ // default = Leader, astronauts is unknown so sync status remains false
+ assertEquals(false, shardManager.getMBean().getSyncStatus());
+
+ // Make astronauts shard leader as well
+ String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
+ shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
+ RaftState.Follower.name(), RaftState.Leader.name()));
+
+ // Now sync status should be true
+ assertEquals(true, shardManager.getMBean().getSyncStatus());
+
+ // Make astronauts a Follower
+ shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
+ RaftState.Leader.name(), RaftState.Follower.name()));
+
+ // Sync status is not true
+ assertEquals(false, shardManager.getMBean().getSyncStatus());
+
+ // Make the astronauts follower sync status true
+ shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
+
+ // Sync status is now true
+ assertEquals(true, shardManager.getMBean().getSyncStatus());
+
+ LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending");
+ }
+
+ @Test
+ public void testOnReceiveSwitchShardBehavior() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ shardManager.tell(new SwitchShardBehavior(mockShardName, "Leader", 1000), getRef());
+
+ SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor, SwitchBehavior.class);
+
+ assertEquals(RaftState.Leader, switchBehavior.getNewState());
+ assertEquals(1000, switchBehavior.getNewTerm());
+ }};
+ }
+
+ @Test
+ public void testOnCreateShard() {
+ LOG.info("testOnCreateShard starting");
+ new JavaTestKit(getSystem()) {{
+ datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
+
+ ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
+ new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
+
+ SchemaContext schemaContext = TestModel.createTestContext();
+ shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
+
+ DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100).
+ persistent(false).build();
+ Shard.Builder shardBuilder = Shard.builder();
+
+ ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
+ "foo", null, Arrays.asList("member-1", "member-5", "member-6"));
+ shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef());
+
+ expectMsgClass(duration("5 seconds"), Success.class);
+
+ shardManager.tell(new FindLocalShard("foo", true), getRef());
+
+ expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+ assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent());
+ assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig().
+ getPeerAddressResolver() instanceof ShardPeerAddressResolver);
+ assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(),
+ new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()),
+ shardBuilder.getPeerAddresses().keySet());
+ assertEquals("ShardIdentifier", new ShardIdentifier("foo", "member-1", shardMrgIDSuffix),
+ shardBuilder.getId());
+ assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
+
+ // Send CreateShard with same name - should return Success with a message.
+
+ shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
+
+ Success success = expectMsgClass(duration("5 seconds"), Success.class);
+ assertNotNull("Success status is null", success.status());
+ }};
+
+ LOG.info("testOnCreateShard ending");
+ }
+
+ @Test
+ public void testOnCreateShardWithLocalMemberNotInShardConfig() {
+ LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting");
+ new JavaTestKit(getSystem()) {{
+ datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
+
+ ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
+ new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
+
+ Shard.Builder shardBuilder = Shard.builder();
+ ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
+ "foo", null, Arrays.asList("member-5", "member-6"));
+
+ shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
+ expectMsgClass(duration("5 seconds"), Success.class);
+
+ shardManager.tell(new FindLocalShard("foo", true), getRef());
+ expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+ assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
+ assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(),
+ shardBuilder.getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
+ }};
+
+ LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending");
+ }
+
+ @Test
+ public void testOnCreateShardWithNoInitialSchemaContext() {
+ LOG.info("testOnCreateShardWithNoInitialSchemaContext starting");
+ new JavaTestKit(getSystem()) {{
+ ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
+ new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
+
+ Shard.Builder shardBuilder = Shard.builder();
+
+ ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
+ "foo", null, Arrays.asList("member-1"));
+ shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
+
+ expectMsgClass(duration("5 seconds"), Success.class);
+
+ SchemaContext schemaContext = TestModel.createTestContext();
+ shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
+
+ shardManager.tell(new FindLocalShard("foo", true), getRef());
+
+ expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+ assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
+ assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
+ }};
+
+ LOG.info("testOnCreateShardWithNoInitialSchemaContext ending");
+ }
+
+ @Test
+ public void testGetSnapshot() throws Throwable {
+ LOG.info("testGetSnapshot starting");
+ JavaTestKit kit = new JavaTestKit(getSystem());
+
+ MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+ put("shard1", Arrays.asList("member-1")).
+ put("shard2", Arrays.asList("member-1")).
+ put("astronauts", Collections.<String>emptyList()).build());
+
+ TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig).
+ withDispatcher(Dispatchers.DefaultDispatcherId()));
+
+ shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
+ Failure failure = kit.expectMsgClass(Failure.class);
+ assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
+
+ waitForShardInitialized(shardManager, "shard1", kit);
+ waitForShardInitialized(shardManager, "shard2", kit);
+
+ shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
+
+ DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
+
+ assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
+ assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
+
+ Function<ShardSnapshot, String> shardNameTransformer = new Function<ShardSnapshot, String>() {
+ @Override
+ public String apply(ShardSnapshot s) {
+ return s.getName();