/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.cluster.datastore.admin;

import static java.lang.Boolean.FALSE;
import static java.lang.Boolean.TRUE;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent;
import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;

import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Status.Success;
import akka.cluster.Cluster;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.FileInputStream;
import java.net.URI;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.SerializationUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.MemberNode;
import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
/**
 * Unit tests for ClusterAdminRpcService.
 *
 * @author Thomas Pantelis
 */
109 public class ClusterAdminRpcServiceTest {
110 private static final MemberName MEMBER_1 = MemberName.forName("member-1");
111 private static final MemberName MEMBER_2 = MemberName.forName("member-2");
112 private static final MemberName MEMBER_3 = MemberName.forName("member-3");
113 private final List<MemberNode> memberNodes = new ArrayList<>();
116 public void setUp() {
117 InMemoryJournal.clear();
118 InMemorySnapshotStore.clear();
122 public void tearDown() {
123 for (MemberNode m : Lists.reverse(memberNodes)) {
130 public void testBackupDatastore() throws Exception {
131 MemberNode node = MemberNode.builder(memberNodes).akkaConfig("Member1")
132 .moduleShardsConfig("module-shards-member1.conf").waitForShardLeader("cars", "people")
133 .testName("testBackupDatastore").build();
135 String fileName = "target/testBackupDatastore";
136 new File(fileName).delete();
138 ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore(), null);
140 RpcResult<Void> rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder()
141 .setFilePath(fileName).build()).get(5, TimeUnit.SECONDS);
142 verifySuccessfulRpcResult(rpcResult);
144 try (FileInputStream fis = new FileInputStream(fileName)) {
145 List<DatastoreSnapshot> snapshots = SerializationUtils.deserialize(fis);
146 assertEquals("DatastoreSnapshot size", 2, snapshots.size());
148 ImmutableMap<String, DatastoreSnapshot> map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0),
149 snapshots.get(1).getType(), snapshots.get(1));
150 verifyDatastoreSnapshot(node.configDataStore().getActorContext().getDataStoreName(),
151 map.get(node.configDataStore().getActorContext().getDataStoreName()), "cars", "people");
153 new File(fileName).delete();
156 // Test failure by killing a shard.
158 node.configDataStore().getActorContext().getShardManager().tell(node.datastoreContextBuilder()
159 .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender());
161 ActorRef carsShardActor = node.configDataStore().getActorContext().findLocalShard("cars").get();
162 node.kit().watch(carsShardActor);
163 carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
164 node.kit().expectTerminated(carsShardActor);
166 rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build())
167 .get(5, TimeUnit.SECONDS);
168 assertFalse("isSuccessful", rpcResult.isSuccessful());
169 assertEquals("getErrors", 1, rpcResult.getErrors().size());
172 private static void verifyDatastoreSnapshot(String type, DatastoreSnapshot datastoreSnapshot,
173 String... expShardNames) {
174 assertNotNull("Missing DatastoreSnapshot for type " + type, datastoreSnapshot);
175 Set<String> shardNames = new HashSet<>();
176 for (DatastoreSnapshot.ShardSnapshot s: datastoreSnapshot.getShardSnapshots()) {
177 shardNames.add(s.getName());
180 assertEquals("DatastoreSnapshot shard names", Sets.newHashSet(expShardNames), shardNames);
184 public void testAddRemovePrefixShardReplica() throws Exception {
185 String name = "testAddPrefixShardReplica";
186 String moduleShardsConfig = "module-shards-default.conf";
188 final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
189 .moduleShardsConfig(moduleShardsConfig).build();
190 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
191 .moduleShardsConfig(moduleShardsConfig).build();
192 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
193 .moduleShardsConfig(moduleShardsConfig).build();
195 member1.waitForMembersUp("member-2", "member-3");
196 replicaNode2.kit().waitForMembersUp("member-1", "member-3");
197 replicaNode3.kit().waitForMembersUp("member-1", "member-2");
199 final ActorRef shardManager1 = member1.configDataStore().getActorContext().getShardManager();
201 shardManager1.tell(new PrefixShardCreated(new PrefixShardConfiguration(
202 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH),
203 "prefix", Collections.singleton(MEMBER_1))),
204 ActorRef.noSender());
206 member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(),
207 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
209 final InstanceIdentifier<Cars> identifier = InstanceIdentifier.create(Cars.class);
210 final BindingNormalizedNodeSerializer serializer = Mockito.mock(BindingNormalizedNodeSerializer.class);
211 Mockito.doReturn(CarsModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(identifier);
213 addPrefixShardReplica(replicaNode2, identifier, serializer,
214 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1");
216 addPrefixShardReplica(replicaNode3, identifier, serializer,
217 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1", "member-2");
219 verifyRaftPeersPresent(member1.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH),
220 "member-2", "member-3");
222 removePrefixShardReplica(member1, identifier, "member-3", serializer,
223 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-2");
225 verifyNoShardPresent(replicaNode3.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
226 verifyRaftPeersPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH),
229 removePrefixShardReplica(member1, identifier, "member-2", serializer,
230 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
232 verifyNoShardPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
236 public void testAddShardReplica() throws Exception {
237 String name = "testAddShardReplica";
238 String moduleShardsConfig = "module-shards-cars-member-1.conf";
239 MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
240 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars").build();
242 MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
243 .moduleShardsConfig(moduleShardsConfig).build();
245 leaderNode1.waitForMembersUp("member-2");
247 doAddShardReplica(newReplicaNode2, "cars", "member-1");
249 MemberNode newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
250 .moduleShardsConfig(moduleShardsConfig).build();
252 leaderNode1.waitForMembersUp("member-3");
253 newReplicaNode2.waitForMembersUp("member-3");
255 doAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2");
257 verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1", "member-3");
258 verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1", "member-3");
260 // Write data to member-2's config datastore and read/verify via member-3
261 final NormalizedNode<?, ?> configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore(),
262 newReplicaNode3.configDataStore());
264 // Write data to member-3's oper datastore and read/verify via member-2
265 writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore());
267 // Verify all data has been replicated. We expect 3 log entries and thus last applied index of 2 -
268 // 2 ServerConfigurationPayload entries and the transaction payload entry.
270 RaftStateVerifier verifier = raftState -> {
271 assertEquals("Commit index", 2, raftState.getCommitIndex());
272 assertEquals("Last applied index", 2, raftState.getLastApplied());
275 verifyRaftState(leaderNode1.configDataStore(), "cars", verifier);
276 verifyRaftState(leaderNode1.operDataStore(), "cars", verifier);
278 verifyRaftState(newReplicaNode2.configDataStore(), "cars", verifier);
279 verifyRaftState(newReplicaNode2.operDataStore(), "cars", verifier);
281 verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
282 verifyRaftState(newReplicaNode3.operDataStore(), "cars", verifier);
284 // Restart member-3 and verify the cars config shard is re-instated.
286 Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(newReplicaNode3.kit().getSystem()).selfAddress());
287 newReplicaNode3.cleanup();
289 newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
290 .moduleShardsConfig(moduleShardsConfig).createOperDatastore(false).build();
292 verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
293 readCarsNodeAndVerify(newReplicaNode3.configDataStore(), configCarsNode);
297 public void testAddShardReplicaFailures() throws Exception {
298 String name = "testAddShardReplicaFailures";
299 MemberNode memberNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
300 .moduleShardsConfig("module-shards-cars-member-1.conf").build();
302 ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
303 memberNode.operDataStore(), null);
305 RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder()
306 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
307 verifyFailedRpcResult(rpcResult);
309 rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("cars")
310 .build()).get(10, TimeUnit.SECONDS);
311 verifyFailedRpcResult(rpcResult);
313 rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people")
314 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
315 verifyFailedRpcResult(rpcResult);
318 private static NormalizedNode<?, ?> writeCarsNodeAndVerify(AbstractDataStore writeToStore,
319 AbstractDataStore readFromStore) throws Exception {
320 DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction();
321 NormalizedNode<?, ?> carsNode = CarsModel.create();
322 writeTx.write(CarsModel.BASE_PATH, carsNode);
324 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
325 Boolean canCommit = cohort.canCommit().get(7, TimeUnit.SECONDS);
326 assertEquals("canCommit", TRUE, canCommit);
327 cohort.preCommit().get(5, TimeUnit.SECONDS);
328 cohort.commit().get(5, TimeUnit.SECONDS);
330 readCarsNodeAndVerify(readFromStore, carsNode);
334 private static void readCarsNodeAndVerify(AbstractDataStore readFromStore,
335 NormalizedNode<?, ?> expCarsNode) throws Exception {
336 Optional<NormalizedNode<?, ?>> optional = readFromStore.newReadOnlyTransaction()
337 .read(CarsModel.BASE_PATH).get(15, TimeUnit.SECONDS);
338 assertTrue("isPresent", optional.isPresent());
339 assertEquals("Data node", expCarsNode, optional.get());
342 private void addPrefixShardReplica(final MemberNode memberNode,
343 final InstanceIdentifier<?> identifier,
344 final BindingNormalizedNodeSerializer serializer,
345 final String shardName,
346 final String... peerMemberNames) throws Exception {
348 final AddPrefixShardReplicaInput input = new AddPrefixShardReplicaInputBuilder()
349 .setShardPrefix(identifier)
350 .setDataStoreType(DataStoreType.Config).build();
352 final ClusterAdminRpcService service =
353 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
355 final RpcResult<Void> rpcResult = service.addPrefixShardReplica(input).get(10, TimeUnit.SECONDS);
356 verifySuccessfulRpcResult(rpcResult);
358 verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
359 Optional<ActorRef> optional = memberNode.configDataStore().getActorContext().findLocalShard(shardName);
360 assertTrue("Replica shard not present", optional.isPresent());
363 private void removePrefixShardReplica(final MemberNode memberNode,
364 final InstanceIdentifier<?> identifier,
365 final String removeFromMember,
366 final BindingNormalizedNodeSerializer serializer,
367 final String shardName,
368 final String... peerMemberNames) throws Exception {
369 final RemovePrefixShardReplicaInput input = new RemovePrefixShardReplicaInputBuilder()
370 .setDataStoreType(DataStoreType.Config)
371 .setShardPrefix(identifier)
372 .setMemberName(removeFromMember).build();
374 final ClusterAdminRpcService service =
375 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
377 final RpcResult<Void> rpcResult = service.removePrefixShardReplica(input).get(10, TimeUnit.SECONDS);
378 verifySuccessfulRpcResult(rpcResult);
380 verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
383 private static void doAddShardReplica(MemberNode memberNode, String shardName, String... peerMemberNames)
385 memberNode.waitForMembersUp(peerMemberNames);
387 ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
388 memberNode.operDataStore(), null);
390 RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName)
391 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
392 verifySuccessfulRpcResult(rpcResult);
394 verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
396 Optional<ActorRef> optional = memberNode.operDataStore().getActorContext().findLocalShard(shardName);
397 assertFalse("Oper shard present", optional.isPresent());
399 rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName)
400 .setDataStoreType(DataStoreType.Operational).build()).get(10, TimeUnit.SECONDS);
401 verifySuccessfulRpcResult(rpcResult);
403 verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames);
406 private static <T> T verifySuccessfulRpcResult(RpcResult<T> rpcResult) {
407 if (!rpcResult.isSuccessful()) {
408 if (rpcResult.getErrors().size() > 0) {
409 RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
410 throw new AssertionError("Rpc failed with error: " + error, error.getCause());
413 fail("Rpc failed with no error");
416 return rpcResult.getResult();
419 private static void verifyFailedRpcResult(RpcResult<Void> rpcResult) {
420 assertFalse("RpcResult", rpcResult.isSuccessful());
421 assertEquals("RpcResult errors size", 1, rpcResult.getErrors().size());
422 RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
423 assertNotNull("RpcResult error message null", error.getMessage());
427 public void testRemoveShardReplica() throws Exception {
428 String name = "testRemoveShardReplica";
429 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
430 final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
431 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
432 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
435 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
436 .moduleShardsConfig(moduleShardsConfig).build();
438 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
439 .moduleShardsConfig(moduleShardsConfig).build();
441 leaderNode1.configDataStore().waitTillReady();
442 replicaNode3.configDataStore().waitTillReady();
443 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
444 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
445 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
447 // Invoke RPC service on member-3 to remove it's local shard
449 ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
450 replicaNode3.operDataStore(), null);
452 RpcResult<Void> rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder()
453 .setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build())
454 .get(10, TimeUnit.SECONDS);
455 verifySuccessfulRpcResult(rpcResult);
457 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
458 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
459 verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
461 // Restart member-2 and verify member-3 isn't present.
463 Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(replicaNode2.kit().getSystem()).selfAddress());
464 replicaNode2.cleanup();
466 MemberNode newPeplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
467 .moduleShardsConfig(moduleShardsConfig).build();
469 newPeplicaNode2.configDataStore().waitTillReady();
470 verifyRaftPeersPresent(newPeplicaNode2.configDataStore(), "cars", "member-1");
472 // Invoke RPC service on member-1 to remove member-2
474 ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
475 leaderNode1.operDataStore(), null);
477 rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().setShardName("cars")
478 .setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
479 verifySuccessfulRpcResult(rpcResult);
481 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars");
482 verifyNoShardPresent(newPeplicaNode2.configDataStore(), "cars");
486 public void testRemoveShardLeaderReplica() throws Exception {
487 String name = "testRemoveShardLeaderReplica";
488 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
489 final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
490 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
491 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
494 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
495 .moduleShardsConfig(moduleShardsConfig).build();
497 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
498 .moduleShardsConfig(moduleShardsConfig).build();
500 leaderNode1.configDataStore().waitTillReady();
501 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
502 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
503 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
505 replicaNode2.waitForMembersUp("member-1", "member-3");
506 replicaNode3.waitForMembersUp("member-1", "member-2");
508 // Invoke RPC service on leader member-1 to remove it's local shard
510 ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
511 leaderNode1.operDataStore(), null);
513 RpcResult<Void> rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder()
514 .setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build())
515 .get(10, TimeUnit.SECONDS);
516 verifySuccessfulRpcResult(rpcResult);
518 verifyRaftState(replicaNode2.configDataStore(), "cars", raftState ->
519 assertThat("Leader Id", raftState.getLeader(), anyOf(containsString("member-2"),
520 containsString("member-3"))));
522 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-3");
523 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-2");
524 verifyNoShardPresent(leaderNode1.configDataStore(), "cars");
528 public void testAddReplicasForAllShards() throws Exception {
529 String name = "testAddReplicasForAllShards";
530 String moduleShardsConfig = "module-shards-member1.conf";
531 MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
532 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars", "people").build();
534 ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
535 "pets", null, Arrays.asList(MEMBER_1));
536 leaderNode1.configDataStore().getActorContext().getShardManager().tell(
537 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
538 leaderNode1.kit().expectMsgClass(Success.class);
539 leaderNode1.kit().waitUntilLeader(leaderNode1.configDataStore().getActorContext(), "pets");
541 MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
542 .moduleShardsConfig(moduleShardsConfig).build();
544 leaderNode1.waitForMembersUp("member-2");
545 newReplicaNode2.waitForMembersUp("member-1");
547 newReplicaNode2.configDataStore().getActorContext().getShardManager().tell(
548 new CreateShard(petsModuleConfig, Shard.builder(), null), newReplicaNode2.kit().getRef());
549 newReplicaNode2.kit().expectMsgClass(Success.class);
551 newReplicaNode2.operDataStore().getActorContext().getShardManager().tell(
552 new CreateShard(new ModuleShardConfiguration(URI.create("no-leader-ns"), "no-leader-module",
553 "no-leader", null, Arrays.asList(MEMBER_1)), Shard.builder(), null),
554 newReplicaNode2.kit().getRef());
555 newReplicaNode2.kit().expectMsgClass(Success.class);
557 ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(),
558 newReplicaNode2.operDataStore(), null);
560 RpcResult<AddReplicasForAllShardsOutput> rpcResult =
561 service.addReplicasForAllShards().get(10, TimeUnit.SECONDS);
562 AddReplicasForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
563 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
564 successShardResult("people", DataStoreType.Config),
565 successShardResult("pets", DataStoreType.Config),
566 successShardResult("cars", DataStoreType.Operational),
567 successShardResult("people", DataStoreType.Operational),
568 failedShardResult("no-leader", DataStoreType.Operational));
570 verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1");
571 verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "people", "member-1");
572 verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1");
573 verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1");
574 verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1");
578 public void testRemoveAllShardReplicas() throws Exception {
579 String name = "testRemoveAllShardReplicas";
580 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
581 final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
582 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
583 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
586 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
587 .moduleShardsConfig(moduleShardsConfig).build();
589 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
590 .moduleShardsConfig(moduleShardsConfig).build();
592 leaderNode1.configDataStore().waitTillReady();
593 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
594 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
595 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
597 ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
598 "pets", null, Arrays.asList(MEMBER_1, MEMBER_2, MEMBER_3));
599 leaderNode1.configDataStore().getActorContext().getShardManager().tell(
600 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
601 leaderNode1.kit().expectMsgClass(Success.class);
603 replicaNode2.configDataStore().getActorContext().getShardManager().tell(
604 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode2.kit().getRef());
605 replicaNode2.kit().expectMsgClass(Success.class);
607 replicaNode3.configDataStore().getActorContext().getShardManager().tell(
608 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode3.kit().getRef());
609 replicaNode3.kit().expectMsgClass(Success.class);
611 verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2", "member-3");
612 verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1", "member-3");
613 verifyRaftPeersPresent(replicaNode3.configDataStore(), "pets", "member-1", "member-2");
615 ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
616 replicaNode3.operDataStore(), null);
618 RpcResult<RemoveAllShardReplicasOutput> rpcResult = service3.removeAllShardReplicas(
619 new RemoveAllShardReplicasInputBuilder().setMemberName("member-3").build()).get(10, TimeUnit.SECONDS);
620 RemoveAllShardReplicasOutput result = verifySuccessfulRpcResult(rpcResult);
621 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
622 successShardResult("people", DataStoreType.Config),
623 successShardResult("pets", DataStoreType.Config),
624 successShardResult("cars", DataStoreType.Operational),
625 successShardResult("people", DataStoreType.Operational));
627 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
628 verifyRaftPeersPresent(leaderNode1.configDataStore(), "people", "member-2");
629 verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2");
630 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
631 verifyRaftPeersPresent(replicaNode2.configDataStore(), "people", "member-1");
632 verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1");
633 verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
634 verifyNoShardPresent(replicaNode3.configDataStore(), "people");
635 verifyNoShardPresent(replicaNode3.configDataStore(), "pets");
// Verifies changeMemberVotingStatesForShard: brings up a 3-node cluster for the "cars"
// shard, invokes the RPC from member-3 to make member-2 and member-3 non-voting for the
// Config datastore, and checks the resulting voting states on all three replicas.
// NOTE(review): the test-name string says "Status" while the method says "States" —
// presumably a legacy artifact; confirm before normalizing.
639 public void testChangeMemberVotingStatesForShard() throws Exception {
640 String name = "testChangeMemberVotingStatusForShard";
641 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
642 final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
643 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
644 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
647 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
648 .moduleShardsConfig(moduleShardsConfig).build();
650 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
651 .moduleShardsConfig(moduleShardsConfig).build();
// Wait for shard readiness, then confirm each node sees the other two as raft peers.
653 leaderNode1.configDataStore().waitTillReady();
654 replicaNode3.configDataStore().waitTillReady();
655 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
656 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
657 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
659 // Invoke RPC service on member-3 to change voting status
661 ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
662 replicaNode3.operDataStore(), null);
664 RpcResult<Void> rpcResult = service3
665 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
666 .setShardName("cars").setDataStoreType(DataStoreType.Config)
667 .setMemberVotingState(ImmutableList.of(
668 new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
669 new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build()))
671 .get(10, TimeUnit.SECONDS);
672 verifySuccessfulRpcResult(rpcResult);
// The new voting config must be replicated to every node: only member-1 remains voting.
674 verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
675 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
676 verifyVotingStates(replicaNode2.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
677 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
678 verifyVotingStates(replicaNode3.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
679 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
// Verifies that changeMemberVotingStatesForShard is rejected when it would leave a
// single-replica shard with no voting members: member-1 is the only replica, so making
// it non-voting must fail and the voting state must remain unchanged.
683 public void testChangeMemberVotingStatesForSingleNodeShard() throws Exception {
684 String name = "testChangeMemberVotingStatesForSingleNodeShard";
685 String moduleShardsConfig = "module-shards-member1.conf";
686 MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
687 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
688 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
691 leaderNode.configDataStore().waitTillReady();
693 // Invoke RPC service on member-1 (the only node) to try to make it non-voting
695 ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(),
696 leaderNode.operDataStore(), null);
698 RpcResult<Void> rpcResult = service
699 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
700 .setShardName("cars").setDataStoreType(DataStoreType.Config)
701 .setMemberVotingState(ImmutableList
702 .of(new MemberVotingStateBuilder().setMemberName("member-1").setVoting(FALSE).build()))
704 .get(10, TimeUnit.SECONDS);
// The RPC must fail; member-1 must still be voting afterwards.
705 verifyFailedRpcResult(rpcResult);
707 verifyVotingStates(leaderNode.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE));
// Verifies changeMemberVotingStatesForAllShards: the RPC invoked on member-3 makes
// member-2 and member-3 non-voting for every shard ("cars" and "people") of both the
// Config and Operational datastores, and the resulting states are checked on all nodes.
711 public void testChangeMemberVotingStatesForAllShards() throws Exception {
712 String name = "testChangeMemberVotingStatesForAllShards";
713 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
714 final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
715 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
716 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
719 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
720 .moduleShardsConfig(moduleShardsConfig).build();
722 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
723 .moduleShardsConfig(moduleShardsConfig).build();
// Both datastores are exercised here, so wait on config and oper readiness.
725 leaderNode1.configDataStore().waitTillReady();
726 leaderNode1.operDataStore().waitTillReady();
727 replicaNode3.configDataStore().waitTillReady();
728 replicaNode3.operDataStore().waitTillReady();
729 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
730 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
731 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
733 // Invoke RPC service on member-3 to change voting status
735 ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
736 replicaNode3.operDataStore(), null);
738 RpcResult<ChangeMemberVotingStatesForAllShardsOutput> rpcResult = service3.changeMemberVotingStatesForAllShards(
739 new ChangeMemberVotingStatesForAllShardsInputBuilder().setMemberVotingState(ImmutableList.of(
740 new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
741 new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build())).build())
742 .get(10, TimeUnit.SECONDS);
// Every shard/datastore combination must report success in the RPC output.
743 ChangeMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
744 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
745 successShardResult("people", DataStoreType.Config),
746 successShardResult("cars", DataStoreType.Operational),
747 successShardResult("people", DataStoreType.Operational));
// And all six datastore instances must converge on the same voting configuration.
749 verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
750 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
751 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
752 new String[]{"cars", "people"}, new SimpleEntry<>("member-1", TRUE),
753 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
// Verifies flipMemberVotingStatesForAllShards: starting from a persisted voting config
// of member-1=true, member-2=true, member-3=false, one flip (invoked from member-3)
// must invert every member's voting flag on all shards of both datastores, and a second
// flip must restore the original configuration. Leadership is expected to follow the
// voting majority after each flip.
// Fixes in this revision: the two assertion messages below previously said
// "Expected leader member-1" while the assertions verify member-3, and a stray space
// in ". get(10, ...)" has been removed. No behavior change.
757 public void testFlipMemberVotingStates() throws Exception {
758 String name = "testFlipMemberVotingStates";
760 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
761 new ServerInfo("member-1", true), new ServerInfo("member-2", true),
762 new ServerInfo("member-3", false)));
// Seed each member's journal so its shards recover with the above voting config.
764 setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
765 setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
766 setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
768 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
769 final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
770 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
771 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
774 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
775 .moduleShardsConfig(moduleShardsConfig).build();
777 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
778 .moduleShardsConfig(moduleShardsConfig).build();
780 leaderNode1.configDataStore().waitTillReady();
781 leaderNode1.operDataStore().waitTillReady();
782 replicaNode3.configDataStore().waitTillReady();
783 replicaNode3.operDataStore().waitTillReady();
// Sanity-check the recovered voting config before flipping.
784 verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
785 new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", FALSE));
787 ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
788 replicaNode3.operDataStore(), null);
790 RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service3.flipMemberVotingStatesForAllShards()
791 .get(10, TimeUnit.SECONDS);
792 FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
793 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
794 successShardResult("people", DataStoreType.Config),
795 successShardResult("cars", DataStoreType.Operational),
796 successShardResult("people", DataStoreType.Operational));
// After the flip, members 1 and 2 are non-voting and member-3 is the sole voter.
798 verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
799 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
800 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
801 new String[]{"cars", "people"},
802 new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE),
803 new SimpleEntry<>("member-3", TRUE));
805 // Leadership should have transferred to member 3 since it is the only remaining voting member.
806 verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
807 assertNotNull("Expected non-null leader Id", raftState.getLeader());
808 assertTrue("Expected leader member-3. Actual: " + raftState.getLeader(),
809 raftState.getLeader().contains("member-3"));
812 verifyRaftState(leaderNode1.operDataStore(), "cars", raftState -> {
813 assertNotNull("Expected non-null leader Id", raftState.getLeader());
814 assertTrue("Expected leader member-3. Actual: " + raftState.getLeader(),
815 raftState.getLeader().contains("member-3"));
818 // Flip the voting states back to the original states.
820 rpcResult = service3.flipMemberVotingStatesForAllShards().get(10, TimeUnit.SECONDS);
821 result = verifySuccessfulRpcResult(rpcResult);
822 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
823 successShardResult("people", DataStoreType.Config),
824 successShardResult("cars", DataStoreType.Operational),
825 successShardResult("people", DataStoreType.Operational));
827 verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
828 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
829 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
830 new String[]{"cars", "people"},
831 new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE),
832 new SimpleEntry<>("member-3", FALSE));
834 // Leadership should have transferred to member 1 or 2.
835 verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
836 assertNotNull("Expected non-null leader Id", raftState.getLeader());
837 assertTrue("Expected leader member-1 or member-2. Actual: " + raftState.getLeader(),
838 raftState.getLeader().contains("member-1") || raftState.getLeader().contains("member-2"));
// Verifies that flipMemberVotingStatesForAllShards works when there is no leader:
// the three running members start as non-voting (so no election can succeed), and the
// flip must both change the voting config and trigger an election among the newly
// voting members.
843 public void testFlipMemberVotingStatesWithNoInitialLeader() throws Exception {
844 String name = "testFlipMemberVotingStatesWithNoInitialLeader";
846 // Members 1, 2, and 3 are initially started up as non-voting. Members 4, 5, and 6 are initially
847 // non-voting and simulated as down by not starting them up.
848 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
849 new ServerInfo("member-1", false), new ServerInfo("member-2", false),
850 new ServerInfo("member-3", false), new ServerInfo("member-4", true),
851 new ServerInfo("member-5", true), new ServerInfo("member-6", true)));
853 setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
854 setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
855 setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
857 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
858 final MemberNode replicaNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
859 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
860 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
863 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
864 .moduleShardsConfig(moduleShardsConfig).build();
866 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
867 .moduleShardsConfig(moduleShardsConfig).build();
869 // Initially there won't be a leader b/c all the up nodes are non-voting.
871 replicaNode1.waitForMembersUp("member-2", "member-3");
873 verifyVotingStates(replicaNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", FALSE),
874 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE),
875 new SimpleEntry<>("member-4", TRUE), new SimpleEntry<>("member-5", TRUE),
876 new SimpleEntry<>("member-6", TRUE));
// With no possible leader, member-1's shard must still be a Follower.
878 verifyRaftState(replicaNode1.configDataStore(), "cars", raftState ->
879 assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState()));
881 ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(),
882 replicaNode1.operDataStore(), null);
884 RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards()
885 .get(10, TimeUnit.SECONDS);
886 FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
887 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
888 successShardResult("people", DataStoreType.Config),
889 successShardResult("cars", DataStoreType.Operational),
890 successShardResult("people", DataStoreType.Operational));
// The flip inverts every member's flag: 1-3 become voting, 4-6 become non-voting.
892 verifyVotingStates(new AbstractDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(),
893 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
894 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
895 new String[]{"cars", "people"},
896 new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE),
897 new SimpleEntry<>("member-3", TRUE), new SimpleEntry<>("member-4", FALSE),
898 new SimpleEntry<>("member-5", FALSE), new SimpleEntry<>("member-6", FALSE));
900 // Since member 1 was changed to voting and there was no leader, it should've started an election
902 verifyRaftState(replicaNode1.configDataStore(), "cars", raftState -> {
903 assertNotNull("Expected non-null leader Id", raftState.getLeader());
904 assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
905 raftState.getLeader().contains("member-1"));
908 verifyRaftState(replicaNode1.operDataStore(), "cars", raftState -> {
909 assertNotNull("Expected non-null leader Id", raftState.getLeader());
910 assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
911 raftState.getLeader().contains("member-1"));
// Verifies that flipMemberVotingStatesForAllShards still completes when the members
// that would become voting (4, 5, 6) are down: the running members become non-voting,
// and the current leader cannot step down for lack of a voting majority.
916 public void testFlipMemberVotingStatesWithVotingMembersDown() throws Exception {
917 String name = "testFlipMemberVotingStatesWithVotingMembersDown";
919 // Members 4, 5, and 6 are initially non-voting and simulated as down by not starting them up.
920 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
921 new ServerInfo("member-1", true), new ServerInfo("member-2", true),
922 new ServerInfo("member-3", true), new ServerInfo("member-4", false),
923 new ServerInfo("member-5", false), new ServerInfo("member-6", false)));
925 setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
926 setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
927 setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
929 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
930 final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
931 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
932 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
935 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
936 .moduleShardsConfig(moduleShardsConfig).build();
938 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
939 .moduleShardsConfig(moduleShardsConfig).build();
941 leaderNode1.configDataStore().waitTillReady();
942 leaderNode1.operDataStore().waitTillReady();
// Sanity-check the recovered voting config before flipping.
943 verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
944 new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", TRUE),
945 new SimpleEntry<>("member-4", FALSE), new SimpleEntry<>("member-5", FALSE),
946 new SimpleEntry<>("member-6", FALSE));
948 ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
949 leaderNode1.operDataStore(), null);
951 RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards()
952 .get(10, TimeUnit.SECONDS);
953 FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
954 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
955 successShardResult("people", DataStoreType.Config),
956 successShardResult("cars", DataStoreType.Operational),
957 successShardResult("people", DataStoreType.Operational));
959 // Members 1, 2 and 3 are now non-voting but should still get replicated the new server config.
960 verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
961 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
962 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
963 new String[]{"cars", "people"},
964 new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE),
965 new SimpleEntry<>("member-3", FALSE), new SimpleEntry<>("member-4", TRUE),
966 new SimpleEntry<>("member-5", TRUE), new SimpleEntry<>("member-6", TRUE));
968 // The leader (member 1) was changed to non-voting but it shouldn't be able to step down as leader yet
969 // b/c it can't get a majority consensus with all voting members down. So verify it remains the leader.
970 verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
971 assertNotNull("Expected non-null leader Id", raftState.getLeader());
972 assertTrue("Expected leader member-1", raftState.getLeader().contains("member-1"));
// Seeds the InMemoryJournal for every listed shard of the given member so that, on
// startup, each shard recovers with the supplied server (voting) configuration already
// applied. Entries are written for both datastores — the "config_"/"oper_" prefix plus
// the per-test suffix keeps journal persistence IDs unique across tests.
976 private static void setupPersistedServerConfigPayload(ServerConfigurationPayload serverConfig,
977 String member, String datastoreTypeSuffix, String... shards) {
978 String[] datastoreTypes = {"config_", "oper_"};
979 for (String type : datastoreTypes) {
980 for (String shard : shards) {
// Re-map each ServerInfo id to the fully-qualified shard identifier for this
// shard/datastore combination, preserving the voting flag.
981 List<ServerInfo> newServerInfo = new ArrayList<>(serverConfig.getServerConfig().size());
982 for (ServerInfo info : serverConfig.getServerConfig()) {
983 newServerInfo.add(new ServerInfo(ShardIdentifier.create(shard, MemberName.forName(info.getId()),
984 type + datastoreTypeSuffix).toString(), info.isVoting()));
987 String shardID = ShardIdentifier.create(shard, MemberName.forName(member),
988 type + datastoreTypeSuffix).toString();
// Journal entry 1 establishes the election term; entry 2 is a replicated log
// entry carrying the remapped server configuration payload.
989 InMemoryJournal.addEntry(shardID, 1, new UpdateElectionTerm(1, null));
990 InMemoryJournal.addEntry(shardID, 2, new ReplicatedLogImplEntry(0, 1,
991 new ServerConfigurationPayload(newServerInfo)));
// Convenience overload: checks the same expected voting states on every listed
// datastore for every listed shard, delegating to the single-datastore variant.
997 private static void verifyVotingStates(AbstractDataStore[] datastores, String[] shards,
998 SimpleEntry<String, Boolean>... expStates) throws Exception {
999 for (AbstractDataStore datastore: datastores) {
1000 for (String shard: shards) {
1001 verifyVotingStates(datastore, shard, expStates);
// Asserts that the shard's raft state reports exactly the expected voting flags:
// the local member's own flag (via isVoting) plus the flag of every peer reported in
// getPeerVotingStates(). expStates maps plain member names (e.g. "member-1") which are
// expanded here to fully-qualified shard peer ids.
1007 private static void verifyVotingStates(AbstractDataStore datastore, String shardName,
1008 SimpleEntry<String, Boolean>... expStates) throws Exception {
1009 String localMemberName = datastore.getActorContext().getCurrentMemberName().getName();
// Build the expected map keyed by fully-qualified shard id.
1010 Map<String, Boolean> expStateMap = new HashMap<>();
1011 for (Entry<String, Boolean> e: expStates) {
1012 expStateMap.put(ShardIdentifier.create(shardName, MemberName.forName(e.getKey()),
1013 datastore.getActorContext().getDataStoreName()).toString(), e.getValue());
1016 verifyRaftState(datastore, shardName, raftState -> {
1017 String localPeerId = ShardIdentifier.create(shardName, MemberName.forName(localMemberName),
1018 datastore.getActorContext().getDataStoreName()).toString();
1019 assertEquals("Voting state for " + localPeerId, expStateMap.get(localPeerId), raftState.isVoting());
1020 for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
1021 assertEquals("Voting state for " + e.getKey(), expStateMap.get(e.getKey()), e.getValue());
// Asserts that the actual shard results match the expected ones exactly (keyed by
// shard name + datastore type): each actual result must have a matching expected
// entry with the same success flag — successful results must carry no error message,
// failed ones must carry one — and no expected entry may be left unmatched.
1026 private static void verifyShardResults(List<ShardResult> shardResults, ShardResult... expShardResults) {
1027 Map<String, ShardResult> expResultsMap = new HashMap<>();
1028 for (ShardResult r: expShardResults) {
1029 expResultsMap.put(r.getShardName() + "-" + r.getDataStoreType(), r);
// remove() doubles as the match check: leftovers at the end mean missing results.
1032 for (ShardResult result: shardResults) {
1033 ShardResult exp = expResultsMap.remove(result.getShardName() + "-" + result.getDataStoreType());
1034 assertNotNull(String.format("Unexpected result for shard %s, type %s", result.getShardName(),
1035 result.getDataStoreType()), exp);
1036 assertEquals("isSucceeded", exp.isSucceeded(), result.isSucceeded());
1037 if (exp.isSucceeded()) {
1038 assertNull("Expected null error message", result.getErrorMessage());
1040 assertNotNull("Expected error message", result.getErrorMessage());
1044 if (!expResultsMap.isEmpty()) {
1045 fail("Missing shard results for " + expResultsMap.keySet());
// Builds the expected ShardResult for a successful operation on the given shard/datastore.
1049 private static ShardResult successShardResult(String shardName, DataStoreType type) {
1050 return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(TRUE).build();
// Builds the expected ShardResult for a failed operation on the given shard/datastore.
// (No caller is visible in this chunk — presumably used by other tests in the class.)
1053 private static ShardResult failedShardResult(String shardName, DataStoreType type) {
1054 return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(FALSE).build();