+ /**
+ * Tests bootstrapping the entity-ownership shard when there's no shards initially configured for local
+ * member. The entity-ownership shard is initially created as inactive (ie remains a follower), requiring
+ * an AddShardReplica request to join it to an existing leader.
+ */
+ @Test
+ public void testEntityOwnershipShardBootstrapping() throws Throwable {
+ String name = "testEntityOwnershipShardBootstrapping";
+ String moduleShardsConfig = MODULE_SHARDS_MEMBER_1_CONFIG;
+ MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+ moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+ datastoreContextBuilder(leaderDatastoreContextBuilder).build();
+
+ DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore();
+ EntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
+
+ leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
+
+ MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name ).
+ moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+ datastoreContextBuilder(followerDatastoreContextBuilder).build();
+
+ DistributedDataStore follower1DistributedDataStore = follower1Node.configDataStore();
+ follower1DistributedDataStore.waitTillReady();
+
+ leaderNode.waitForMembersUp("member-2");
+ follower1Node.waitForMembersUp("member-1");
+
+ EntityOwnershipService follower1EntityOwnershipService = newOwnershipService(follower1DistributedDataStore);
+
+ leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener);
+
+ // Register a candidate for follower1 - should get queued since follower1 has no leader
+ EntityOwnershipCandidateRegistration candidateReg = follower1EntityOwnershipService.registerCandidate(ENTITY1);
+ verify(leaderMockListener, timeout(300).never()).ownershipChanged(ownershipChange(ENTITY1));
+
+ // Add replica in follower1
+ AddShardReplica addReplica = new AddShardReplica(ENTITY_OWNERSHIP_SHARD_NAME);
+ follower1DistributedDataStore.getActorContext().getShardManager().tell(addReplica , follower1Node.kit().getRef());
+ Object reply = follower1Node.kit().expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), Success.class, Failure.class);
+ if(reply instanceof Failure) {
+ throw new AssertionError("AddShardReplica failed", ((Failure)reply).cause());
+ }
+
+ // The queued candidate registration should proceed
+ verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
+ reset(leaderMockListener);
+
+ candidateReg.close();
+ verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, false));
+ reset(leaderMockListener);
+
+ // Restart follower1 and verify the entity ownership shard is re-instated by registering.
+ Cluster.get(leaderNode.kit().getSystem()).down(Cluster.get(follower1Node.kit().getSystem()).selfAddress());
+ follower1Node.cleanup();
+
+ follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name ).
+ moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+ datastoreContextBuilder(followerDatastoreContextBuilder).build();
+ follower1EntityOwnershipService = newOwnershipService(follower1Node.configDataStore());
+
+ follower1EntityOwnershipService.registerCandidate(ENTITY1);
+ verify(leaderMockListener, timeout(20000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
+
+ verifyRaftState(follower1Node.configDataStore(), ENTITY_OWNERSHIP_SHARD_NAME, new RaftStateVerifier() {
+ @Override
+ public void verify(OnDemandRaftState raftState) {
+ assertNull("Custom RaftPolicy class name", raftState.getCustomRaftPolicyClassName());
+ assertEquals("Peer count", 1, raftState.getPeerAddresses().keySet().size());
+ assertThat("Peer Id", Iterables.<String>getLast(raftState.getPeerAddresses().keySet()),
+ org.hamcrest.CoreMatchers.containsString("member-1"));
+ }
+ });