/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore.admin;
10 import static java.lang.Boolean.FALSE;
11 import static java.lang.Boolean.TRUE;
12 import static org.hamcrest.CoreMatchers.anyOf;
13 import static org.hamcrest.CoreMatchers.containsString;
14 import static org.junit.Assert.assertEquals;
15 import static org.junit.Assert.assertFalse;
16 import static org.junit.Assert.assertNotNull;
17 import static org.junit.Assert.assertNull;
18 import static org.junit.Assert.assertThat;
19 import static org.junit.Assert.assertTrue;
20 import static org.junit.Assert.fail;
21 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent;
22 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
23 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
25 import akka.actor.ActorRef;
26 import akka.actor.PoisonPill;
27 import akka.actor.Status.Success;
28 import akka.cluster.Cluster;
29 import com.google.common.base.Optional;
30 import com.google.common.collect.ImmutableList;
31 import com.google.common.collect.ImmutableMap;
32 import com.google.common.collect.Iterables;
33 import com.google.common.collect.Lists;
34 import com.google.common.collect.Sets;
import java.io.File;
import java.io.FileInputStream;
import java.net.URI;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
49 import org.apache.commons.lang3.SerializationUtils;
50 import org.junit.After;
51 import org.junit.Before;
52 import org.junit.Test;
53 import org.mockito.Mockito;
54 import org.opendaylight.controller.cluster.access.concepts.MemberName;
55 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
56 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
57 import org.opendaylight.controller.cluster.datastore.MemberNode;
58 import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
59 import org.opendaylight.controller.cluster.datastore.Shard;
60 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
61 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
62 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
63 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
64 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
65 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
66 import org.opendaylight.controller.cluster.raft.RaftState;
67 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
68 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
69 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
70 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
71 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
72 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
73 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
74 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
75 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
76 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
77 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
78 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
79 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInputBuilder;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInputBuilder;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInputBuilder;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInputBuilder;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInput;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInputBuilder;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
100 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
101 import org.opendaylight.yangtools.yang.common.RpcError;
102 import org.opendaylight.yangtools.yang.common.RpcResult;
103 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
/**
 * Unit tests for ClusterAdminRpcService.
 *
 * @author Thomas Pantelis
 */
public class ClusterAdminRpcServiceTest {
    // Member names matching the "Member1"/"Member2"/"Member3" akka configs used by the tests below.
    private static final MemberName MEMBER_1 = MemberName.forName("member-1");
    private static final MemberName MEMBER_2 = MemberName.forName("member-2");
    private static final MemberName MEMBER_3 = MemberName.forName("member-3");

