2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.admin;
10 import static java.lang.Boolean.FALSE;
11 import static java.lang.Boolean.TRUE;
12 import static org.hamcrest.CoreMatchers.anyOf;
13 import static org.hamcrest.CoreMatchers.containsString;
14 import static org.junit.Assert.assertEquals;
15 import static org.junit.Assert.assertFalse;
16 import static org.junit.Assert.assertNotNull;
17 import static org.junit.Assert.assertNull;
18 import static org.junit.Assert.assertThat;
19 import static org.junit.Assert.assertTrue;
20 import static org.junit.Assert.fail;
21 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent;
22 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
23 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
25 import akka.actor.ActorRef;
26 import akka.actor.PoisonPill;
27 import akka.actor.Status.Success;
28 import akka.cluster.Cluster;
29 import com.google.common.base.Optional;
30 import com.google.common.collect.ImmutableList;
31 import com.google.common.collect.ImmutableMap;
32 import com.google.common.collect.Iterables;
33 import com.google.common.collect.Lists;
34 import com.google.common.collect.Sets;
36 import java.io.FileInputStream;
38 import java.util.AbstractMap.SimpleEntry;
39 import java.util.ArrayList;
40 import java.util.Arrays;
41 import java.util.Collections;
42 import java.util.HashMap;
43 import java.util.HashSet;
44 import java.util.List;
46 import java.util.Map.Entry;
48 import java.util.concurrent.TimeUnit;
49 import org.apache.commons.lang3.SerializationUtils;
50 import org.junit.After;
51 import org.junit.Before;
52 import org.junit.Test;
53 import org.mockito.Mockito;
54 import org.opendaylight.controller.cluster.access.concepts.MemberName;
55 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
56 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
57 import org.opendaylight.controller.cluster.datastore.MemberNode;
58 import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
59 import org.opendaylight.controller.cluster.datastore.Shard;
60 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
61 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
62 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
63 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
64 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
65 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
66 import org.opendaylight.controller.cluster.raft.RaftState;
67 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
68 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
69 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
70 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
71 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
72 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
73 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
74 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
75 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
76 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
77 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
78 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
79 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
80 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.people.rev140818.People;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInputBuilder;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaOutput;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsInputBuilder;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaOutput;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreOutput;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInputBuilder;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInputBuilder;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardOutput;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsInputBuilder;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInput;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInputBuilder;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutput;
102 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInput;
103 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInputBuilder;
104 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutput;
105 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInputBuilder;
106 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalOutput;
107 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
108 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
109 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInput;
110 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInputBuilder;
111 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaOutput;
112 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
113 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutput;
114 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder;
115 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
116 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
117 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
118 import org.opendaylight.yangtools.yang.common.RpcError;
119 import org.opendaylight.yangtools.yang.common.RpcResult;
120 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
123 * Unit tests for ClusterAdminRpcService.
125 * @author Thomas Pantelis
127 public class ClusterAdminRpcServiceTest {
/** Name of the first cluster member; used as the sole initial replica of the prefix shards created in tests. */
private static final MemberName MEMBER_1 = MemberName.forName("member-1");
/** Name of the second cluster member. */
private static final MemberName MEMBER_2 = MemberName.forName("member-2");
/** Name of the third cluster member. */
private static final MemberName MEMBER_3 = MemberName.forName("member-3");
/** List handed to every {@code MemberNode.builder(...)} call; walked in reverse order by {@code tearDown}. */
private final List<MemberNode> memberNodes = new ArrayList<>();
134 public void setUp() {
135 InMemoryJournal.clear();
136 InMemorySnapshotStore.clear();
140 public void tearDown() {
141 for (MemberNode m : Lists.reverse(memberNodes)) {
148 public void testBackupDatastore() throws Exception {
149 MemberNode node = MemberNode.builder(memberNodes).akkaConfig("Member1")
150 .moduleShardsConfig("module-shards-member1.conf").waitForShardLeader("cars", "people")
151 .testName("testBackupDatastore").build();
153 String fileName = "target/testBackupDatastore";
154 new File(fileName).delete();
156 ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore(), null);
158 RpcResult<BackupDatastoreOutput> rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder()
159 .setFilePath(fileName).build()).get(5, TimeUnit.SECONDS);
160 verifySuccessfulRpcResult(rpcResult);
162 try (FileInputStream fis = new FileInputStream(fileName)) {
163 List<DatastoreSnapshot> snapshots = SerializationUtils.deserialize(fis);
164 assertEquals("DatastoreSnapshot size", 2, snapshots.size());
166 ImmutableMap<String, DatastoreSnapshot> map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0),
167 snapshots.get(1).getType(), snapshots.get(1));
168 verifyDatastoreSnapshot(node.configDataStore().getActorUtils().getDataStoreName(),
169 map.get(node.configDataStore().getActorUtils().getDataStoreName()), "cars", "people");
171 new File(fileName).delete();
174 // Test failure by killing a shard.
176 node.configDataStore().getActorUtils().getShardManager().tell(node.datastoreContextBuilder()
177 .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender());
179 ActorRef carsShardActor = node.configDataStore().getActorUtils().findLocalShard("cars").get();
180 node.kit().watch(carsShardActor);
181 carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
182 node.kit().expectTerminated(carsShardActor);
184 rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build())
185 .get(5, TimeUnit.SECONDS);
186 assertFalse("isSuccessful", rpcResult.isSuccessful());
187 assertEquals("getErrors", 1, rpcResult.getErrors().size());
190 private static void verifyDatastoreSnapshot(final String type, final DatastoreSnapshot datastoreSnapshot,
191 final String... expShardNames) {
192 assertNotNull("Missing DatastoreSnapshot for type " + type, datastoreSnapshot);
193 Set<String> shardNames = new HashSet<>();
194 for (DatastoreSnapshot.ShardSnapshot s: datastoreSnapshot.getShardSnapshots()) {
195 shardNames.add(s.getName());
198 assertEquals("DatastoreSnapshot shard names", Sets.newHashSet(expShardNames), shardNames);
202 public void testAddRemovePrefixShardReplica() throws Exception {
203 String name = "testAddPrefixShardReplica";
204 String moduleShardsConfig = "module-shards-default.conf";
206 final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
207 .moduleShardsConfig(moduleShardsConfig).build();
208 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
209 .moduleShardsConfig(moduleShardsConfig).build();
210 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
211 .moduleShardsConfig(moduleShardsConfig).build();
213 member1.waitForMembersUp("member-2", "member-3");
214 replicaNode2.kit().waitForMembersUp("member-1", "member-3");
215 replicaNode3.kit().waitForMembersUp("member-1", "member-2");
217 final ActorRef shardManager1 = member1.configDataStore().getActorUtils().getShardManager();
219 shardManager1.tell(new PrefixShardCreated(new PrefixShardConfiguration(
220 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH),
221 "prefix", Collections.singleton(MEMBER_1))),
222 ActorRef.noSender());
224 member1.kit().waitUntilLeader(member1.configDataStore().getActorUtils(),
225 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
227 final InstanceIdentifier<Cars> identifier = InstanceIdentifier.create(Cars.class);
228 final BindingNormalizedNodeSerializer serializer = Mockito.mock(BindingNormalizedNodeSerializer.class);
229 Mockito.doReturn(CarsModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(identifier);
231 addPrefixShardReplica(replicaNode2, identifier, serializer,
232 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1");
234 addPrefixShardReplica(replicaNode3, identifier, serializer,
235 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1", "member-2");
237 verifyRaftPeersPresent(member1.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH),
238 "member-2", "member-3");
240 removePrefixShardReplica(member1, identifier, "member-3", serializer,
241 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-2");
243 verifyNoShardPresent(replicaNode3.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
244 verifyRaftPeersPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH),
247 removePrefixShardReplica(member1, identifier, "member-2", serializer,
248 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
250 verifyNoShardPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
254 public void testGetShardRole() throws Exception {
255 String name = "testGetShardRole";
256 String moduleShardsConfig = "module-shards-default-member-1.conf";
258 final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
259 .moduleShardsConfig(moduleShardsConfig).build();
261 member1.kit().waitUntilLeader(member1.configDataStore().getActorUtils(), "default");
263 final RpcResult<GetShardRoleOutput> successResult =
264 getShardRole(member1, Mockito.mock(BindingNormalizedNodeSerializer.class), "default");
265 verifySuccessfulRpcResult(successResult);
266 assertEquals("Leader", successResult.getResult().getRole());
268 final RpcResult<GetShardRoleOutput> failedResult =
269 getShardRole(member1, Mockito.mock(BindingNormalizedNodeSerializer.class), "cars");
271 verifyFailedRpcResult(failedResult);
273 final ActorRef shardManager1 = member1.configDataStore().getActorUtils().getShardManager();
275 shardManager1.tell(new PrefixShardCreated(new PrefixShardConfiguration(
276 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH),
277 "prefix", Collections.singleton(MEMBER_1))),
278 ActorRef.noSender());
280 member1.kit().waitUntilLeader(member1.configDataStore().getActorUtils(),
281 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
283 final InstanceIdentifier<Cars> identifier = InstanceIdentifier.create(Cars.class);
284 final BindingNormalizedNodeSerializer serializer = Mockito.mock(BindingNormalizedNodeSerializer.class);
285 Mockito.doReturn(CarsModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(identifier);
287 final RpcResult<GetPrefixShardRoleOutput> prefixSuccessResult =
288 getPrefixShardRole(member1, identifier, serializer);
290 verifySuccessfulRpcResult(prefixSuccessResult);
291 assertEquals("Leader", prefixSuccessResult.getResult().getRole());
293 final InstanceIdentifier<People> peopleId = InstanceIdentifier.create(People.class);
294 Mockito.doReturn(PeopleModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(peopleId);
296 final RpcResult<GetPrefixShardRoleOutput> prefixFail =
297 getPrefixShardRole(member1, peopleId, serializer);
299 verifyFailedRpcResult(prefixFail);
303 public void testGetPrefixShardRole() throws Exception {
304 String name = "testGetPrefixShardRole";
305 String moduleShardsConfig = "module-shards-default-member-1.conf";
307 final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
308 .moduleShardsConfig(moduleShardsConfig).build();
310 member1.kit().waitUntilLeader(member1.configDataStore().getActorUtils(), "default");
316 public void testModuleShardLeaderMovement() throws Exception {
317 String name = "testModuleShardLeaderMovement";
318 String moduleShardsConfig = "module-shards-member1.conf";
320 final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
321 .waitForShardLeader("cars").moduleShardsConfig(moduleShardsConfig).build();
322 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
323 .moduleShardsConfig(moduleShardsConfig).build();
324 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
325 .moduleShardsConfig(moduleShardsConfig).build();
327 member1.waitForMembersUp("member-2", "member-3");
328 replicaNode2.waitForMembersUp("member-1");
329 replicaNode3.waitForMembersUp("member-1", "member-2");
331 doAddShardReplica(replicaNode2, "cars", "member-1");
332 doAddShardReplica(replicaNode3, "cars", "member-1", "member-2");
334 verifyRaftPeersPresent(member1.configDataStore(), "cars", "member-2", "member-3");
336 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
338 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
340 doMakeShardLeaderLocal(member1, "cars", "member-1");
341 verifyRaftState(replicaNode2.configDataStore(), "cars",
342 raftState -> assertThat(raftState.getLeader(),containsString("member-1")));
343 verifyRaftState(replicaNode3.configDataStore(), "cars",
344 raftState -> assertThat(raftState.getLeader(),containsString("member-1")));
346 doMakeShardLeaderLocal(replicaNode2, "cars", "member-2");
347 verifyRaftState(member1.configDataStore(), "cars",
348 raftState -> assertThat(raftState.getLeader(),containsString("member-2")));
349 verifyRaftState(replicaNode3.configDataStore(), "cars",
350 raftState -> assertThat(raftState.getLeader(),containsString("member-2")));
352 replicaNode2.waitForMembersUp("member-3");
353 doMakeShardLeaderLocal(replicaNode3, "cars", "member-3");
357 public void testAddShardReplica() throws Exception {
358 String name = "testAddShardReplica";
359 String moduleShardsConfig = "module-shards-cars-member-1.conf";
360 MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
361 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars").build();
363 MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
364 .moduleShardsConfig(moduleShardsConfig).build();
366 leaderNode1.waitForMembersUp("member-2");
368 doAddShardReplica(newReplicaNode2, "cars", "member-1");
370 MemberNode newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
371 .moduleShardsConfig(moduleShardsConfig).build();
373 leaderNode1.waitForMembersUp("member-3");
374 newReplicaNode2.waitForMembersUp("member-3");
376 doAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2");
378 verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1", "member-3");
379 verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1", "member-3");
381 // Write data to member-2's config datastore and read/verify via member-3
382 final NormalizedNode<?, ?> configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore(),
383 newReplicaNode3.configDataStore());
385 // Write data to member-3's oper datastore and read/verify via member-2
386 writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore());
388 // Verify all data has been replicated. We expect 4 log entries and thus last applied index of 3 -
389 // 2 ServerConfigurationPayload entries, the transaction payload entry plus a purge payload.
391 RaftStateVerifier verifier = raftState -> {
392 assertEquals("Commit index", 5, raftState.getCommitIndex());
393 assertEquals("Last applied index", 5, raftState.getLastApplied());
396 verifyRaftState(leaderNode1.configDataStore(), "cars", verifier);
397 verifyRaftState(leaderNode1.operDataStore(), "cars", verifier);
399 verifyRaftState(newReplicaNode2.configDataStore(), "cars", verifier);
400 verifyRaftState(newReplicaNode2.operDataStore(), "cars", verifier);
402 verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
403 verifyRaftState(newReplicaNode3.operDataStore(), "cars", verifier);
405 // Restart member-3 and verify the cars config shard is re-instated.
407 Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(newReplicaNode3.kit().getSystem()).selfAddress());
408 newReplicaNode3.cleanup();
410 newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
411 .moduleShardsConfig(moduleShardsConfig).createOperDatastore(false).build();
413 verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
414 readCarsNodeAndVerify(newReplicaNode3.configDataStore(), configCarsNode);
418 public void testAddShardReplicaFailures() throws Exception {
419 String name = "testAddShardReplicaFailures";
420 MemberNode memberNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
421 .moduleShardsConfig("module-shards-cars-member-1.conf").build();
423 ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
424 memberNode.operDataStore(), null);
426 RpcResult<AddShardReplicaOutput> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder()
427 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
428 verifyFailedRpcResult(rpcResult);
430 rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("cars")
431 .build()).get(10, TimeUnit.SECONDS);
432 verifyFailedRpcResult(rpcResult);
434 rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people")
435 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
436 verifyFailedRpcResult(rpcResult);
439 private static NormalizedNode<?, ?> writeCarsNodeAndVerify(final AbstractDataStore writeToStore,
440 final AbstractDataStore readFromStore) throws Exception {
441 DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction();
442 NormalizedNode<?, ?> carsNode = CarsModel.create();
443 writeTx.write(CarsModel.BASE_PATH, carsNode);
445 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
446 Boolean canCommit = cohort.canCommit().get(7, TimeUnit.SECONDS);
447 assertEquals("canCommit", TRUE, canCommit);
448 cohort.preCommit().get(5, TimeUnit.SECONDS);
449 cohort.commit().get(5, TimeUnit.SECONDS);
451 readCarsNodeAndVerify(readFromStore, carsNode);
455 private static void readCarsNodeAndVerify(final AbstractDataStore readFromStore,
456 final NormalizedNode<?, ?> expCarsNode) throws Exception {
457 java.util.Optional<NormalizedNode<?, ?>> optional = readFromStore.newReadOnlyTransaction()
458 .read(CarsModel.BASE_PATH).get(15, TimeUnit.SECONDS);
459 assertTrue("isPresent", optional.isPresent());
460 assertEquals("Data node", expCarsNode, optional.get());
463 private static RpcResult<GetShardRoleOutput> getShardRole(final MemberNode memberNode,
464 final BindingNormalizedNodeSerializer serializer, final String shardName) throws Exception {
466 final GetShardRoleInput input = new GetShardRoleInputBuilder()
467 .setDataStoreType(DataStoreType.Config)
468 .setShardName(shardName)
471 final ClusterAdminRpcService service =
472 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
474 return service.getShardRole(input).get(10, TimeUnit.SECONDS);
477 private static RpcResult<GetPrefixShardRoleOutput> getPrefixShardRole(
478 final MemberNode memberNode,
479 final InstanceIdentifier<?> identifier,
480 final BindingNormalizedNodeSerializer serializer) throws Exception {
482 final GetPrefixShardRoleInput input = new GetPrefixShardRoleInputBuilder()
483 .setDataStoreType(DataStoreType.Config)
484 .setShardPrefix(identifier)
487 final ClusterAdminRpcService service =
488 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
490 return service.getPrefixShardRole(input).get(10, TimeUnit.SECONDS);
493 private static void addPrefixShardReplica(final MemberNode memberNode, final InstanceIdentifier<?> identifier,
494 final BindingNormalizedNodeSerializer serializer, final String shardName,
495 final String... peerMemberNames) throws Exception {
497 final AddPrefixShardReplicaInput input = new AddPrefixShardReplicaInputBuilder()
498 .setShardPrefix(identifier)
499 .setDataStoreType(DataStoreType.Config).build();
501 final ClusterAdminRpcService service =
502 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
504 final RpcResult<AddPrefixShardReplicaOutput> rpcResult = service.addPrefixShardReplica(input)
505 .get(10, TimeUnit.SECONDS);
506 verifySuccessfulRpcResult(rpcResult);
508 verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
509 Optional<ActorRef> optional = memberNode.configDataStore().getActorUtils().findLocalShard(shardName);
510 assertTrue("Replica shard not present", optional.isPresent());
513 private static void removePrefixShardReplica(final MemberNode memberNode, final InstanceIdentifier<?> identifier,
514 final String removeFromMember, final BindingNormalizedNodeSerializer serializer, final String shardName,
515 final String... peerMemberNames) throws Exception {
516 final RemovePrefixShardReplicaInput input = new RemovePrefixShardReplicaInputBuilder()
517 .setDataStoreType(DataStoreType.Config)
518 .setShardPrefix(identifier)
519 .setMemberName(removeFromMember).build();
521 final ClusterAdminRpcService service =
522 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
524 final RpcResult<RemovePrefixShardReplicaOutput> rpcResult = service.removePrefixShardReplica(input)
525 .get(10, TimeUnit.SECONDS);
526 verifySuccessfulRpcResult(rpcResult);
528 verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
531 private static void doAddShardReplica(final MemberNode memberNode, final String shardName,
532 final String... peerMemberNames) throws Exception {
533 memberNode.waitForMembersUp(peerMemberNames);
535 ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
536 memberNode.operDataStore(), null);
538 RpcResult<AddShardReplicaOutput> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder()
539 .setShardName(shardName).setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
540 verifySuccessfulRpcResult(rpcResult);
542 verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
544 Optional<ActorRef> optional = memberNode.operDataStore().getActorUtils().findLocalShard(shardName);
545 assertFalse("Oper shard present", optional.isPresent());
547 rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName)
548 .setDataStoreType(DataStoreType.Operational).build()).get(10, TimeUnit.SECONDS);
549 verifySuccessfulRpcResult(rpcResult);
551 verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames);
554 private static void doMakeShardLeaderLocal(final MemberNode memberNode, final String shardName,
555 final String newLeader) throws Exception {
556 ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
557 memberNode.operDataStore(), null);
559 final RpcResult<MakeLeaderLocalOutput> rpcResult = service.makeLeaderLocal(new MakeLeaderLocalInputBuilder()
560 .setDataStoreType(DataStoreType.Config).setShardName(shardName).build())
561 .get(10, TimeUnit.SECONDS);
563 verifySuccessfulRpcResult(rpcResult);
565 verifyRaftState(memberNode.configDataStore(), shardName, raftState -> assertThat(raftState.getLeader(),
566 containsString(newLeader)));
569 private static <T> T verifySuccessfulRpcResult(final RpcResult<T> rpcResult) {
570 if (!rpcResult.isSuccessful()) {
571 if (rpcResult.getErrors().size() > 0) {
572 RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
573 throw new AssertionError("Rpc failed with error: " + error, error.getCause());
576 fail("Rpc failed with no error");
579 return rpcResult.getResult();
582 private static void verifyFailedRpcResult(final RpcResult<?> rpcResult) {
583 assertFalse("RpcResult", rpcResult.isSuccessful());
584 assertEquals("RpcResult errors size", 1, rpcResult.getErrors().size());
585 RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
586 assertNotNull("RpcResult error message null", error.getMessage());
590 public void testRemoveShardReplica() throws Exception {
591 String name = "testRemoveShardReplica";
592 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
593 final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
594 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
595 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
598 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
599 .moduleShardsConfig(moduleShardsConfig).build();
601 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
602 .moduleShardsConfig(moduleShardsConfig).build();
604 leaderNode1.configDataStore().waitTillReady();
605 replicaNode3.configDataStore().waitTillReady();
606 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
607 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
608 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
610 // Invoke RPC service on member-3 to remove it's local shard
612 ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
613 replicaNode3.operDataStore(), null);
615 RpcResult<RemoveShardReplicaOutput> rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder()
616 .setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build())
617 .get(10, TimeUnit.SECONDS);
618 verifySuccessfulRpcResult(rpcResult);
620 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
621 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
622 verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
624 // Restart member-2 and verify member-3 isn't present.
626 Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(replicaNode2.kit().getSystem()).selfAddress());
627 replicaNode2.cleanup();
629 MemberNode newPeplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
630 .moduleShardsConfig(moduleShardsConfig).build();
632 newPeplicaNode2.configDataStore().waitTillReady();
633 verifyRaftPeersPresent(newPeplicaNode2.configDataStore(), "cars", "member-1");
635 // Invoke RPC service on member-1 to remove member-2
637 ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
638 leaderNode1.operDataStore(), null);
640 rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().setShardName("cars")
641 .setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
642 verifySuccessfulRpcResult(rpcResult);
644 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars");
645 verifyNoShardPresent(newPeplicaNode2.configDataStore(), "cars");
649 public void testRemoveShardLeaderReplica() throws Exception {
650 String name = "testRemoveShardLeaderReplica";
651 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
652 final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
653 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
654 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
657 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
658 .moduleShardsConfig(moduleShardsConfig).build();
660 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
661 .moduleShardsConfig(moduleShardsConfig).build();
663 leaderNode1.configDataStore().waitTillReady();
664 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
665 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
666 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
668 replicaNode2.waitForMembersUp("member-1", "member-3");
669 replicaNode3.waitForMembersUp("member-1", "member-2");
671 // Invoke RPC service on leader member-1 to remove it's local shard
673 ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
674 leaderNode1.operDataStore(), null);
676 RpcResult<RemoveShardReplicaOutput> rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder()
677 .setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build())
678 .get(10, TimeUnit.SECONDS);
679 verifySuccessfulRpcResult(rpcResult);
681 verifyRaftState(replicaNode2.configDataStore(), "cars", raftState ->
682 assertThat("Leader Id", raftState.getLeader(), anyOf(containsString("member-2"),
683 containsString("member-3"))));
685 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-3");
686 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-2");
687 verifyNoShardPresent(leaderNode1.configDataStore(), "cars");
691 public void testAddReplicasForAllShards() throws Exception {
692 String name = "testAddReplicasForAllShards";
693 String moduleShardsConfig = "module-shards-member1.conf";
694 MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
695 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars", "people").build();
697 ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
699 Collections.singletonList(MEMBER_1));
700 leaderNode1.configDataStore().getActorUtils().getShardManager().tell(
701 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
702 leaderNode1.kit().expectMsgClass(Success.class);
703 leaderNode1.kit().waitUntilLeader(leaderNode1.configDataStore().getActorUtils(), "pets");
705 MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
706 .moduleShardsConfig(moduleShardsConfig).build();
708 leaderNode1.waitForMembersUp("member-2");
709 newReplicaNode2.waitForMembersUp("member-1");
711 newReplicaNode2.configDataStore().getActorUtils().getShardManager().tell(
712 new CreateShard(petsModuleConfig, Shard.builder(), null), newReplicaNode2.kit().getRef());
713 newReplicaNode2.kit().expectMsgClass(Success.class);
715 newReplicaNode2.operDataStore().getActorUtils().getShardManager().tell(
716 new CreateShard(new ModuleShardConfiguration(URI.create("no-leader-ns"), "no-leader-module",
718 Collections.singletonList(MEMBER_1)),
719 Shard.builder(), null),
720 newReplicaNode2.kit().getRef());
721 newReplicaNode2.kit().expectMsgClass(Success.class);
723 ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(),
724 newReplicaNode2.operDataStore(), null);
726 RpcResult<AddReplicasForAllShardsOutput> rpcResult = service.addReplicasForAllShards(
727 new AddReplicasForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
728 AddReplicasForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
729 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
730 successShardResult("people", DataStoreType.Config),
731 successShardResult("pets", DataStoreType.Config),
732 successShardResult("cars", DataStoreType.Operational),
733 successShardResult("people", DataStoreType.Operational),
734 failedShardResult("no-leader", DataStoreType.Operational));
736 verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1");
737 verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "people", "member-1");
738 verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1");
739 verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1");
740 verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1");
744 public void testRemoveAllShardReplicas() throws Exception {
745 String name = "testRemoveAllShardReplicas";
746 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
747 final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
748 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
749 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
752 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
753 .moduleShardsConfig(moduleShardsConfig).build();
755 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
756 .moduleShardsConfig(moduleShardsConfig).build();
758 leaderNode1.configDataStore().waitTillReady();
759 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
760 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
761 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
763 ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
764 "pets", null, Arrays.asList(MEMBER_1, MEMBER_2, MEMBER_3));
765 leaderNode1.configDataStore().getActorUtils().getShardManager().tell(
766 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
767 leaderNode1.kit().expectMsgClass(Success.class);
769 replicaNode2.configDataStore().getActorUtils().getShardManager().tell(
770 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode2.kit().getRef());
771 replicaNode2.kit().expectMsgClass(Success.class);
773 replicaNode3.configDataStore().getActorUtils().getShardManager().tell(
774 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode3.kit().getRef());
775 replicaNode3.kit().expectMsgClass(Success.class);
777 verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2", "member-3");
778 verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1", "member-3");
779 verifyRaftPeersPresent(replicaNode3.configDataStore(), "pets", "member-1", "member-2");
781 ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
782 replicaNode3.operDataStore(), null);
784 RpcResult<RemoveAllShardReplicasOutput> rpcResult = service3.removeAllShardReplicas(
785 new RemoveAllShardReplicasInputBuilder().setMemberName("member-3").build()).get(10, TimeUnit.SECONDS);
786 RemoveAllShardReplicasOutput result = verifySuccessfulRpcResult(rpcResult);
787 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
788 successShardResult("people", DataStoreType.Config),
789 successShardResult("pets", DataStoreType.Config),
790 successShardResult("cars", DataStoreType.Operational),
791 successShardResult("people", DataStoreType.Operational));
793 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
794 verifyRaftPeersPresent(leaderNode1.configDataStore(), "people", "member-2");
795 verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2");
796 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
797 verifyRaftPeersPresent(replicaNode2.configDataStore(), "people", "member-1");
798 verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1");
799 verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
800 verifyNoShardPresent(replicaNode3.configDataStore(), "people");
801 verifyNoShardPresent(replicaNode3.configDataStore(), "pets");
805 public void testChangeMemberVotingStatesForShard() throws Exception {
806 String name = "testChangeMemberVotingStatusForShard";
807 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
808 final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
809 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
810 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
813 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
814 .moduleShardsConfig(moduleShardsConfig).build();
816 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
817 .moduleShardsConfig(moduleShardsConfig).build();
819 leaderNode1.configDataStore().waitTillReady();
820 replicaNode3.configDataStore().waitTillReady();
821 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
822 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
823 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
825 // Invoke RPC service on member-3 to change voting status
827 ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
828 replicaNode3.operDataStore(), null);
830 RpcResult<ChangeMemberVotingStatesForShardOutput> rpcResult = service3
831 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
832 .setShardName("cars").setDataStoreType(DataStoreType.Config)
833 .setMemberVotingState(ImmutableList.of(
834 new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
835 new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build()))
837 .get(10, TimeUnit.SECONDS);
838 verifySuccessfulRpcResult(rpcResult);
840 verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
841 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
842 verifyVotingStates(replicaNode2.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
843 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
844 verifyVotingStates(replicaNode3.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
845 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
849 public void testChangeMemberVotingStatesForSingleNodeShard() throws Exception {
850 String name = "testChangeMemberVotingStatesForSingleNodeShard";
851 String moduleShardsConfig = "module-shards-member1.conf";
852 MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
853 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
854 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
857 leaderNode.configDataStore().waitTillReady();
859 // Invoke RPC service on member-3 to change voting status
861 ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(),
862 leaderNode.operDataStore(), null);
864 RpcResult<ChangeMemberVotingStatesForShardOutput> rpcResult = service
865 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
866 .setShardName("cars").setDataStoreType(DataStoreType.Config)
867 .setMemberVotingState(ImmutableList
868 .of(new MemberVotingStateBuilder().setMemberName("member-1").setVoting(FALSE).build()))
870 .get(10, TimeUnit.SECONDS);
871 verifyFailedRpcResult(rpcResult);
873 verifyVotingStates(leaderNode.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE));
877 public void testChangeMemberVotingStatesForAllShards() throws Exception {
878 String name = "testChangeMemberVotingStatesForAllShards";
879 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
880 final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
881 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
882 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
885 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
886 .moduleShardsConfig(moduleShardsConfig).build();
888 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
889 .moduleShardsConfig(moduleShardsConfig).build();
891 leaderNode1.configDataStore().waitTillReady();
892 leaderNode1.operDataStore().waitTillReady();
893 replicaNode3.configDataStore().waitTillReady();
894 replicaNode3.operDataStore().waitTillReady();
895 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
896 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
897 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
899 // Invoke RPC service on member-3 to change voting status
901 ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
902 replicaNode3.operDataStore(), null);
904 RpcResult<ChangeMemberVotingStatesForAllShardsOutput> rpcResult = service3.changeMemberVotingStatesForAllShards(
905 new ChangeMemberVotingStatesForAllShardsInputBuilder().setMemberVotingState(ImmutableList.of(
906 new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
907 new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build())).build())
908 .get(10, TimeUnit.SECONDS);
909 ChangeMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
910 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
911 successShardResult("people", DataStoreType.Config),
912 successShardResult("cars", DataStoreType.Operational),
913 successShardResult("people", DataStoreType.Operational));
915 verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
916 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
917 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
918 new String[]{"cars", "people"}, new SimpleEntry<>("member-1", TRUE),
919 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
923 public void testFlipMemberVotingStates() throws Exception {
924 String name = "testFlipMemberVotingStates";
926 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
927 new ServerInfo("member-1", true), new ServerInfo("member-2", true),
928 new ServerInfo("member-3", false)));
930 setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
931 setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
932 setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
934 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
935 final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
936 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder()
937 .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10))
940 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
941 .moduleShardsConfig(moduleShardsConfig).build();
943 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
944 .moduleShardsConfig(moduleShardsConfig).build();
946 leaderNode1.configDataStore().waitTillReady();
947 leaderNode1.operDataStore().waitTillReady();
948 replicaNode3.configDataStore().waitTillReady();
949 replicaNode3.operDataStore().waitTillReady();
950 verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
951 new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", FALSE));
953 ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
954 replicaNode3.operDataStore(), null);
956 RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service3.flipMemberVotingStatesForAllShards(
957 new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
958 FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
959 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
960 successShardResult("people", DataStoreType.Config),
961 successShardResult("cars", DataStoreType.Operational),
962 successShardResult("people", DataStoreType.Operational));
964 verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
965 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
966 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
967 new String[]{"cars", "people"},
968 new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE),
969 new SimpleEntry<>("member-3", TRUE));
971 // Leadership should have transferred to member 3 since it is the only remaining voting member.
972 verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
973 assertNotNull("Expected non-null leader Id", raftState.getLeader());
974 assertTrue("Expected leader member-3. Actual: " + raftState.getLeader(),
975 raftState.getLeader().contains("member-3"));
978 verifyRaftState(leaderNode1.operDataStore(), "cars", raftState -> {
979 assertNotNull("Expected non-null leader Id", raftState.getLeader());
980 assertTrue("Expected leader member-3. Actual: " + raftState.getLeader(),
981 raftState.getLeader().contains("member-3"));
984 // Flip the voting states back to the original states.
986 rpcResult = service3.flipMemberVotingStatesForAllShards(
987 new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
988 result = verifySuccessfulRpcResult(rpcResult);
989 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
990 successShardResult("people", DataStoreType.Config),
991 successShardResult("cars", DataStoreType.Operational),
992 successShardResult("people", DataStoreType.Operational));
994 verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
995 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
996 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
997 new String[]{"cars", "people"},
998 new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE),
999 new SimpleEntry<>("member-3", FALSE));
1001 // Leadership should have transferred to member 1 or 2.
1002 verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
1003 assertNotNull("Expected non-null leader Id", raftState.getLeader());
1004 assertTrue("Expected leader member-1 or member-2. Actual: " + raftState.getLeader(),
1005 raftState.getLeader().contains("member-1") || raftState.getLeader().contains("member-2"));
1010 public void testFlipMemberVotingStatesWithNoInitialLeader() throws Exception {
1011 String name = "testFlipMemberVotingStatesWithNoInitialLeader";
1013 // Members 1, 2, and 3 are initially started up as non-voting. Members 4, 5, and 6 are initially
1014 // non-voting and simulated as down by not starting them up.
1015 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1016 new ServerInfo("member-1", false), new ServerInfo("member-2", false),
1017 new ServerInfo("member-3", false), new ServerInfo("member-4", true),
1018 new ServerInfo("member-5", true), new ServerInfo("member-6", true)));
1020 setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
1021 setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
1022 setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
1024 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
1025 final MemberNode replicaNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
1026 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
1027 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
1030 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
1031 .moduleShardsConfig(moduleShardsConfig).build();
1033 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
1034 .moduleShardsConfig(moduleShardsConfig).build();
1036 // Initially there won't be a leader b/c all the up nodes are non-voting.
1038 replicaNode1.waitForMembersUp("member-2", "member-3");
1040 verifyVotingStates(replicaNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", FALSE),
1041 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE),
1042 new SimpleEntry<>("member-4", TRUE), new SimpleEntry<>("member-5", TRUE),
1043 new SimpleEntry<>("member-6", TRUE));
1045 verifyRaftState(replicaNode1.configDataStore(), "cars", raftState ->
1046 assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState()));
1048 ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(),
1049 replicaNode1.operDataStore(), null);
1051 RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards(
1052 new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
1053 FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
1054 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
1055 successShardResult("people", DataStoreType.Config),
1056 successShardResult("cars", DataStoreType.Operational),
1057 successShardResult("people", DataStoreType.Operational));
1059 verifyVotingStates(new AbstractDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(),
1060 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
1061 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
1062 new String[]{"cars", "people"},
1063 new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE),
1064 new SimpleEntry<>("member-3", TRUE), new SimpleEntry<>("member-4", FALSE),
1065 new SimpleEntry<>("member-5", FALSE), new SimpleEntry<>("member-6", FALSE));
1067 // Since member 1 was changed to voting and there was no leader, it should've started and election
1068 // and become leader
1069 verifyRaftState(replicaNode1.configDataStore(), "cars", raftState -> {
1070 assertNotNull("Expected non-null leader Id", raftState.getLeader());
1071 assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
1072 raftState.getLeader().contains("member-1"));
1075 verifyRaftState(replicaNode1.operDataStore(), "cars", raftState -> {
1076 assertNotNull("Expected non-null leader Id", raftState.getLeader());
1077 assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
1078 raftState.getLeader().contains("member-1"));
1083 public void testFlipMemberVotingStatesWithVotingMembersDown() throws Exception {
1084 String name = "testFlipMemberVotingStatesWithVotingMembersDown";
1086 // Members 4, 5, and 6 are initially non-voting and simulated as down by not starting them up.
1087 ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1088 new ServerInfo("member-1", true), new ServerInfo("member-2", true),
1089 new ServerInfo("member-3", true), new ServerInfo("member-4", false),
1090 new ServerInfo("member-5", false), new ServerInfo("member-6", false)));
1092 setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
1093 setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
1094 setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
1096 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
1097 final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
1098 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
1099 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
1102 final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
1103 .moduleShardsConfig(moduleShardsConfig).build();
1105 final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
1106 .moduleShardsConfig(moduleShardsConfig).build();
1108 leaderNode1.configDataStore().waitTillReady();
1109 leaderNode1.operDataStore().waitTillReady();
1110 verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
1111 new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", TRUE),
1112 new SimpleEntry<>("member-4", FALSE), new SimpleEntry<>("member-5", FALSE),
1113 new SimpleEntry<>("member-6", FALSE));
1115 ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
1116 leaderNode1.operDataStore(), null);
1118 RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards(
1119 new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
1120 FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
1121 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
1122 successShardResult("people", DataStoreType.Config),
1123 successShardResult("cars", DataStoreType.Operational),
1124 successShardResult("people", DataStoreType.Operational));
1126 // Members 2 and 3 are now non-voting but should get replicated with the new new server config.
1127 verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
1128 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
1129 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
1130 new String[]{"cars", "people"},
1131 new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE),
1132 new SimpleEntry<>("member-3", FALSE), new SimpleEntry<>("member-4", TRUE),
1133 new SimpleEntry<>("member-5", TRUE), new SimpleEntry<>("member-6", TRUE));
1135 // The leader (member 1) was changed to non-voting but it shouldn't be able to step down as leader yet
1136 // b/c it can't get a majority consensus with all voting members down. So verify it remains the leader.
1137 verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
1138 assertNotNull("Expected non-null leader Id", raftState.getLeader());
1139 assertTrue("Expected leader member-1", raftState.getLeader().contains("member-1"));
1143 private static void setupPersistedServerConfigPayload(final ServerConfigurationPayload serverConfig,
1144 final String member, final String datastoreTypeSuffix, final String... shards) {
1145 String[] datastoreTypes = {"config_", "oper_"};
1146 for (String type : datastoreTypes) {
1147 for (String shard : shards) {
1148 List<ServerInfo> newServerInfo = new ArrayList<>(serverConfig.getServerConfig().size());
1149 for (ServerInfo info : serverConfig.getServerConfig()) {
1150 newServerInfo.add(new ServerInfo(ShardIdentifier.create(shard, MemberName.forName(info.getId()),
1151 type + datastoreTypeSuffix).toString(), info.isVoting()));
1154 String shardID = ShardIdentifier.create(shard, MemberName.forName(member),
1155 type + datastoreTypeSuffix).toString();
1156 InMemoryJournal.addEntry(shardID, 1, new UpdateElectionTerm(1, null));
1157 InMemoryJournal.addEntry(shardID, 2, new SimpleReplicatedLogEntry(0, 1,
1158 new ServerConfigurationPayload(newServerInfo)));
1164 private static void verifyVotingStates(final AbstractDataStore[] datastores, final String[] shards,
1165 final SimpleEntry<String, Boolean>... expStates) throws Exception {
1166 for (AbstractDataStore datastore: datastores) {
1167 for (String shard: shards) {
1168 verifyVotingStates(datastore, shard, expStates);
1174 private static void verifyVotingStates(final AbstractDataStore datastore, final String shardName,
1175 final SimpleEntry<String, Boolean>... expStates) throws Exception {
1176 String localMemberName = datastore.getActorUtils().getCurrentMemberName().getName();
1177 Map<String, Boolean> expStateMap = new HashMap<>();
1178 for (Entry<String, Boolean> e: expStates) {
1179 expStateMap.put(ShardIdentifier.create(shardName, MemberName.forName(e.getKey()),
1180 datastore.getActorUtils().getDataStoreName()).toString(), e.getValue());
1183 verifyRaftState(datastore, shardName, raftState -> {
1184 String localPeerId = ShardIdentifier.create(shardName, MemberName.forName(localMemberName),
1185 datastore.getActorUtils().getDataStoreName()).toString();
1186 assertEquals("Voting state for " + localPeerId, expStateMap.get(localPeerId), raftState.isVoting());
1187 for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
1188 assertEquals("Voting state for " + e.getKey(), expStateMap.get(e.getKey()), e.getValue());
1193 private static void verifyShardResults(final List<ShardResult> shardResults, final ShardResult... expShardResults) {
1194 Map<String, ShardResult> expResultsMap = new HashMap<>();
1195 for (ShardResult r: expShardResults) {
1196 expResultsMap.put(r.getShardName() + "-" + r.getDataStoreType(), r);
1199 for (ShardResult result: shardResults) {
1200 ShardResult exp = expResultsMap.remove(result.getShardName() + "-" + result.getDataStoreType());
1201 assertNotNull(String.format("Unexpected result for shard %s, type %s", result.getShardName(),
1202 result.getDataStoreType()), exp);
1203 assertEquals("isSucceeded", exp.isSucceeded(), result.isSucceeded());
1204 if (exp.isSucceeded()) {
1205 assertNull("Expected null error message", result.getErrorMessage());
1207 assertNotNull("Expected error message", result.getErrorMessage());
1211 if (!expResultsMap.isEmpty()) {
1212 fail("Missing shard results for " + expResultsMap.keySet());
1216 private static ShardResult successShardResult(final String shardName, final DataStoreType type) {
1217 return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(TRUE).build();
1220 private static ShardResult failedShardResult(final String shardName, final DataStoreType type) {
1221 return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(FALSE).build();