2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.admin;
10 import static java.lang.Boolean.FALSE;
11 import static java.lang.Boolean.TRUE;
12 import static org.hamcrest.CoreMatchers.anyOf;
13 import static org.hamcrest.CoreMatchers.containsString;
14 import static org.junit.Assert.assertEquals;
15 import static org.junit.Assert.assertFalse;
16 import static org.junit.Assert.assertNotNull;
17 import static org.junit.Assert.assertNull;
18 import static org.junit.Assert.assertThat;
19 import static org.junit.Assert.assertTrue;
20 import static org.junit.Assert.fail;
21 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent;
22 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
23 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
25 import akka.actor.ActorRef;
26 import akka.actor.PoisonPill;
27 import akka.actor.Status.Success;
28 import akka.cluster.Cluster;
29 import com.google.common.base.Optional;
30 import com.google.common.collect.ImmutableList;
31 import com.google.common.collect.ImmutableMap;
32 import com.google.common.collect.Iterables;
33 import com.google.common.collect.Lists;
34 import com.google.common.collect.Sets;
36 import java.io.FileInputStream;
38 import java.util.AbstractMap.SimpleEntry;
39 import java.util.ArrayList;
40 import java.util.Arrays;
41 import java.util.Collections;
42 import java.util.HashMap;
43 import java.util.HashSet;
44 import java.util.List;
46 import java.util.Map.Entry;
48 import java.util.concurrent.TimeUnit;
49 import org.apache.commons.lang3.SerializationUtils;
50 import org.junit.After;
51 import org.junit.Before;
52 import org.junit.Test;
53 import org.mockito.Mockito;
54 import org.opendaylight.controller.cluster.access.concepts.MemberName;
55 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
56 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
57 import org.opendaylight.controller.cluster.datastore.MemberNode;
58 import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
59 import org.opendaylight.controller.cluster.datastore.Shard;
60 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
61 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
62 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
63 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
64 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
65 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
66 import org.opendaylight.controller.cluster.raft.RaftState;
67 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
68 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
69 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
70 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
71 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
72 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
73 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
74 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
75 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
76 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
77 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
78 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
79 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInputBuilder;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInputBuilder;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInputBuilder;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInputBuilder;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInput;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInputBuilder;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
100 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
101 import org.opendaylight.yangtools.yang.common.RpcError;
102 import org.opendaylight.yangtools.yang.common.RpcResult;
103 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
106 * Unit tests for ClusterAdminRpcService.
108 * @author Thomas Pantelis
110 public class ClusterAdminRpcServiceTest {
111 private static final MemberName MEMBER_1 = MemberName.forName("member-1");
112 private static final MemberName MEMBER_2 = MemberName.forName("member-2");
113 private static final MemberName MEMBER_3 = MemberName.forName("member-3");
114 private final List<MemberNode> memberNodes = new ArrayList<>();
117 public void setUp() {
118 InMemoryJournal.clear();
119 InMemorySnapshotStore.clear();
123 public void tearDown() {
124 for (MemberNode m : Lists.reverse(memberNodes)) {
131 public void testBackupDatastore() throws Exception {
132 MemberNode node = MemberNode.builder(memberNodes).akkaConfig("Member1")
133 .moduleShardsConfig("module-shards-member1.conf").waitForShardLeader("cars", "people")
134 .testName("testBackupDatastore").build();
136 String fileName = "target/testBackupDatastore";
137 new File(fileName).delete();
139 ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore(), null);
141 RpcResult<Void> rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder()
142 .setFilePath(fileName).build()).get(5, TimeUnit.SECONDS);
143 verifySuccessfulRpcResult(rpcResult);
145 try (FileInputStream fis = new FileInputStream(fileName)) {
146 List<DatastoreSnapshot> snapshots = SerializationUtils.deserialize(fis);
147 assertEquals("DatastoreSnapshot size", 2, snapshots.size());
149 ImmutableMap<String, DatastoreSnapshot> map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0),
150 snapshots.get(1).getType(), snapshots.get(1));
151 verifyDatastoreSnapshot(node.configDataStore().getActorContext().getDataStoreName(),
152 map.get(node.configDataStore().getActorContext().getDataStoreName()), "cars", "people");
154 new File(fileName).delete();
157 // Test failure by killing a shard.
159 node.configDataStore().getActorContext().getShardManager().tell(node.datastoreContextBuilder()
160 .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender());
162 ActorRef carsShardActor = node.configDataStore().getActorContext().findLocalShard("cars").get();
163 node.kit().watch(carsShardActor);
164 carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
165 node.kit().expectTerminated(carsShardActor);
167 rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build())
168 .get(5, TimeUnit.SECONDS);
169 assertFalse("isSuccessful", rpcResult.isSuccessful());
170 assertEquals("getErrors", 1, rpcResult.getErrors().size());
173 private static void verifyDatastoreSnapshot(String type, DatastoreSnapshot datastoreSnapshot,
174 String... expShardNames) {
175 assertNotNull("Missing DatastoreSnapshot for type " + type, datastoreSnapshot);
176 Set<String> shardNames = new HashSet<>();
177 for (DatastoreSnapshot.ShardSnapshot s: datastoreSnapshot.getShardSnapshots()) {
178 shardNames.add(s.getName());
181 assertEquals("DatastoreSnapshot shard names", Sets.newHashSet(expShardNames), shardNames);
185 public void testAddRemovePrefixShardReplica() throws Exception {
186 String name = "testAddPrefixShardReplica";
187 String moduleShardsConfig = "module-shards-default.conf";
189 final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
190 .moduleShardsConfig(moduleShardsConfig).build();
191 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
192 .moduleShardsConfig(moduleShardsConfig).build();
193 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
194 .moduleShardsConfig(moduleShardsConfig).build();
196 member1.waitForMembersUp("member-2", "member-3");
197 replicaNode2.kit().waitForMembersUp("member-1", "member-3");
198 replicaNode3.kit().waitForMembersUp("member-1", "member-2");
200 final ActorRef shardManager1 = member1.configDataStore().getActorContext().getShardManager();
202 shardManager1.tell(new PrefixShardCreated(new PrefixShardConfiguration(
203 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH),
204 "prefix", Collections.singleton(MEMBER_1))),
205 ActorRef.noSender());
207 member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(),
208 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
210 final InstanceIdentifier<Cars> identifier = InstanceIdentifier.create(Cars.class);
211 final BindingNormalizedNodeSerializer serializer = Mockito.mock(BindingNormalizedNodeSerializer.class);
212 Mockito.doReturn(CarsModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(identifier);
214 addPrefixShardReplica(replicaNode2, identifier, serializer,
215 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1");
217 addPrefixShardReplica(replicaNode3, identifier, serializer,
218 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1", "member-2");
220 verifyRaftPeersPresent(member1.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH),
221 "member-2", "member-3");
223 removePrefixShardReplica(member1, identifier, "member-3", serializer,
224 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-2");
226 verifyNoShardPresent(replicaNode3.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
227 verifyRaftPeersPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH),
230 removePrefixShardReplica(member1, identifier, "member-2", serializer,
231 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
233 verifyNoShardPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
237 public void testModuleShardLeaderMovement() throws Exception {
238 String name = "testModuleShardLeaderMovement";
239 String moduleShardsConfig = "module-shards-member1.conf";
241 final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
242 .waitForShardLeader("cars").moduleShardsConfig(moduleShardsConfig).build();
243 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
244 .moduleShardsConfig(moduleShardsConfig).build();
245 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
246 .moduleShardsConfig(moduleShardsConfig).build();
248 member1.waitForMembersUp("member-2", "member-3");
249 replicaNode2.waitForMembersUp("member-1");
250 replicaNode3.waitForMembersUp("member-1", "member-2");
252 doAddShardReplica(replicaNode2, "cars", "member-1");
253 doAddShardReplica(replicaNode3, "cars", "member-1", "member-2");
255 verifyRaftPeersPresent(member1.configDataStore(), "cars", "member-2", "member-3");
257 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
259 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
261 doMakeShardLeaderLocal(member1, "cars", "member-1");
262 verifyRaftState(replicaNode2.configDataStore(), "cars",
263 raftState -> assertThat(raftState.getLeader(),containsString("member-1")));
264 verifyRaftState(replicaNode3.configDataStore(), "cars",
265 raftState -> assertThat(raftState.getLeader(),containsString("member-1")));
267 doMakeShardLeaderLocal(replicaNode2, "cars", "member-2");
268 verifyRaftState(member1.configDataStore(), "cars",
269 raftState -> assertThat(raftState.getLeader(),containsString("member-2")));
270 verifyRaftState(replicaNode3.configDataStore(), "cars",
271 raftState -> assertThat(raftState.getLeader(),containsString("member-2")));
273 replicaNode2.waitForMembersUp("member-3");
274 doMakeShardLeaderLocal(replicaNode3, "cars", "member-3");
278 public void testAddShardReplica() throws Exception {
279 String name = "testAddShardReplica";
280 String moduleShardsConfig = "module-shards-cars-member-1.conf";
281 MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
282 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars").build();
284 MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
285 .moduleShardsConfig(moduleShardsConfig).build();
287 leaderNode1.waitForMembersUp("member-2");
289 doAddShardReplica(newReplicaNode2, "cars", "member-1");
291 MemberNode newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
292 .moduleShardsConfig(moduleShardsConfig).build();
294 leaderNode1.waitForMembersUp("member-3");
295 newReplicaNode2.waitForMembersUp("member-3");
297 doAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2");
299 verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1", "member-3");
300 verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1", "member-3");
302 // Write data to member-2's config datastore and read/verify via member-3
303 final NormalizedNode<?, ?> configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore(),
304 newReplicaNode3.configDataStore());
306 // Write data to member-3's oper datastore and read/verify via member-2
307 writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore());
309 // Verify all data has been replicated. We expect 3 log entries and thus last applied index of 2 -
310 // 2 ServerConfigurationPayload entries and the transaction payload entry.
312 RaftStateVerifier verifier = raftState -> {
313 assertEquals("Commit index", 2, raftState.getCommitIndex());
314 assertEquals("Last applied index", 2, raftState.getLastApplied());
317 verifyRaftState(leaderNode1.configDataStore(), "cars", verifier);
318 verifyRaftState(leaderNode1.operDataStore(), "cars", verifier);
320 verifyRaftState(newReplicaNode2.configDataStore(), "cars", verifier);
321 verifyRaftState(newReplicaNode2.operDataStore(), "cars", verifier);
323 verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
324 verifyRaftState(newReplicaNode3.operDataStore(), "cars", verifier);
326 // Restart member-3 and verify the cars config shard is re-instated.
328 Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(newReplicaNode3.kit().getSystem()).selfAddress());
329 newReplicaNode3.cleanup();
331 newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
332 .moduleShardsConfig(moduleShardsConfig).createOperDatastore(false).build();
334 verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
335 readCarsNodeAndVerify(newReplicaNode3.configDataStore(), configCarsNode);
339 public void testAddShardReplicaFailures() throws Exception {
340 String name = "testAddShardReplicaFailures";
341 MemberNode memberNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
342 .moduleShardsConfig("module-shards-cars-member-1.conf").build();
344 ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
345 memberNode.operDataStore(), null);
347 RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder()
348 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
349 verifyFailedRpcResult(rpcResult);
351 rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("cars")
352 .build()).get(10, TimeUnit.SECONDS);
353 verifyFailedRpcResult(rpcResult);
355 rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people")
356 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
357 verifyFailedRpcResult(rpcResult);
360 private static NormalizedNode<?, ?> writeCarsNodeAndVerify(AbstractDataStore writeToStore,
361 AbstractDataStore readFromStore) throws Exception {
362 DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction();
363 NormalizedNode<?, ?> carsNode = CarsModel.create();
364 writeTx.write(CarsModel.BASE_PATH, carsNode);
366 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
367 Boolean canCommit = cohort.canCommit().get(7, TimeUnit.SECONDS);
368 assertEquals("canCommit", TRUE, canCommit);
369 cohort.preCommit().get(5, TimeUnit.SECONDS);
370 cohort.commit().get(5, TimeUnit.SECONDS);
372 readCarsNodeAndVerify(readFromStore, carsNode);
376 private static void readCarsNodeAndVerify(AbstractDataStore readFromStore,
377 NormalizedNode<?, ?> expCarsNode) throws Exception {
378 Optional<NormalizedNode<?, ?>> optional = readFromStore.newReadOnlyTransaction()
379 .read(CarsModel.BASE_PATH).get(15, TimeUnit.SECONDS);
380 assertTrue("isPresent", optional.isPresent());
381 assertEquals("Data node", expCarsNode, optional.get());
384 private void addPrefixShardReplica(final MemberNode memberNode,
385 final InstanceIdentifier<?> identifier,
386 final BindingNormalizedNodeSerializer serializer,
387 final String shardName,
388 final String... peerMemberNames) throws Exception {
390 final AddPrefixShardReplicaInput input = new AddPrefixShardReplicaInputBuilder()
391 .setShardPrefix(identifier)
392 .setDataStoreType(DataStoreType.Config).build();
394 final ClusterAdminRpcService service =
395 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
397 final RpcResult<Void> rpcResult = service.addPrefixShardReplica(input).get(10, TimeUnit.SECONDS);
398 verifySuccessfulRpcResult(rpcResult);
400 verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
401 Optional<ActorRef> optional = memberNode.configDataStore().getActorContext().findLocalShard(shardName);
402 assertTrue("Replica shard not present", optional.isPresent());
405 private void removePrefixShardReplica(final MemberNode memberNode,
406 final InstanceIdentifier<?> identifier,
407 final String removeFromMember,
408 final BindingNormalizedNodeSerializer serializer,
409 final String shardName,
410 final String... peerMemberNames) throws Exception {
411 final RemovePrefixShardReplicaInput input = new RemovePrefixShardReplicaInputBuilder()
412 .setDataStoreType(DataStoreType.Config)
413 .setShardPrefix(identifier)
414 .setMemberName(removeFromMember).build();
416 final ClusterAdminRpcService service =
417 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
419 final RpcResult<Void> rpcResult = service.removePrefixShardReplica(input).get(10, TimeUnit.SECONDS);
420 verifySuccessfulRpcResult(rpcResult);
422 verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
425 private static void doAddShardReplica(MemberNode memberNode, String shardName, String... peerMemberNames)
427 memberNode.waitForMembersUp(peerMemberNames);
429 ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
430 memberNode.operDataStore(), null);
432 RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName)
433 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
434 verifySuccessfulRpcResult(rpcResult);
436 verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
438 Optional<ActorRef> optional = memberNode.operDataStore().getActorContext().findLocalShard(shardName);
439 assertFalse("Oper shard present", optional.isPresent());
441 rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName)
442 .setDataStoreType(DataStoreType.Operational).build()).get(10, TimeUnit.SECONDS);
443 verifySuccessfulRpcResult(rpcResult);
445 verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames);
448 private static void doMakeShardLeaderLocal(final MemberNode memberNode, String shardName, String newLeader)
450 ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
451 memberNode.operDataStore(), null);
453 final RpcResult<Void> rpcResult = service.makeLeaderLocal(new MakeLeaderLocalInputBuilder()
454 .setDataStoreType(DataStoreType.Config).setShardName(shardName).build())
455 .get(10, TimeUnit.SECONDS);
457 verifySuccessfulRpcResult(rpcResult);
459 verifyRaftState(memberNode.configDataStore(), shardName, raftState -> assertThat(raftState.getLeader(),
460 containsString(newLeader)));
464 private static <T> T verifySuccessfulRpcResult(RpcResult<T> rpcResult) {
465 if (!rpcResult.isSuccessful()) {
466 if (rpcResult.getErrors().size() > 0) {
467 RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
468 throw new AssertionError("Rpc failed with error: " + error, error.getCause());
471 fail("Rpc failed with no error");
474 return rpcResult.getResult();
477 private static void verifyFailedRpcResult(RpcResult<Void> rpcResult) {
478 assertFalse("RpcResult", rpcResult.isSuccessful());
479 assertEquals("RpcResult errors size", 1, rpcResult.getErrors().size());
480 RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
481 assertNotNull("RpcResult error message null", error.getMessage());
485 public void testRemoveShardReplica() throws Exception {
486 String name = "testRemoveShardReplica";
487 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
488 final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
489 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
490 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
493 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
494 .moduleShardsConfig(moduleShardsConfig).build();
496 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
497 .moduleShardsConfig(moduleShardsConfig).build();
499 leaderNode1.configDataStore().waitTillReady();
500 replicaNode3.configDataStore().waitTillReady();
501 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
502 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
503 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
505 // Invoke RPC service on member-3 to remove it's local shard
507 ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
508 replicaNode3.operDataStore(), null);
510 RpcResult<Void> rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder()
511 .setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build())
512 .get(10, TimeUnit.SECONDS);
513 verifySuccessfulRpcResult(rpcResult);
515 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
516 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
517 verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
519 // Restart member-2 and verify member-3 isn't present.
521 Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(replicaNode2.kit().getSystem()).selfAddress());
522 replicaNode2.cleanup();
524 MemberNode newPeplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
525 .moduleShardsConfig(moduleShardsConfig).build();
527 newPeplicaNode2.configDataStore().waitTillReady();
528 verifyRaftPeersPresent(newPeplicaNode2.configDataStore(), "cars", "member-1");
530 // Invoke RPC service on member-1 to remove member-2
532 ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
533 leaderNode1.operDataStore(), null);
535 rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().setShardName("cars")
536 .setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
537 verifySuccessfulRpcResult(rpcResult);
539 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars");
540 verifyNoShardPresent(newPeplicaNode2.configDataStore(), "cars");
544 public void testRemoveShardLeaderReplica() throws Exception {
545 String name = "testRemoveShardLeaderReplica";
546 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
547 final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
548 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
549 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
552 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
553 .moduleShardsConfig(moduleShardsConfig).build();
555 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
556 .moduleShardsConfig(moduleShardsConfig).build();
558 leaderNode1.configDataStore().waitTillReady();
559 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
560 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
561 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
563 replicaNode2.waitForMembersUp("member-1", "member-3");
564 replicaNode3.waitForMembersUp("member-1", "member-2");
566 // Invoke RPC service on leader member-1 to remove it's local shard
568 ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
569 leaderNode1.operDataStore(), null);
571 RpcResult<Void> rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder()
572 .setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build())
573 .get(10, TimeUnit.SECONDS);
574 verifySuccessfulRpcResult(rpcResult);
576 verifyRaftState(replicaNode2.configDataStore(), "cars", raftState ->
577 assertThat("Leader Id", raftState.getLeader(), anyOf(containsString("member-2"),
578 containsString("member-3"))));
580 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-3");
581 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-2");
582 verifyNoShardPresent(leaderNode1.configDataStore(), "cars");
586 public void testAddReplicasForAllShards() throws Exception {
587 String name = "testAddReplicasForAllShards";
588 String moduleShardsConfig = "module-shards-member1.conf";
589 MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
590 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars", "people").build();
592 ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
593 "pets", null, Arrays.asList(MEMBER_1));
594 leaderNode1.configDataStore().getActorContext().getShardManager().tell(
595 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
596 leaderNode1.kit().expectMsgClass(Success.class);
597 leaderNode1.kit().waitUntilLeader(leaderNode1.configDataStore().getActorContext(), "pets");
599 MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
600 .moduleShardsConfig(moduleShardsConfig).build();
602 leaderNode1.waitForMembersUp("member-2");
603 newReplicaNode2.waitForMembersUp("member-1");
605 newReplicaNode2.configDataStore().getActorContext().getShardManager().tell(
606 new CreateShard(petsModuleConfig, Shard.builder(), null), newReplicaNode2.kit().getRef());
607 newReplicaNode2.kit().expectMsgClass(Success.class);
609 newReplicaNode2.operDataStore().getActorContext().getShardManager().tell(
610 new CreateShard(new ModuleShardConfiguration(URI.create("no-leader-ns"), "no-leader-module",
611 "no-leader", null, Arrays.asList(MEMBER_1)), Shard.builder(), null),
612 newReplicaNode2.kit().getRef());
613 newReplicaNode2.kit().expectMsgClass(Success.class);
615 ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(),
616 newReplicaNode2.operDataStore(), null);
618 RpcResult<AddReplicasForAllShardsOutput> rpcResult =
619 service.addReplicasForAllShards().get(10, TimeUnit.SECONDS);
620 AddReplicasForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
621 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
622 successShardResult("people", DataStoreType.Config),
623 successShardResult("pets", DataStoreType.Config),
624 successShardResult("cars", DataStoreType.Operational),
625 successShardResult("people", DataStoreType.Operational),
626 failedShardResult("no-leader", DataStoreType.Operational));
628 verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1");
629 verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "people", "member-1");
630 verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1");
631 verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1");
632 verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1");
636 public void testRemoveAllShardReplicas() throws Exception {
637 String name = "testRemoveAllShardReplicas";
638 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
639 final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
640 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
641 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
644 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
645 .moduleShardsConfig(moduleShardsConfig).build();
647 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
648 .moduleShardsConfig(moduleShardsConfig).build();
650 leaderNode1.configDataStore().waitTillReady();
651 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
652 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
653 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
655 ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
656 "pets", null, Arrays.asList(MEMBER_1, MEMBER_2, MEMBER_3));
657 leaderNode1.configDataStore().getActorContext().getShardManager().tell(
658 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
659 leaderNode1.kit().expectMsgClass(Success.class);
661 replicaNode2.configDataStore().getActorContext().getShardManager().tell(
662 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode2.kit().getRef());
663 replicaNode2.kit().expectMsgClass(Success.class);
665 replicaNode3.configDataStore().getActorContext().getShardManager().tell(
666 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode3.kit().getRef());
667 replicaNode3.kit().expectMsgClass(Success.class);
669 verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2", "member-3");
670 verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1", "member-3");
671 verifyRaftPeersPresent(replicaNode3.configDataStore(), "pets", "member-1", "member-2");
673 ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
674 replicaNode3.operDataStore(), null);
676 RpcResult<RemoveAllShardReplicasOutput> rpcResult = service3.removeAllShardReplicas(
677 new RemoveAllShardReplicasInputBuilder().setMemberName("member-3").build()).get(10, TimeUnit.SECONDS);
678 RemoveAllShardReplicasOutput result = verifySuccessfulRpcResult(rpcResult);
679 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
680 successShardResult("people", DataStoreType.Config),
681 successShardResult("pets", DataStoreType.Config),
682 successShardResult("cars", DataStoreType.Operational),
683 successShardResult("people", DataStoreType.Operational));
685 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
686 verifyRaftPeersPresent(leaderNode1.configDataStore(), "people", "member-2");
687 verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2");
688 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
689 verifyRaftPeersPresent(replicaNode2.configDataStore(), "people", "member-1");
690 verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1");
691 verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
692 verifyNoShardPresent(replicaNode3.configDataStore(), "people");
693 verifyNoShardPresent(replicaNode3.configDataStore(), "pets");
697 public void testChangeMemberVotingStatesForShard() throws Exception {
698 String name = "testChangeMemberVotingStatusForShard";
699 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
700 final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
701 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
702 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
705 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
706 .moduleShardsConfig(moduleShardsConfig).build();
708 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
709 .moduleShardsConfig(moduleShardsConfig).build();
711 leaderNode1.configDataStore().waitTillReady();
712 replicaNode3.configDataStore().waitTillReady();
713 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
714 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
715 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
717 // Invoke RPC service on member-3 to change voting status
719 ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
720 replicaNode3.operDataStore(), null);
722 RpcResult<Void> rpcResult = service3
723 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
724 .setShardName("cars").setDataStoreType(DataStoreType.Config)
725 .setMemberVotingState(ImmutableList.of(
726 new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
727 new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build()))
729 .get(10, TimeUnit.SECONDS);
730 verifySuccessfulRpcResult(rpcResult);
732 verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
733 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
734 verifyVotingStates(replicaNode2.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
735 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
736 verifyVotingStates(replicaNode3.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
737 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
741 public void testChangeMemberVotingStatesForSingleNodeShard() throws Exception {
742 String name = "testChangeMemberVotingStatesForSingleNodeShard";
743 String moduleShardsConfig = "module-shards-member1.conf";
744 MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
745 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
746 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
749 leaderNode.configDataStore().waitTillReady();
751 // Invoke RPC service on member-3 to change voting status
753 ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(),
754 leaderNode.operDataStore(), null);
756 RpcResult<Void> rpcResult = service
757 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
758 .setShardName("cars").setDataStoreType(DataStoreType.Config)
759 .setMemberVotingState(ImmutableList
760 .of(new MemberVotingStateBuilder().setMemberName("member-1").setVoting(FALSE).build()))
762 .get(10, TimeUnit.SECONDS);
763 verifyFailedRpcResult(rpcResult);
765 verifyVotingStates(leaderNode.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE));
769 public void testChangeMemberVotingStatesForAllShards() throws Exception {
770 String name = "testChangeMemberVotingStatesForAllShards";
771 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
772 final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
773 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
774 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
777 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
778 .moduleShardsConfig(moduleShardsConfig).build();
780 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
781 .moduleShardsConfig(moduleShardsConfig).build();
783 leaderNode1.configDataStore().waitTillReady();
784 leaderNode1.operDataStore().waitTillReady();
785 replicaNode3.configDataStore().waitTillReady();
786 replicaNode3.operDataStore().waitTillReady();
787 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
788 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
789 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
791 // Invoke RPC service on member-3 to change voting status
793 ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
794 replicaNode3.operDataStore(), null);
796 RpcResult<ChangeMemberVotingStatesForAllShardsOutput> rpcResult = service3.changeMemberVotingStatesForAllShards(
797 new ChangeMemberVotingStatesForAllShardsInputBuilder().setMemberVotingState(ImmutableList.of(
798 new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
799 new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build())).build())
800 .get(10, TimeUnit.SECONDS);
801 ChangeMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
802 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
803 successShardResult("people", DataStoreType.Config),
804 successShardResult("cars", DataStoreType.Operational),
805 successShardResult("people", DataStoreType.Operational));
807 verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
808 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
809 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
810 new String[]{"cars", "people"}, new SimpleEntry<>("member-1", TRUE),
811 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
815 public void testFlipMemberVotingStates() throws Exception {
816 String name = "testFlipMemberVotingStates";
818 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
819 new ServerInfo("member-1", true), new ServerInfo("member-2", true),
820 new ServerInfo("member-3", false)));
822 setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
823 setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
824 setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
826 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
827 final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
828 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
829 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
832 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
833 .moduleShardsConfig(moduleShardsConfig).build();
835 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
836 .moduleShardsConfig(moduleShardsConfig).build();
838 leaderNode1.configDataStore().waitTillReady();
839 leaderNode1.operDataStore().waitTillReady();
840 replicaNode3.configDataStore().waitTillReady();
841 replicaNode3.operDataStore().waitTillReady();
842 verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
843 new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", FALSE));
845 ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
846 replicaNode3.operDataStore(), null);
848 RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service3.flipMemberVotingStatesForAllShards()
849 .get(10, TimeUnit.SECONDS);
850 FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
851 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
852 successShardResult("people", DataStoreType.Config),
853 successShardResult("cars", DataStoreType.Operational),
854 successShardResult("people", DataStoreType.Operational));
856 verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
857 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
858 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
859 new String[]{"cars", "people"},
860 new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE),
861 new SimpleEntry<>("member-3", TRUE));
863 // Leadership should have transferred to member 3 since it is the only remaining voting member.
864 verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
865 assertNotNull("Expected non-null leader Id", raftState.getLeader());
866 assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
867 raftState.getLeader().contains("member-3"));
870 verifyRaftState(leaderNode1.operDataStore(), "cars", raftState -> {
871 assertNotNull("Expected non-null leader Id", raftState.getLeader());
872 assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
873 raftState.getLeader().contains("member-3"));
876 // Flip the voting states back to the original states.
878 rpcResult = service3.flipMemberVotingStatesForAllShards(). get(10, TimeUnit.SECONDS);
879 result = verifySuccessfulRpcResult(rpcResult);
880 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
881 successShardResult("people", DataStoreType.Config),
882 successShardResult("cars", DataStoreType.Operational),
883 successShardResult("people", DataStoreType.Operational));
885 verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
886 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
887 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
888 new String[]{"cars", "people"},
889 new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE),
890 new SimpleEntry<>("member-3", FALSE));
892 // Leadership should have transferred to member 1 or 2.
893 verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
894 assertNotNull("Expected non-null leader Id", raftState.getLeader());
895 assertTrue("Expected leader member-1 or member-2. Actual: " + raftState.getLeader(),
896 raftState.getLeader().contains("member-1") || raftState.getLeader().contains("member-2"));
901 public void testFlipMemberVotingStatesWithNoInitialLeader() throws Exception {
902 String name = "testFlipMemberVotingStatesWithNoInitialLeader";
904 // Members 1, 2, and 3 are initially started up as non-voting. Members 4, 5, and 6 are initially
905 // non-voting and simulated as down by not starting them up.
906 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
907 new ServerInfo("member-1", false), new ServerInfo("member-2", false),
908 new ServerInfo("member-3", false), new ServerInfo("member-4", true),
909 new ServerInfo("member-5", true), new ServerInfo("member-6", true)));
911 setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
912 setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
913 setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
915 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
916 final MemberNode replicaNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
917 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
918 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
921 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
922 .moduleShardsConfig(moduleShardsConfig).build();
924 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
925 .moduleShardsConfig(moduleShardsConfig).build();
927 // Initially there won't be a leader b/c all the up nodes are non-voting.
929 replicaNode1.waitForMembersUp("member-2", "member-3");
931 verifyVotingStates(replicaNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", FALSE),
932 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE),
933 new SimpleEntry<>("member-4", TRUE), new SimpleEntry<>("member-5", TRUE),
934 new SimpleEntry<>("member-6", TRUE));
936 verifyRaftState(replicaNode1.configDataStore(), "cars", raftState ->
937 assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState()));
939 ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(),
940 replicaNode1.operDataStore(), null);
942 RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards()
943 .get(10, TimeUnit.SECONDS);
944 FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
945 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
946 successShardResult("people", DataStoreType.Config),
947 successShardResult("cars", DataStoreType.Operational),
948 successShardResult("people", DataStoreType.Operational));
950 verifyVotingStates(new AbstractDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(),
951 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
952 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
953 new String[]{"cars", "people"},
954 new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE),
955 new SimpleEntry<>("member-3", TRUE), new SimpleEntry<>("member-4", FALSE),
956 new SimpleEntry<>("member-5", FALSE), new SimpleEntry<>("member-6", FALSE));
958 // Since member 1 was changed to voting and there was no leader, it should've started and election
960 verifyRaftState(replicaNode1.configDataStore(), "cars", raftState -> {
961 assertNotNull("Expected non-null leader Id", raftState.getLeader());
962 assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
963 raftState.getLeader().contains("member-1"));
966 verifyRaftState(replicaNode1.operDataStore(), "cars", raftState -> {
967 assertNotNull("Expected non-null leader Id", raftState.getLeader());
968 assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
969 raftState.getLeader().contains("member-1"));
974 public void testFlipMemberVotingStatesWithVotingMembersDown() throws Exception {
975 String name = "testFlipMemberVotingStatesWithVotingMembersDown";
977 // Members 4, 5, and 6 are initially non-voting and simulated as down by not starting them up.
978 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
979 new ServerInfo("member-1", true), new ServerInfo("member-2", true),
980 new ServerInfo("member-3", true), new ServerInfo("member-4", false),
981 new ServerInfo("member-5", false), new ServerInfo("member-6", false)));
983 setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
984 setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
985 setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
987 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
988 final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
989 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
990 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
993 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
994 .moduleShardsConfig(moduleShardsConfig).build();
996 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
997 .moduleShardsConfig(moduleShardsConfig).build();
999 leaderNode1.configDataStore().waitTillReady();
1000 leaderNode1.operDataStore().waitTillReady();
1001 verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
1002 new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", TRUE),
1003 new SimpleEntry<>("member-4", FALSE), new SimpleEntry<>("member-5", FALSE),
1004 new SimpleEntry<>("member-6", FALSE));
1006 ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
1007 leaderNode1.operDataStore(), null);
1009 RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards()
1010 .get(10, TimeUnit.SECONDS);
1011 FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
1012 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
1013 successShardResult("people", DataStoreType.Config),
1014 successShardResult("cars", DataStoreType.Operational),
1015 successShardResult("people", DataStoreType.Operational));
1017 // Members 2 and 3 are now non-voting but should get replicated with the new new server config.
1018 verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
1019 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
1020 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
1021 new String[]{"cars", "people"},
1022 new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE),
1023 new SimpleEntry<>("member-3", FALSE), new SimpleEntry<>("member-4", TRUE),
1024 new SimpleEntry<>("member-5", TRUE), new SimpleEntry<>("member-6", TRUE));
1026 // The leader (member 1) was changed to non-voting but it shouldn't be able to step down as leader yet
1027 // b/c it can't get a majority consensus with all voting members down. So verify it remains the leader.
1028 verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
1029 assertNotNull("Expected non-null leader Id", raftState.getLeader());
1030 assertTrue("Expected leader member-1", raftState.getLeader().contains("member-1"));
1034 private static void setupPersistedServerConfigPayload(ServerConfigurationPayload serverConfig,
1035 String member, String datastoreTypeSuffix, String... shards) {
1036 String[] datastoreTypes = {"config_", "oper_"};
1037 for (String type : datastoreTypes) {
1038 for (String shard : shards) {
1039 List<ServerInfo> newServerInfo = new ArrayList<>(serverConfig.getServerConfig().size());
1040 for (ServerInfo info : serverConfig.getServerConfig()) {
1041 newServerInfo.add(new ServerInfo(ShardIdentifier.create(shard, MemberName.forName(info.getId()),
1042 type + datastoreTypeSuffix).toString(), info.isVoting()));
1045 String shardID = ShardIdentifier.create(shard, MemberName.forName(member),
1046 type + datastoreTypeSuffix).toString();
1047 InMemoryJournal.addEntry(shardID, 1, new UpdateElectionTerm(1, null));
1048 InMemoryJournal.addEntry(shardID, 2, new SimpleReplicatedLogEntry(0, 1,
1049 new ServerConfigurationPayload(newServerInfo)));
1055 private static void verifyVotingStates(AbstractDataStore[] datastores, String[] shards,
1056 SimpleEntry<String, Boolean>... expStates) throws Exception {
1057 for (AbstractDataStore datastore: datastores) {
1058 for (String shard: shards) {
1059 verifyVotingStates(datastore, shard, expStates);
1065 private static void verifyVotingStates(AbstractDataStore datastore, String shardName,
1066 SimpleEntry<String, Boolean>... expStates) throws Exception {
1067 String localMemberName = datastore.getActorContext().getCurrentMemberName().getName();
1068 Map<String, Boolean> expStateMap = new HashMap<>();
1069 for (Entry<String, Boolean> e: expStates) {
1070 expStateMap.put(ShardIdentifier.create(shardName, MemberName.forName(e.getKey()),
1071 datastore.getActorContext().getDataStoreName()).toString(), e.getValue());
1074 verifyRaftState(datastore, shardName, raftState -> {
1075 String localPeerId = ShardIdentifier.create(shardName, MemberName.forName(localMemberName),
1076 datastore.getActorContext().getDataStoreName()).toString();
1077 assertEquals("Voting state for " + localPeerId, expStateMap.get(localPeerId), raftState.isVoting());
1078 for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
1079 assertEquals("Voting state for " + e.getKey(), expStateMap.get(e.getKey()), e.getValue());
1084 private static void verifyShardResults(List<ShardResult> shardResults, ShardResult... expShardResults) {
1085 Map<String, ShardResult> expResultsMap = new HashMap<>();
1086 for (ShardResult r: expShardResults) {
1087 expResultsMap.put(r.getShardName() + "-" + r.getDataStoreType(), r);
1090 for (ShardResult result: shardResults) {
1091 ShardResult exp = expResultsMap.remove(result.getShardName() + "-" + result.getDataStoreType());
1092 assertNotNull(String.format("Unexpected result for shard %s, type %s", result.getShardName(),
1093 result.getDataStoreType()), exp);
1094 assertEquals("isSucceeded", exp.isSucceeded(), result.isSucceeded());
1095 if (exp.isSucceeded()) {
1096 assertNull("Expected null error message", result.getErrorMessage());
1098 assertNotNull("Expected error message", result.getErrorMessage());
1102 if (!expResultsMap.isEmpty()) {
1103 fail("Missing shard results for " + expResultsMap.keySet());
1107 private static ShardResult successShardResult(String shardName, DataStoreType type) {
1108 return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(TRUE).build();
1111 private static ShardResult failedShardResult(String shardName, DataStoreType type) {
1112 return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(FALSE).build();