    // Nodes started by a test; torn down in reverse start order after each test (see tearDown).
    private final List<MemberNode> memberNodes = new ArrayList<>();
117 public void setUp() {
118 InMemoryJournal.clear();
119 InMemorySnapshotStore.clear();
123 public void tearDown() {
124 for (MemberNode m : Lists.reverse(memberNodes)) {
131 public void testBackupDatastore() throws Exception {
132 MemberNode node = MemberNode.builder(memberNodes).akkaConfig("Member1")
133 .moduleShardsConfig("module-shards-member1.conf").waitForShardLeader("cars", "people")
134 .testName("testBackupDatastore").build();
136 String fileName = "target/testBackupDatastore";
137 new File(fileName).delete();
139 ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore(), null);
141 RpcResult<Void> rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder()
142 .setFilePath(fileName).build()).get(5, TimeUnit.SECONDS);
143 verifySuccessfulRpcResult(rpcResult);
145 try (FileInputStream fis = new FileInputStream(fileName)) {
146 List<DatastoreSnapshot> snapshots = SerializationUtils.deserialize(fis);
147 assertEquals("DatastoreSnapshot size", 2, snapshots.size());
149 ImmutableMap<String, DatastoreSnapshot> map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0),
150 snapshots.get(1).getType(), snapshots.get(1));
151 verifyDatastoreSnapshot(node.configDataStore().getActorContext().getDataStoreName(),
152 map.get(node.configDataStore().getActorContext().getDataStoreName()), "cars", "people");
154 new File(fileName).delete();
157 // Test failure by killing a shard.
159 node.configDataStore().getActorContext().getShardManager().tell(node.datastoreContextBuilder()
160 .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender());
162 ActorRef carsShardActor = node.configDataStore().getActorContext().findLocalShard("cars").get();
163 node.kit().watch(carsShardActor);
164 carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
165 node.kit().expectTerminated(carsShardActor);
167 rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build())
168 .get(5, TimeUnit.SECONDS);
169 assertFalse("isSuccessful", rpcResult.isSuccessful());
170 assertEquals("getErrors", 1, rpcResult.getErrors().size());
173 private static void verifyDatastoreSnapshot(String type, DatastoreSnapshot datastoreSnapshot,
174 String... expShardNames) {
175 assertNotNull("Missing DatastoreSnapshot for type " + type, datastoreSnapshot);
176 Set<String> shardNames = new HashSet<>();
177 for (DatastoreSnapshot.ShardSnapshot s: datastoreSnapshot.getShardSnapshots()) {
178 shardNames.add(s.getName());
181 assertEquals("DatastoreSnapshot shard names", Sets.newHashSet(expShardNames), shardNames);
185 public void testAddRemovePrefixShardReplica() throws Exception {
186 String name = "testAddPrefixShardReplica";
187 String moduleShardsConfig = "module-shards-default.conf";
189 final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
190 .moduleShardsConfig(moduleShardsConfig).build();
191 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
192 .moduleShardsConfig(moduleShardsConfig).build();
193 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
194 .moduleShardsConfig(moduleShardsConfig).build();
196 member1.waitForMembersUp("member-2", "member-3");
197 replicaNode2.kit().waitForMembersUp("member-1", "member-3");
198 replicaNode3.kit().waitForMembersUp("member-1", "member-2");
200 final ActorRef shardManager1 = member1.configDataStore().getActorContext().getShardManager();
202 shardManager1.tell(new PrefixShardCreated(new PrefixShardConfiguration(
203 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH),
204 "prefix", Collections.singleton(MEMBER_1))),
205 ActorRef.noSender());
207 member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(),
208 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
210 final InstanceIdentifier<Cars> identifier = InstanceIdentifier.create(Cars.class);
211 final BindingNormalizedNodeSerializer serializer = Mockito.mock(BindingNormalizedNodeSerializer.class);
212 Mockito.doReturn(CarsModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(identifier);
214 addPrefixShardReplica(replicaNode2, identifier, serializer,
215 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1");
217 addPrefixShardReplica(replicaNode3, identifier, serializer,
218 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1", "member-2");
220 verifyRaftPeersPresent(member1.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH),
221 "member-2", "member-3");
223 removePrefixShardReplica(member1, identifier, "member-3", serializer,
224 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-2");
226 verifyNoShardPresent(replicaNode3.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
227 verifyRaftPeersPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH),
230 removePrefixShardReplica(member1, identifier, "member-2", serializer,
231 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
233 verifyNoShardPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
237 public void testModuleShardLeaderMovement() throws Exception {
238 String name = "testModuleShardLeaderMovement";
239 String moduleShardsConfig = "module-shards-member1.conf";
241 final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
242 .waitForShardLeader("cars").moduleShardsConfig(moduleShardsConfig).build();
243 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
244 .moduleShardsConfig(moduleShardsConfig).build();
245 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
246 .moduleShardsConfig(moduleShardsConfig).build();
248 member1.waitForMembersUp("member-2", "member-3");
250 doAddShardReplica(replicaNode2, "cars", "member-1");
251 doAddShardReplica(replicaNode3, "cars", "member-1", "member-2");
253 verifyRaftPeersPresent(member1.configDataStore(), "cars", "member-2", "member-3");
255 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
257 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
259 doMakeShardLeaderLocal(member1, "cars", "member-1");
260 replicaNode2.kit().waitUntilLeader(replicaNode2.configDataStore().getActorContext(), "cars");
261 replicaNode3.kit().waitUntilLeader(replicaNode3.configDataStore().getActorContext(), "cars");
263 doMakeShardLeaderLocal(replicaNode2, "cars", "member-2");
264 member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(), "cars");
265 replicaNode3.kit().waitUntilLeader(replicaNode3.configDataStore().getActorContext(), "cars");
267 doMakeShardLeaderLocal(replicaNode3, "cars", "member-3");
271 public void testAddShardReplica() throws Exception {
272 String name = "testAddShardReplica";
273 String moduleShardsConfig = "module-shards-cars-member-1.conf";
274 MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
275 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars").build();
277 MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
278 .moduleShardsConfig(moduleShardsConfig).build();
280 leaderNode1.waitForMembersUp("member-2");
282 doAddShardReplica(newReplicaNode2, "cars", "member-1");
284 MemberNode newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
285 .moduleShardsConfig(moduleShardsConfig).build();
287 leaderNode1.waitForMembersUp("member-3");
288 newReplicaNode2.waitForMembersUp("member-3");
290 doAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2");
292 verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1", "member-3");
293 verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1", "member-3");
295 // Write data to member-2's config datastore and read/verify via member-3
296 final NormalizedNode<?, ?> configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore(),
297 newReplicaNode3.configDataStore());
299 // Write data to member-3's oper datastore and read/verify via member-2
300 writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore());
302 // Verify all data has been replicated. We expect 3 log entries and thus last applied index of 2 -
303 // 2 ServerConfigurationPayload entries and the transaction payload entry.
305 RaftStateVerifier verifier = raftState -> {
306 assertEquals("Commit index", 2, raftState.getCommitIndex());
307 assertEquals("Last applied index", 2, raftState.getLastApplied());
310 verifyRaftState(leaderNode1.configDataStore(), "cars", verifier);
311 verifyRaftState(leaderNode1.operDataStore(), "cars", verifier);
313 verifyRaftState(newReplicaNode2.configDataStore(), "cars", verifier);
314 verifyRaftState(newReplicaNode2.operDataStore(), "cars", verifier);
316 verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
317 verifyRaftState(newReplicaNode3.operDataStore(), "cars", verifier);
319 // Restart member-3 and verify the cars config shard is re-instated.
321 Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(newReplicaNode3.kit().getSystem()).selfAddress());
322 newReplicaNode3.cleanup();
324 newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
325 .moduleShardsConfig(moduleShardsConfig).createOperDatastore(false).build();
327 verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
328 readCarsNodeAndVerify(newReplicaNode3.configDataStore(), configCarsNode);
332 public void testAddShardReplicaFailures() throws Exception {
333 String name = "testAddShardReplicaFailures";
334 MemberNode memberNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
335 .moduleShardsConfig("module-shards-cars-member-1.conf").build();
337 ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
338 memberNode.operDataStore(), null);
340 RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder()
341 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
342 verifyFailedRpcResult(rpcResult);
344 rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("cars")
345 .build()).get(10, TimeUnit.SECONDS);
346 verifyFailedRpcResult(rpcResult);
348 rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people")
349 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
350 verifyFailedRpcResult(rpcResult);
353 private static NormalizedNode<?, ?> writeCarsNodeAndVerify(AbstractDataStore writeToStore,
354 AbstractDataStore readFromStore) throws Exception {
355 DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction();
356 NormalizedNode<?, ?> carsNode = CarsModel.create();
357 writeTx.write(CarsModel.BASE_PATH, carsNode);
359 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
360 Boolean canCommit = cohort.canCommit().get(7, TimeUnit.SECONDS);
361 assertEquals("canCommit", TRUE, canCommit);
362 cohort.preCommit().get(5, TimeUnit.SECONDS);
363 cohort.commit().get(5, TimeUnit.SECONDS);
365 readCarsNodeAndVerify(readFromStore, carsNode);
369 private static void readCarsNodeAndVerify(AbstractDataStore readFromStore,
370 NormalizedNode<?, ?> expCarsNode) throws Exception {
371 Optional<NormalizedNode<?, ?>> optional = readFromStore.newReadOnlyTransaction()
372 .read(CarsModel.BASE_PATH).get(15, TimeUnit.SECONDS);
373 assertTrue("isPresent", optional.isPresent());
374 assertEquals("Data node", expCarsNode, optional.get());
377 private void addPrefixShardReplica(final MemberNode memberNode,
378 final InstanceIdentifier<?> identifier,
379 final BindingNormalizedNodeSerializer serializer,
380 final String shardName,
381 final String... peerMemberNames) throws Exception {
383 final AddPrefixShardReplicaInput input = new AddPrefixShardReplicaInputBuilder()
384 .setShardPrefix(identifier)
385 .setDataStoreType(DataStoreType.Config).build();
387 final ClusterAdminRpcService service =
388 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
390 final RpcResult<Void> rpcResult = service.addPrefixShardReplica(input).get(10, TimeUnit.SECONDS);
391 verifySuccessfulRpcResult(rpcResult);
393 verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
394 Optional<ActorRef> optional = memberNode.configDataStore().getActorContext().findLocalShard(shardName);
395 assertTrue("Replica shard not present", optional.isPresent());
398 private void removePrefixShardReplica(final MemberNode memberNode,
399 final InstanceIdentifier<?> identifier,
400 final String removeFromMember,
401 final BindingNormalizedNodeSerializer serializer,
402 final String shardName,
403 final String... peerMemberNames) throws Exception {
404 final RemovePrefixShardReplicaInput input = new RemovePrefixShardReplicaInputBuilder()
405 .setDataStoreType(DataStoreType.Config)
406 .setShardPrefix(identifier)
407 .setMemberName(removeFromMember).build();
409 final ClusterAdminRpcService service =
410 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
412 final RpcResult<Void> rpcResult = service.removePrefixShardReplica(input).get(10, TimeUnit.SECONDS);
413 verifySuccessfulRpcResult(rpcResult);
415 verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
418 private static void doAddShardReplica(MemberNode memberNode, String shardName, String... peerMemberNames)
420 memberNode.waitForMembersUp(peerMemberNames);
422 ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
423 memberNode.operDataStore(), null);
425 RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName)
426 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
427 verifySuccessfulRpcResult(rpcResult);
429 verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
431 Optional<ActorRef> optional = memberNode.operDataStore().getActorContext().findLocalShard(shardName);
432 assertFalse("Oper shard present", optional.isPresent());
434 rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName)
435 .setDataStoreType(DataStoreType.Operational).build()).get(10, TimeUnit.SECONDS);
436 verifySuccessfulRpcResult(rpcResult);
438 verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames);
441 private static void doMakeShardLeaderLocal(final MemberNode memberNode, String shardName, String newLeader)
443 ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
444 memberNode.operDataStore(), null);
446 final RpcResult<Void> rpcResult = service.makeLeaderLocal(new MakeLeaderLocalInputBuilder()
447 .setDataStoreType(DataStoreType.Config).setShardName(shardName).build())
448 .get(10, TimeUnit.SECONDS);
450 verifySuccessfulRpcResult(rpcResult);
452 verifyRaftState(memberNode.configDataStore(), shardName, raftState -> assertThat(raftState.getLeader(),
453 containsString(newLeader)));
457 private static <T> T verifySuccessfulRpcResult(RpcResult<T> rpcResult) {
458 if (!rpcResult.isSuccessful()) {
459 if (rpcResult.getErrors().size() > 0) {
460 RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
461 throw new AssertionError("Rpc failed with error: " + error, error.getCause());
464 fail("Rpc failed with no error");
467 return rpcResult.getResult();
470 private static void verifyFailedRpcResult(RpcResult<Void> rpcResult) {
471 assertFalse("RpcResult", rpcResult.isSuccessful());
472 assertEquals("RpcResult errors size", 1, rpcResult.getErrors().size());
473 RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
474 assertNotNull("RpcResult error message null", error.getMessage());
478 public void testRemoveShardReplica() throws Exception {
479 String name = "testRemoveShardReplica";
480 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
481 final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
482 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
483 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
486 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
487 .moduleShardsConfig(moduleShardsConfig).build();
489 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
490 .moduleShardsConfig(moduleShardsConfig).build();
492 leaderNode1.configDataStore().waitTillReady();
493 replicaNode3.configDataStore().waitTillReady();
494 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
495 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
496 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
498 // Invoke RPC service on member-3 to remove it's local shard
500 ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
501 replicaNode3.operDataStore(), null);
503 RpcResult<Void> rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder()
504 .setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build())
505 .get(10, TimeUnit.SECONDS);
506 verifySuccessfulRpcResult(rpcResult);
508 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
509 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
510 verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
512 // Restart member-2 and verify member-3 isn't present.
514 Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(replicaNode2.kit().getSystem()).selfAddress());
515 replicaNode2.cleanup();
517 MemberNode newPeplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
518 .moduleShardsConfig(moduleShardsConfig).build();
520 newPeplicaNode2.configDataStore().waitTillReady();
521 verifyRaftPeersPresent(newPeplicaNode2.configDataStore(), "cars", "member-1");
523 // Invoke RPC service on member-1 to remove member-2
525 ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
526 leaderNode1.operDataStore(), null);
528 rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().setShardName("cars")
529 .setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
530 verifySuccessfulRpcResult(rpcResult);
532 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars");
533 verifyNoShardPresent(newPeplicaNode2.configDataStore(), "cars");
537 public void testRemoveShardLeaderReplica() throws Exception {
538 String name = "testRemoveShardLeaderReplica";
539 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
540 final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
541 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
542 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
545 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
546 .moduleShardsConfig(moduleShardsConfig).build();
548 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
549 .moduleShardsConfig(moduleShardsConfig).build();
551 leaderNode1.configDataStore().waitTillReady();
552 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
553 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
554 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
556 replicaNode2.waitForMembersUp("member-1", "member-3");
557 replicaNode3.waitForMembersUp("member-1", "member-2");
559 // Invoke RPC service on leader member-1 to remove it's local shard
561 ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
562 leaderNode1.operDataStore(), null);
564 RpcResult<Void> rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder()
565 .setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build())
566 .get(10, TimeUnit.SECONDS);
567 verifySuccessfulRpcResult(rpcResult);
569 verifyRaftState(replicaNode2.configDataStore(), "cars", raftState ->
570 assertThat("Leader Id", raftState.getLeader(), anyOf(containsString("member-2"),
571 containsString("member-3"))));
573 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-3");
574 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-2");
575 verifyNoShardPresent(leaderNode1.configDataStore(), "cars");
579 public void testAddReplicasForAllShards() throws Exception {
580 String name = "testAddReplicasForAllShards";
581 String moduleShardsConfig = "module-shards-member1.conf";
582 MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
583 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars", "people").build();
585 ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
586 "pets", null, Arrays.asList(MEMBER_1));
587 leaderNode1.configDataStore().getActorContext().getShardManager().tell(
588 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
589 leaderNode1.kit().expectMsgClass(Success.class);
590 leaderNode1.kit().waitUntilLeader(leaderNode1.configDataStore().getActorContext(), "pets");
592 MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
593 .moduleShardsConfig(moduleShardsConfig).build();
595 leaderNode1.waitForMembersUp("member-2");
596 newReplicaNode2.waitForMembersUp("member-1");
598 newReplicaNode2.configDataStore().getActorContext().getShardManager().tell(
599 new CreateShard(petsModuleConfig, Shard.builder(), null), newReplicaNode2.kit().getRef());
600 newReplicaNode2.kit().expectMsgClass(Success.class);
602 newReplicaNode2.operDataStore().getActorContext().getShardManager().tell(
603 new CreateShard(new ModuleShardConfiguration(URI.create("no-leader-ns"), "no-leader-module",
604 "no-leader", null, Arrays.asList(MEMBER_1)), Shard.builder(), null),
605 newReplicaNode2.kit().getRef());
606 newReplicaNode2.kit().expectMsgClass(Success.class);
608 ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(),
609 newReplicaNode2.operDataStore(), null);
611 RpcResult<AddReplicasForAllShardsOutput> rpcResult =
612 service.addReplicasForAllShards().get(10, TimeUnit.SECONDS);
613 AddReplicasForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
614 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
615 successShardResult("people", DataStoreType.Config),
616 successShardResult("pets", DataStoreType.Config),
617 successShardResult("cars", DataStoreType.Operational),
618 successShardResult("people", DataStoreType.Operational),
619 failedShardResult("no-leader", DataStoreType.Operational));
621 verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1");
622 verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "people", "member-1");
623 verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1");
624 verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1");
625 verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1");
    // End-to-end test of the remove-all-shard-replicas RPC: member-3 removes its replica of
    // every shard (including the dynamically created "pets" shard), after which members 1 and 2
    // must no longer list member-3 as a raft peer and member-3 must host no shard actors.
    public void testRemoveAllShardReplicas() throws Exception {
        String name = "testRemoveAllShardReplicas";
        String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
        // Short heartbeat interval and election timeout so member-1 wins shard leadership quickly.
        final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
                .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
                        DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
        final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
                .moduleShardsConfig(moduleShardsConfig).build();
        final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
                .moduleShardsConfig(moduleShardsConfig).build();
        leaderNode1.configDataStore().waitTillReady();
        // Each member should see the other two as peers for the statically configured "cars" shard.
        verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
        verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
        verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
        // Dynamically create a "pets" shard on every member so removal also covers shards
        // that are not in the static module-shards config file.
        ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
                "pets", null, Arrays.asList(MEMBER_1, MEMBER_2, MEMBER_3));
        leaderNode1.configDataStore().getActorContext().getShardManager().tell(
                new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
        leaderNode1.kit().expectMsgClass(Success.class);
        replicaNode2.configDataStore().getActorContext().getShardManager().tell(
                new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode2.kit().getRef());
        replicaNode2.kit().expectMsgClass(Success.class);
        replicaNode3.configDataStore().getActorContext().getShardManager().tell(
                new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode3.kit().getRef());
        replicaNode3.kit().expectMsgClass(Success.class);
        verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2", "member-3");
        verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1", "member-3");
        verifyRaftPeersPresent(replicaNode3.configDataStore(), "pets", "member-1", "member-2");
        // Invoke the RPC on member-3, asking it to remove all of its own replicas.
        ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
                replicaNode3.operDataStore(), null);
        RpcResult<RemoveAllShardReplicasOutput> rpcResult = service3.removeAllShardReplicas(
                new RemoveAllShardReplicasInputBuilder().setMemberName("member-3").build()).get(10, TimeUnit.SECONDS);
        RemoveAllShardReplicasOutput result = verifySuccessfulRpcResult(rpcResult);
        verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
                successShardResult("people", DataStoreType.Config),
                successShardResult("pets", DataStoreType.Config),
                successShardResult("cars", DataStoreType.Operational),
                successShardResult("people", DataStoreType.Operational));
        // Remaining members must no longer list member-3 as a peer for any shard ...
        verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
        verifyRaftPeersPresent(leaderNode1.configDataStore(), "people", "member-2");
        verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2");
        verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
        verifyRaftPeersPresent(replicaNode2.configDataStore(), "people", "member-1");
        verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1");
        // ... and member-3 must no longer host any shard actors at all.
        verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
        verifyNoShardPresent(replicaNode3.configDataStore(), "people");
        verifyNoShardPresent(replicaNode3.configDataStore(), "pets");
