+ }};
+ }
+
+ @Test
+ public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+ TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+
+ shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
+ "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
+
+ verify(ready, never()).countDown();
+
+ }};
+ }
+
+
+ @Test
+ public void testByDefaultSyncStatusIsFalse() throws Exception{
+ final Props persistentProps = ShardManager.props(
+ new MockClusterWrapper(),
+ new MockConfiguration(),
+ DatastoreContext.newBuilder().persistent(true).build(), ready);
+ final TestActorRef<ShardManager> shardManager =
+ TestActorRef.create(getSystem(), persistentProps);
+
+ ShardManager shardManagerActor = shardManager.underlyingActor();
+
+ assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+ }
+
+ @Test
+ public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
+ final Props persistentProps = ShardManager.props(
+ new MockClusterWrapper(),
+ new MockConfiguration(),
+ DatastoreContext.newBuilder().persistent(true).build(), ready);
+ final TestActorRef<ShardManager> shardManager =
+ TestActorRef.create(getSystem(), persistentProps);
+
+ ShardManager shardManagerActor = shardManager.underlyingActor();
+ shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
+ RaftState.Follower.name(), RaftState.Leader.name()));
+
+ assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
+ }
+
+ @Test
+ public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
+ final Props persistentProps = ShardManager.props(
+ new MockClusterWrapper(),
+ new MockConfiguration(),
+ DatastoreContext.newBuilder().persistent(true).build(), ready);
+ final TestActorRef<ShardManager> shardManager =
+ TestActorRef.create(getSystem(), persistentProps);
+
+ ShardManager shardManagerActor = shardManager.underlyingActor();
+ shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
+ RaftState.Follower.name(), RaftState.Candidate.name()));
+
+ assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+
+ // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
+ shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
+
+ assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+ }
+
+ @Test
+ public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
+ final Props persistentProps = ShardManager.props(
+ new MockClusterWrapper(),
+ new MockConfiguration(),
+ DatastoreContext.newBuilder().persistent(true).build(), ready);
+ final TestActorRef<ShardManager> shardManager =
+ TestActorRef.create(getSystem(), persistentProps);
+
+ ShardManager shardManagerActor = shardManager.underlyingActor();
+ shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
+ RaftState.Candidate.name(), RaftState.Follower.name()));
+
+ // Initially will be false
+ assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+
+ // Send status true will make sync status true
+ shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
+
+ assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
+
+ // Send status false will make sync status false
+ shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown"));
+
+ assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+
+ }
+
+ @Test
+ public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
+ final Props persistentProps = ShardManager.props(
+ new MockClusterWrapper(),
+ new MockConfiguration() {
+ @Override
+ public List<String> getMemberShardNames(String memberName) {
+ return Arrays.asList("default", "astronauts");
+ }
+ },
+ DatastoreContext.newBuilder().persistent(true).build(), ready);
+ final TestActorRef<ShardManager> shardManager =
+ TestActorRef.create(getSystem(), persistentProps);
+
+ ShardManager shardManagerActor = shardManager.underlyingActor();
+
+ // Initially will be false
+ assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+
+ // Make default shard leader
+ shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
+ RaftState.Follower.name(), RaftState.Leader.name()));
+
+ // default = Leader, astronauts is unknown so sync status remains false
+ assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+
+ // Make astronauts shard leader as well
+ shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
+ RaftState.Follower.name(), RaftState.Leader.name()));
+
+ // Now sync status should be true
+ assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
+
+ // Make astronauts a Follower
+ shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
+ RaftState.Leader.name(), RaftState.Follower.name()));
+
+ // Sync status is not true
+ assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
+
+ // Make the astronauts follower sync status true
+ shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown"));
+
+ // Sync status is now true
+ assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
+
+ }
+
+ private static class TestShardManager extends ShardManager {
+ private final CountDownLatch recoveryComplete = new CountDownLatch(1);
+
+ TestShardManager(String shardMrgIDSuffix) {
+ super(new MockClusterWrapper(), new MockConfiguration(),
+ DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready);
+ }
+
+ @Override
+ public void handleRecover(Object message) throws Exception {
+ try {
+ super.handleRecover(message);
+ } finally {
+ if(message instanceof RecoveryCompleted) {
+ recoveryComplete.countDown();
+ }
+ }
+ }
+
+ void waitForRecoveryComplete() {
+ assertEquals("Recovery complete", true,
+ Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
+ }
+ }
+
+ @SuppressWarnings("serial")
+ static class TestShardManagerCreator implements Creator<TestShardManager> {
+ String shardMrgIDSuffix;
+
+ TestShardManagerCreator(String shardMrgIDSuffix) {
+ this.shardMrgIDSuffix = shardMrgIDSuffix;
+ }
+
+ @Override
+ public TestShardManager create() throws Exception {
+ return new TestShardManager(shardMrgIDSuffix);
+ }
+
+ }
+
+ private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
+ private static final long serialVersionUID = 1L;
+ private final Creator<ShardManager> delegate;
+
+ public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public ShardManager create() throws Exception {
+ return delegate.create();
+ }
+ }
+
+ private static class ForwardingShardManager extends ShardManager {
+ private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
+ private CountDownLatch memberUpReceived = new CountDownLatch(1);
+ private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
+ private final ActorRef shardActor;
+ private final String name;
+
+ protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
+ DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
+ ActorRef shardActor) {
+ super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch);
+ this.shardActor = shardActor;
+ this.name = name;
+ }
+
+ @Override
+ public void handleCommand(Object message) throws Exception {
+ try{
+ super.handleCommand(message);
+ } finally {
+ if(message instanceof FindPrimary) {
+ findPrimaryMessageReceived.countDown();
+ } else if(message instanceof ClusterEvent.MemberUp) {
+ String role = ((ClusterEvent.MemberUp)message).member().roles().head();
+ if(!getCluster().getCurrentMemberName().equals(role)) {
+ memberUpReceived.countDown();
+ }
+ } else if(message instanceof ClusterEvent.MemberRemoved) {
+ String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
+ if(!getCluster().getCurrentMemberName().equals(role)) {
+ memberRemovedReceived.countDown();
+ }
+ }
+ }
+ }
+
+ @Override
+ public String persistenceId() {
+ return name;
+ }
+
+ @Override
+ protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
+ return shardActor;
+ }
+
+ void waitForMemberUp() {
+ assertEquals("MemberUp received", true,
+ Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
+ memberUpReceived = new CountDownLatch(1);
+ }
+
+ void waitForMemberRemoved() {
+ assertEquals("MemberRemoved received", true,
+ Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
+ memberRemovedReceived = new CountDownLatch(1);
+ }
+
+ void verifyFindPrimary() {
+ assertEquals("FindPrimary received", true,
+ Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
+ findPrimaryMessageReceived = new CountDownLatch(1);
+ }
+ }