+ public void testAddShardReplica() throws Exception {
+ String name = "testAddShardReplica";
+ String moduleShardsConfig = "module-shards-cars-member-1.conf";
+ MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+ moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars").build();
+
+ MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ leaderNode1.waitForMembersUp("member-2");
+
+ testAddShardReplica(newReplicaNode2, "cars", "member-1");
+
+ MemberNode newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ leaderNode1.waitForMembersUp("member-3");
+ newReplicaNode2.waitForMembersUp("member-3");
+
+ testAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2");
+
+ verifyRaftPeersPresent(newReplicaNode2.configDataStore, "cars", "member-1", "member-3");
+ verifyRaftPeersPresent(newReplicaNode2.operDataStore, "cars", "member-1", "member-3");
+
+ // Write data to member-2's config datastore and read/verify via member-3
+ NormalizedNode<?, ?> configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore,
+ newReplicaNode3.configDataStore);
+
+ // Write data to member-3's oper datastore and read/verify via member-2
+ writeCarsNodeAndVerify(newReplicaNode3.operDataStore, newReplicaNode2.operDataStore);
+
+ // Verify all data has been replicated. We expect 3 log entries and thus last applied index of 2 -
+ // 2 ServerConfigurationPayload entries and the transaction payload entry.
+
+ RaftStateVerifier verifier = new RaftStateVerifier() {
+ @Override
+ public void verify(OnDemandRaftState raftState) {
+ assertEquals("Commit index", 2, raftState.getCommitIndex());
+ assertEquals("Last applied index", 2, raftState.getLastApplied());
+ }
+ };
+
+ verifyRaftState(leaderNode1.configDataStore, "cars", verifier);
+ verifyRaftState(leaderNode1.operDataStore, "cars", verifier);
+
+ verifyRaftState(newReplicaNode2.configDataStore, "cars", verifier);
+ verifyRaftState(newReplicaNode2.operDataStore, "cars", verifier);
+
+ verifyRaftState(newReplicaNode3.configDataStore, "cars", verifier);
+ verifyRaftState(newReplicaNode3.operDataStore, "cars", verifier);
+
+ // Restart member-3 and verify the cars config shard is re-instated.
+
+ Cluster.get(leaderNode1.kit.getSystem()).down(Cluster.get(newReplicaNode3.kit.getSystem()).selfAddress());
+ newReplicaNode3.cleanup();
+
+ newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+ moduleShardsConfig(moduleShardsConfig).createOperDatastore(false).build();
+
+ verifyRaftState(newReplicaNode3.configDataStore, "cars", verifier);
+ readCarsNodeAndVerify(newReplicaNode3.configDataStore, configCarsNode);
+ }
+
+ private NormalizedNode<?, ?> writeCarsNodeAndVerify(DistributedDataStore writeToStore,
+ DistributedDataStore readFromStore) throws Exception {
+ DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction();
+ NormalizedNode<?, ?> carsNode = CarsModel.create();
+ writeTx.write(CarsModel.BASE_PATH, carsNode);
+
+ DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
+ Boolean canCommit = cohort .canCommit().get(7, TimeUnit.SECONDS);
+ assertEquals("canCommit", true, canCommit);
+ cohort.preCommit().get(5, TimeUnit.SECONDS);
+ cohort.commit().get(5, TimeUnit.SECONDS);
+
+ readCarsNodeAndVerify(readFromStore, carsNode);
+ return carsNode;
+ }
+
+ private void readCarsNodeAndVerify(DistributedDataStore readFromStore,
+ NormalizedNode<?, ?> expCarsNode) throws Exception {
+ Optional<NormalizedNode<?, ?>> optional = readFromStore.newReadOnlyTransaction().
+ read(CarsModel.BASE_PATH).get(15, TimeUnit.SECONDS);
+ assertEquals("isPresent", true, optional.isPresent());
+ assertEquals("Data node", expCarsNode, optional.get());
+ }
+
+ private void testAddShardReplica(MemberNode memberNode, String shardName, String... peerMemberNames)
+ throws Exception {
+ memberNode.waitForMembersUp(peerMemberNames);
+
+ ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore,
+ memberNode.operDataStore);
+
+ RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName).
+ build()).get(10, TimeUnit.SECONDS);
+ checkSuccessfulRpcResult(rpcResult);
+
+ verifyRaftPeersPresent(memberNode.configDataStore, shardName, peerMemberNames);
+ verifyRaftPeersPresent(memberNode.operDataStore, shardName, peerMemberNames);
+
+ service.close();
+ }
+
+ private void verifyRaftPeersPresent(DistributedDataStore datastore, final String shardName, String... peerMemberNames)
+ throws Exception {
+ final Set<String> peerIds = Sets.newHashSet();
+ for(String p: peerMemberNames) {
+ peerIds.add(ShardIdentifier.builder().memberName(p).shardName(shardName).
+ type(datastore.getActorContext().getDataStoreType()).build().toString());
+ }
+
+ verifyRaftState(datastore, shardName, new RaftStateVerifier() {
+ @Override
+ public void verify(OnDemandRaftState raftState) {
+ assertTrue("Peer(s) " + peerIds + " not found for shard " + shardName,
+ raftState.getPeerAddresses().keySet().containsAll(peerIds));
+ }
+ });
+ }
+
+ private void verifyRaftState(DistributedDataStore datastore, String shardName, RaftStateVerifier verifier)
+ throws Exception {
+ ActorContext actorContext = datastore.getActorContext();
+
+ Future<ActorRef> future = actorContext.findLocalShardAsync(shardName);
+ ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
+
+ AssertionError lastError = null;
+ Stopwatch sw = Stopwatch.createStarted();
+ while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
+ OnDemandRaftState raftState = (OnDemandRaftState)actorContext.
+ executeOperation(shardActor, GetOnDemandRaftState.INSTANCE);
+
+ try {
+ verifier.verify(raftState);
+ return;
+ } catch (AssertionError e) {
+ lastError = e;
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ throw lastError;
+ }
+
+ private void checkSuccessfulRpcResult(RpcResult<Void> rpcResult) {
+ if(!rpcResult.isSuccessful()) {
+ if(rpcResult.getErrors().size() > 0) {
+ RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
+ throw new AssertionError("Rpc failed with error: " + error, error.getCause());
+ }
+
+ fail("Rpc failed with no error");
+ }