690 public void testChangeMemberVotingStatesForShard() throws Exception {
691 String name = "testChangeMemberVotingStatusForShard";
692 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
693 final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
694 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
695 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
698 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
699 .moduleShardsConfig(moduleShardsConfig).build();
701 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
702 .moduleShardsConfig(moduleShardsConfig).build();
704 leaderNode1.configDataStore().waitTillReady();
705 replicaNode3.configDataStore().waitTillReady();
706 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
707 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
708 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
710 // Invoke RPC service on member-3 to change voting status
712 ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
713 replicaNode3.operDataStore(), null);
715 RpcResult<Void> rpcResult = service3
716 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
717 .setShardName("cars").setDataStoreType(DataStoreType.Config)
718 .setMemberVotingState(ImmutableList.of(
719 new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
720 new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build()))
722 .get(10, TimeUnit.SECONDS);
723 verifySuccessfulRpcResult(rpcResult);
725 verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
726 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
727 verifyVotingStates(replicaNode2.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
728 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
729 verifyVotingStates(replicaNode3.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
730 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
    // Verifies that changing the voting state of the sole member of a single-node shard is
    // rejected: making the only member non-voting would leave the shard with no possible leader,
    // so the RPC must fail and the voting state must remain unchanged.
    public void testChangeMemberVotingStatesForSingleNodeShard() throws Exception {
        String name = "testChangeMemberVotingStatesForSingleNodeShard";
        String moduleShardsConfig = "module-shards-member1.conf";
        MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
                .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
                        DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
        leaderNode.configDataStore().waitTillReady();
        // Invoke the RPC on the sole member (member-1) to try to make itself non-voting.
        ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(),
                leaderNode.operDataStore(), null);
        RpcResult<Void> rpcResult = service
                .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
                        .setShardName("cars").setDataStoreType(DataStoreType.Config)
                        .setMemberVotingState(ImmutableList
                                .of(new MemberVotingStateBuilder().setMemberName("member-1").setVoting(FALSE).build()))
                .get(10, TimeUnit.SECONDS);
        // The request must be rejected and member-1 must still be voting.
        verifyFailedRpcResult(rpcResult);
        verifyVotingStates(leaderNode.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE));
    // Verifies the change-member-voting-states-for-all-shards RPC: invoked on member-3, it makes
    // members 2 and 3 non-voting for every shard in both the config and oper datastores, and the
    // new state must be visible on all members for all shards.
    public void testChangeMemberVotingStatesForAllShards() throws Exception {
        String name = "testChangeMemberVotingStatesForAllShards";
        String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
        // Short heartbeat interval and election timeout so member-1 wins shard leadership quickly.
        final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
                .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
                        DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
        final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
                .moduleShardsConfig(moduleShardsConfig).build();
        final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
                .moduleShardsConfig(moduleShardsConfig).build();
        leaderNode1.configDataStore().waitTillReady();
        leaderNode1.operDataStore().waitTillReady();
        replicaNode3.configDataStore().waitTillReady();
        replicaNode3.operDataStore().waitTillReady();
        verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
        verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
        verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
        // Invoke RPC service on member-3 to change voting status
        ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
                replicaNode3.operDataStore(), null);
        RpcResult<ChangeMemberVotingStatesForAllShardsOutput> rpcResult = service3.changeMemberVotingStatesForAllShards(
                new ChangeMemberVotingStatesForAllShardsInputBuilder().setMemberVotingState(ImmutableList.of(
                        new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
                        new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build())).build())
                .get(10, TimeUnit.SECONDS);
        ChangeMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
        verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
                successShardResult("people", DataStoreType.Config),
                successShardResult("cars", DataStoreType.Operational),
                successShardResult("people", DataStoreType.Operational));
        // The change must be applied to both shards in both datastores on every member.
        verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
                replicaNode2.configDataStore(), replicaNode2.operDataStore(),
                replicaNode3.configDataStore(), replicaNode3.operDataStore()},
                new String[]{"cars", "people"}, new SimpleEntry<>("member-1", TRUE),
                new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
808 public void testFlipMemberVotingStates() throws Exception {
809 String name = "testFlipMemberVotingStates";
811 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
812 new ServerInfo("member-1", true), new ServerInfo("member-2", true),
813 new ServerInfo("member-3", false)));
815 setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
816 setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
817 setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
819 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
820 final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
821 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
822 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
825 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
826 .moduleShardsConfig(moduleShardsConfig).build();
828 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
829 .moduleShardsConfig(moduleShardsConfig).build();
831 leaderNode1.configDataStore().waitTillReady();
832 leaderNode1.operDataStore().waitTillReady();
833 replicaNode3.configDataStore().waitTillReady();
834 replicaNode3.operDataStore().waitTillReady();
835 verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
836 new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", FALSE));
838 ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
839 replicaNode3.operDataStore(), null);
841 RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service3.flipMemberVotingStatesForAllShards()
842 .get(10, TimeUnit.SECONDS);
843 FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
844 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
845 successShardResult("people", DataStoreType.Config),
846 successShardResult("cars", DataStoreType.Operational),
847 successShardResult("people", DataStoreType.Operational));
849 verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
850 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
851 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
852 new String[]{"cars", "people"},
853 new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE),
854 new SimpleEntry<>("member-3", TRUE));
856 // Leadership should have transferred to member 3 since it is the only remaining voting member.
857 verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
858 assertNotNull("Expected non-null leader Id", raftState.getLeader());
859 assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
860 raftState.getLeader().contains("member-3"));
863 verifyRaftState(leaderNode1.operDataStore(), "cars", raftState -> {
864 assertNotNull("Expected non-null leader Id", raftState.getLeader());
865 assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
866 raftState.getLeader().contains("member-3"));
869 // Flip the voting states back to the original states.
871 rpcResult = service3.flipMemberVotingStatesForAllShards(). get(10, TimeUnit.SECONDS);
872 result = verifySuccessfulRpcResult(rpcResult);
873 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
874 successShardResult("people", DataStoreType.Config),
875 successShardResult("cars", DataStoreType.Operational),
876 successShardResult("people", DataStoreType.Operational));
878 verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
879 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
880 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
881 new String[]{"cars", "people"},
882 new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE),
883 new SimpleEntry<>("member-3", FALSE));
885 // Leadership should have transferred to member 1 or 2.
886 verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
887 assertNotNull("Expected non-null leader Id", raftState.getLeader());
888 assertTrue("Expected leader member-1 or member-2. Actual: " + raftState.getLeader(),
889 raftState.getLeader().contains("member-1") || raftState.getLeader().contains("member-2"));
    // Verifies flip-member-voting-states when the shards initially have no leader: all running
    // members (1-3) are non-voting and the voting members (4-6) are down. The flip must still
    // succeed and, once the running members become voting, a leader must be elected.
    public void testFlipMemberVotingStatesWithNoInitialLeader() throws Exception {
        String name = "testFlipMemberVotingStatesWithNoInitialLeader";
        // Members 1, 2, and 3 are initially started up as non-voting. Members 4, 5, and 6 are initially
        // voting and simulated as down by not starting them up.
        ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
                new ServerInfo("member-1", false), new ServerInfo("member-2", false),
                new ServerInfo("member-3", false), new ServerInfo("member-4", true),
                new ServerInfo("member-5", true), new ServerInfo("member-6", true)));
        setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
        setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
        setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
        String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
        final MemberNode replicaNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
                .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
                        DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
        final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
                .moduleShardsConfig(moduleShardsConfig).build();
        final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
                .moduleShardsConfig(moduleShardsConfig).build();
        // Initially there won't be a leader b/c all the up nodes are non-voting.
        replicaNode1.waitForMembersUp("member-2", "member-3");
        verifyVotingStates(replicaNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", FALSE),
                new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE),
                new SimpleEntry<>("member-4", TRUE), new SimpleEntry<>("member-5", TRUE),
                new SimpleEntry<>("member-6", TRUE));
        // With no leader electable, the shard must still be in the Follower state.
        verifyRaftState(replicaNode1.configDataStore(), "cars", raftState ->
                assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState()));
        ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(),
                replicaNode1.operDataStore(), null);
        RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards()
                .get(10, TimeUnit.SECONDS);
        FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
        verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
                successShardResult("people", DataStoreType.Config),
                successShardResult("cars", DataStoreType.Operational),
                successShardResult("people", DataStoreType.Operational));
        // All voting flags are now inverted on every running member, for every shard.
        verifyVotingStates(new AbstractDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(),
                replicaNode2.configDataStore(), replicaNode2.operDataStore(),
                replicaNode3.configDataStore(), replicaNode3.operDataStore()},
                new String[]{"cars", "people"},
                new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE),
                new SimpleEntry<>("member-3", TRUE), new SimpleEntry<>("member-4", FALSE),
                new SimpleEntry<>("member-5", FALSE), new SimpleEntry<>("member-6", FALSE));
        // Since member 1 was changed to voting and there was no leader, it should've started an election
        verifyRaftState(replicaNode1.configDataStore(), "cars", raftState -> {
            assertNotNull("Expected non-null leader Id", raftState.getLeader());
            assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
                    raftState.getLeader().contains("member-1"));
        verifyRaftState(replicaNode1.operDataStore(), "cars", raftState -> {
            assertNotNull("Expected non-null leader Id", raftState.getLeader());
            assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
                    raftState.getLeader().contains("member-1"));
    // Verifies flip-member-voting-states when the members being flipped to voting (4-6) are down:
    // the RPC still succeeds, the flipped config is replicated to the running members, and the
    // now non-voting leader (member-1) retains leadership because no majority of voting members
    // is reachable to elect a replacement.
    public void testFlipMemberVotingStatesWithVotingMembersDown() throws Exception {
        String name = "testFlipMemberVotingStatesWithVotingMembersDown";
        // Members 4, 5, and 6 are initially non-voting and simulated as down by not starting them up.
        ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
                new ServerInfo("member-1", true), new ServerInfo("member-2", true),
                new ServerInfo("member-3", true), new ServerInfo("member-4", false),
                new ServerInfo("member-5", false), new ServerInfo("member-6", false)));
        setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
        setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
        setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
        String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
        final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
                .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
                        DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
        final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
                .moduleShardsConfig(moduleShardsConfig).build();
        final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
                .moduleShardsConfig(moduleShardsConfig).build();
        leaderNode1.configDataStore().waitTillReady();
        leaderNode1.operDataStore().waitTillReady();
        verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
                new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", TRUE),
                new SimpleEntry<>("member-4", FALSE), new SimpleEntry<>("member-5", FALSE),
                new SimpleEntry<>("member-6", FALSE));
        ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
                leaderNode1.operDataStore(), null);
        RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards()
                .get(10, TimeUnit.SECONDS);
        FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
        verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
                successShardResult("people", DataStoreType.Config),
                successShardResult("cars", DataStoreType.Operational),
                successShardResult("people", DataStoreType.Operational));
        // Members 2 and 3 are now non-voting but should get replicated with the new server config.
        verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
                replicaNode2.configDataStore(), replicaNode2.operDataStore(),
                replicaNode3.configDataStore(), replicaNode3.operDataStore()},
                new String[]{"cars", "people"},
                new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE),
                new SimpleEntry<>("member-3", FALSE), new SimpleEntry<>("member-4", TRUE),
                new SimpleEntry<>("member-5", TRUE), new SimpleEntry<>("member-6", TRUE));
        // The leader (member 1) was changed to non-voting but it shouldn't be able to step down as leader yet
        // b/c it can't get a majority consensus with all voting members down. So verify it remains the leader.
        verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
            assertNotNull("Expected non-null leader Id", raftState.getLeader());
            assertTrue("Expected leader member-1", raftState.getLeader().contains("member-1"));
    // Seeds the in-memory journal for every (config/oper) x shard combination of the given member
    // with an initial election term and a ServerConfigurationPayload log entry, so each shard
    // starts up with the desired per-member voting states already persisted.
    private static void setupPersistedServerConfigPayload(ServerConfigurationPayload serverConfig,
            String member, String datastoreTypeSuffix, String... shards) {
        String[] datastoreTypes = {"config_", "oper_"};
        for (String type : datastoreTypes) {
            for (String shard : shards) {
                // Rewrite each plain member id (e.g. "member-1") into the fully qualified shard id
                // for this shard/datastore combination, preserving the voting flag.
                List<ServerInfo> newServerInfo = new ArrayList<>(serverConfig.getServerConfig().size());
                for (ServerInfo info : serverConfig.getServerConfig()) {
                    newServerInfo.add(new ServerInfo(ShardIdentifier.create(shard, MemberName.forName(info.getId()),
                            type + datastoreTypeSuffix).toString(), info.isVoting()));
                String shardID = ShardIdentifier.create(shard, MemberName.forName(member),
                        type + datastoreTypeSuffix).toString();
                // Journal entry 1: the election term; entry 2: the server (voting) configuration.
                InMemoryJournal.addEntry(shardID, 1, new UpdateElectionTerm(1, null));
                InMemoryJournal.addEntry(shardID, 2, new ReplicatedLogImplEntry(0, 1,
                        new ServerConfigurationPayload(newServerInfo)));
    // Applies the single-datastore voting-state check to every (datastore, shard) combination.
    private static void verifyVotingStates(AbstractDataStore[] datastores, String[] shards,
            SimpleEntry<String, Boolean>... expStates) throws Exception {
        for (AbstractDataStore datastore: datastores) {
            for (String shard: shards) {
                verifyVotingStates(datastore, shard, expStates);
    // Verifies, via the shard's reported raft state, that the local member and every peer carry
    // the expected voting flag for the given shard. Expected states are keyed by plain member
    // name and translated here into the fully qualified shard ids the raft state uses.
    private static void verifyVotingStates(AbstractDataStore datastore, String shardName,
            SimpleEntry<String, Boolean>... expStates) throws Exception {
        String localMemberName = datastore.getActorContext().getCurrentMemberName().getName();
        Map<String, Boolean> expStateMap = new HashMap<>();
        for (Entry<String, Boolean> e: expStates) {
            expStateMap.put(ShardIdentifier.create(shardName, MemberName.forName(e.getKey()),
                    datastore.getActorContext().getDataStoreName()).toString(), e.getValue());
        verifyRaftState(datastore, shardName, raftState -> {
            // Check the local replica's own voting flag ...
            String localPeerId = ShardIdentifier.create(shardName, MemberName.forName(localMemberName),
                    datastore.getActorContext().getDataStoreName()).toString();
            assertEquals("Voting state for " + localPeerId, expStateMap.get(localPeerId), raftState.isVoting());
            // ... and the voting flag it has recorded for each of its peers.
            for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
                assertEquals("Voting state for " + e.getKey(), expStateMap.get(e.getKey()), e.getValue());
    // Asserts that the actual per-shard RPC results exactly match the expected set: the same
    // (shard, datastore-type) combinations, the same success flag, and an error message present
    // only for failed shards.
    private static void verifyShardResults(List<ShardResult> shardResults, ShardResult... expShardResults) {
        Map<String, ShardResult> expResultsMap = new HashMap<>();
        for (ShardResult r: expShardResults) {
            expResultsMap.put(r.getShardName() + "-" + r.getDataStoreType(), r);
        for (ShardResult result: shardResults) {
            // Remove as we match so any leftovers identify expected-but-missing results below.
            ShardResult exp = expResultsMap.remove(result.getShardName() + "-" + result.getDataStoreType());
            assertNotNull(String.format("Unexpected result for shard %s, type %s", result.getShardName(),
                    result.getDataStoreType()), exp);
            assertEquals("isSucceeded", exp.isSucceeded(), result.isSucceeded());
            if (exp.isSucceeded()) {
                assertNull("Expected null error message", result.getErrorMessage());
                assertNotNull("Expected error message", result.getErrorMessage());
        if (!expResultsMap.isEmpty()) {
            fail("Missing shard results for " + expResultsMap.keySet());
    // Builds the expected ShardResult for a shard whose operation succeeded (no error message).
    private static ShardResult successShardResult(String shardName, DataStoreType type) {
        return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(TRUE).build();
    // Builds the expected ShardResult for a shard whose operation failed (error message expected).
    private static ShardResult failedShardResult(String shardName, DataStoreType type) {
        return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(FALSE).build();