X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-cluster-admin-impl%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fadmin%2FClusterAdminRpcServiceTest.java;h=2239908877b01d91b2e02b3894cabeb796e6f1a3;hb=HEAD;hp=0ba06e3bcd694648cbc3a6f0165582ce4ebc1ee9;hpb=4b59df006c79ffb8119152e5a8bc6aadd276c031;p=controller.git diff --git a/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java b/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java index 0ba06e3bcd..2239908877 100644 --- a/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java +++ b/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore.admin; import static java.lang.Boolean.FALSE; import static java.lang.Boolean.TRUE; +import static java.util.Objects.requireNonNull; import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.MatcherAssert.assertThat; @@ -26,23 +27,14 @@ import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Status.Success; import akka.cluster.Cluster; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import java.io.File; -import java.io.FileInputStream; -import java.net.URI; -import java.util.AbstractMap.SimpleEntry; +import java.nio.file.Files; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -50,19 +42,17 @@ import org.apache.commons.lang3.SerializationUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore; import org.opendaylight.controller.cluster.datastore.AbstractDataStore; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.MemberNode; import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier; import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; -import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; -import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.persisted.ServerInfo; @@ -70,55 +60,24 @@ import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEnt import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; -import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; -import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; -import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer; -import org.opendaylight.mdsal.common.api.LogicalDatastoreType; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.people.rev140818.People; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultKey; -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.common.XMLNamespace; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; /** * Unit tests for ClusterAdminRpcService. @@ -126,6 +85,12 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; * @author Thomas Pantelis */ public class ClusterAdminRpcServiceTest { + record ExpState(String name, boolean voting) { + ExpState { + requireNonNull(name); + } + } + private static final MemberName MEMBER_1 = MemberName.forName("member-1"); private static final MemberName MEMBER_2 = MemberName.forName("member-2"); private static final MemberName MEMBER_3 = MemberName.forName("member-3"); @@ -139,33 +104,38 @@ public class ClusterAdminRpcServiceTest { @After public void tearDown() { - for (MemberNode m : Lists.reverse(memberNodes)) { - m.cleanup(); + for (var member : Lists.reverse(memberNodes)) { + member.cleanup(); } memberNodes.clear(); } @Test public void testBackupDatastore() throws Exception { - MemberNode node = MemberNode.builder(memberNodes).akkaConfig("Member1") - .moduleShardsConfig("module-shards-member1.conf").waitForShardLeader("cars", "people") - .testName("testBackupDatastore").build(); + final var node = MemberNode.builder(memberNodes) + .akkaConfig("Member1") + .moduleShardsConfig("module-shards-member1.conf") + .waitForShardLeader("cars", "people") + .testName("testBackupDatastore") + .build(); - String fileName = "target/testBackupDatastore"; - new File(fileName).delete(); + final var fileName = "target/testBackupDatastore"; + final var file = new File(fileName); + file.delete(); - ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore(), null); + final var service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore(), null); - RpcResult rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder() - .setFilePath(fileName).build()).get(5, TimeUnit.SECONDS); + var rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build()) + .get(5, TimeUnit.SECONDS); verifySuccessfulRpcResult(rpcResult); - try (FileInputStream fis = new FileInputStream(fileName)) { - List snapshots = SerializationUtils.deserialize(fis); + try (var fis = Files.newInputStream(file.toPath())) { + final List snapshots = SerializationUtils.deserialize(fis); assertEquals("DatastoreSnapshot size", 2, snapshots.size()); - ImmutableMap map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0), - snapshots.get(1).getType(), snapshots.get(1)); + final var map = Map.of( + snapshots.get(0).getType(), snapshots.get(0), + snapshots.get(1).getType(), snapshots.get(1)); verifyDatastoreSnapshot(node.configDataStore().getActorUtils().getDataStoreName(), map.get(node.configDataStore().getActorUtils().getDataStoreName()), "cars", "people"); } finally { @@ -177,7 +147,7 @@ public class ClusterAdminRpcServiceTest { node.configDataStore().getActorUtils().getShardManager().tell(node.datastoreContextBuilder() .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender()); - ActorRef carsShardActor = node.configDataStore().getActorUtils().findLocalShard("cars").get(); + final var carsShardActor = node.configDataStore().getActorUtils().findLocalShard("cars").orElseThrow(); node.kit().watch(carsShardActor); carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); node.kit().expectTerminated(carsShardActor); @@ -191,113 +161,12 @@ public class ClusterAdminRpcServiceTest { private static void verifyDatastoreSnapshot(final String type, final DatastoreSnapshot datastoreSnapshot, final String... expShardNames) { assertNotNull("Missing DatastoreSnapshot for type " + type, datastoreSnapshot); - Set shardNames = new HashSet<>(); - for (DatastoreSnapshot.ShardSnapshot s: datastoreSnapshot.getShardSnapshots()) { - shardNames.add(s.getName()); + var shardNames = new HashSet(); + for (var snapshot : datastoreSnapshot.getShardSnapshots()) { + shardNames.add(snapshot.getName()); } - assertEquals("DatastoreSnapshot shard names", Sets.newHashSet(expShardNames), shardNames); - } - - @Test - public void testAddRemovePrefixShardReplica() throws Exception { - String name = "testAddPrefixShardReplica"; - String moduleShardsConfig = "module-shards-default.conf"; - - final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) - .moduleShardsConfig(moduleShardsConfig).build(); - final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) - .moduleShardsConfig(moduleShardsConfig).build(); - final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name) - .moduleShardsConfig(moduleShardsConfig).build(); - - member1.waitForMembersUp("member-2", "member-3"); - replicaNode2.kit().waitForMembersUp("member-1", "member-3"); - replicaNode3.kit().waitForMembersUp("member-1", "member-2"); - - final ActorRef shardManager1 = member1.configDataStore().getActorUtils().getShardManager(); - - shardManager1.tell(new PrefixShardCreated(new PrefixShardConfiguration( - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH), - "prefix", Collections.singleton(MEMBER_1))), - ActorRef.noSender()); - - member1.kit().waitUntilLeader(member1.configDataStore().getActorUtils(), - ClusterUtils.getCleanShardName(CarsModel.BASE_PATH)); - - final InstanceIdentifier identifier = InstanceIdentifier.create(Cars.class); - final BindingNormalizedNodeSerializer serializer = Mockito.mock(BindingNormalizedNodeSerializer.class); - Mockito.doReturn(CarsModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(identifier); - - addPrefixShardReplica(replicaNode2, identifier, serializer, - ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1"); - - addPrefixShardReplica(replicaNode3, identifier, serializer, - ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1", "member-2"); - - verifyRaftPeersPresent(member1.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), - "member-2", "member-3"); - - removePrefixShardReplica(member1, identifier, "member-3", serializer, - ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-2"); - - verifyNoShardPresent(replicaNode3.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH)); - verifyRaftPeersPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), - "member-1"); - - removePrefixShardReplica(member1, identifier, "member-2", serializer, - ClusterUtils.getCleanShardName(CarsModel.BASE_PATH)); - - verifyNoShardPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH)); - } - - @Test - public void testGetShardRole() throws Exception { - String name = "testGetShardRole"; - String moduleShardsConfig = "module-shards-default-member-1.conf"; - - final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) - .moduleShardsConfig(moduleShardsConfig).build(); - - member1.kit().waitUntilLeader(member1.configDataStore().getActorUtils(), "default"); - - final RpcResult successResult = - getShardRole(member1, Mockito.mock(BindingNormalizedNodeSerializer.class), "default"); - verifySuccessfulRpcResult(successResult); - assertEquals("Leader", successResult.getResult().getRole()); - - final RpcResult failedResult = - getShardRole(member1, Mockito.mock(BindingNormalizedNodeSerializer.class), "cars"); - - verifyFailedRpcResult(failedResult); - - final ActorRef shardManager1 = member1.configDataStore().getActorUtils().getShardManager(); - - shardManager1.tell(new PrefixShardCreated(new PrefixShardConfiguration( - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH), - "prefix", Collections.singleton(MEMBER_1))), - ActorRef.noSender()); - - member1.kit().waitUntilLeader(member1.configDataStore().getActorUtils(), - ClusterUtils.getCleanShardName(CarsModel.BASE_PATH)); - - final InstanceIdentifier identifier = InstanceIdentifier.create(Cars.class); - final BindingNormalizedNodeSerializer serializer = Mockito.mock(BindingNormalizedNodeSerializer.class); - Mockito.doReturn(CarsModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(identifier); - - final RpcResult prefixSuccessResult = - getPrefixShardRole(member1, identifier, serializer); - - verifySuccessfulRpcResult(prefixSuccessResult); - assertEquals("Leader", prefixSuccessResult.getResult().getRole()); - - final InstanceIdentifier peopleId = InstanceIdentifier.create(People.class); - Mockito.doReturn(PeopleModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(peopleId); - - final RpcResult prefixFail = - getPrefixShardRole(member1, peopleId, serializer); - - verifyFailedRpcResult(prefixFail); + assertEquals("DatastoreSnapshot shard names", Set.of(expShardNames), shardNames); } @Test @@ -305,12 +174,10 @@ public class ClusterAdminRpcServiceTest { String name = "testGetPrefixShardRole"; String moduleShardsConfig = "module-shards-default-member-1.conf"; - final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) + final var member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) .moduleShardsConfig(moduleShardsConfig).build(); member1.kit().waitUntilLeader(member1.configDataStore().getActorUtils(), "default"); - - } @Test @@ -318,11 +185,11 @@ public class ClusterAdminRpcServiceTest { String name = "testModuleShardLeaderMovement"; String moduleShardsConfig = "module-shards-member1.conf"; - final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) + final var member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) .waitForShardLeader("cars").moduleShardsConfig(moduleShardsConfig).build(); - final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) + final var replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) .moduleShardsConfig(moduleShardsConfig).build(); - final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name) + final var replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name) .moduleShardsConfig(moduleShardsConfig).build(); member1.waitForMembersUp("member-2", "member-3"); @@ -358,17 +225,17 @@ public class ClusterAdminRpcServiceTest { public void testAddShardReplica() throws Exception { String name = "testAddShardReplica"; String moduleShardsConfig = "module-shards-cars-member-1.conf"; - MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) + final var leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars").build(); - MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) + final var newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) .moduleShardsConfig(moduleShardsConfig).build(); leaderNode1.waitForMembersUp("member-2"); doAddShardReplica(newReplicaNode2, "cars", "member-1"); - MemberNode newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name) + var newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name) .moduleShardsConfig(moduleShardsConfig).build(); leaderNode1.waitForMembersUp("member-3"); @@ -380,18 +247,18 @@ public class ClusterAdminRpcServiceTest { verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1", "member-3"); // Write data to member-2's config datastore and read/verify via member-3 - final NormalizedNode configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore(), + final var configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore(), newReplicaNode3.configDataStore()); // Write data to member-3's oper datastore and read/verify via member-2 writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore()); // Verify all data has been replicated. We expect 4 log entries and thus last applied index of 3 - - // 2 ServerConfigurationPayload entries, the transaction payload entry plus a purge payload. + // 2 ServerConfigurationPayload entries, the transaction payload entry plus a purge payload. RaftStateVerifier verifier = raftState -> { - assertEquals("Commit index", 4, raftState.getCommitIndex()); - assertEquals("Last applied index", 4, raftState.getLastApplied()); + assertEquals("Commit index", 3, raftState.getCommitIndex()); + assertEquals("Last applied index", 3, raftState.getLastApplied()); }; verifyRaftState(leaderNode1.configDataStore(), "cars", verifier); @@ -418,34 +285,36 @@ public class ClusterAdminRpcServiceTest { @Test public void testAddShardReplicaFailures() throws Exception { String name = "testAddShardReplicaFailures"; - MemberNode memberNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) + final var memberNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) .moduleShardsConfig("module-shards-cars-member-1.conf").build(); - ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(), - memberNode.operDataStore(), null); + final var service = new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), null); - RpcResult rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder() - .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS); + var rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder() + .setDataStoreType(DataStoreType.Config) + .build()) + .get(10, TimeUnit.SECONDS); verifyFailedRpcResult(rpcResult); - rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("cars") - .build()).get(10, TimeUnit.SECONDS); + rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("cars").build()) + .get(10, TimeUnit.SECONDS); verifyFailedRpcResult(rpcResult); rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people") - .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS); + .setDataStoreType(DataStoreType.Config) + .build()) + .get(10, TimeUnit.SECONDS); verifyFailedRpcResult(rpcResult); } - private static NormalizedNode writeCarsNodeAndVerify(final AbstractDataStore writeToStore, + private static ContainerNode writeCarsNodeAndVerify(final AbstractDataStore writeToStore, final AbstractDataStore readFromStore) throws Exception { - DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction(); - NormalizedNode carsNode = CarsModel.create(); + final var writeTx = writeToStore.newWriteOnlyTransaction(); + final var carsNode = CarsModel.create(); writeTx.write(CarsModel.BASE_PATH, carsNode); - DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); - Boolean canCommit = cohort.canCommit().get(7, TimeUnit.SECONDS); - assertEquals("canCommit", TRUE, canCommit); + final var cohort = writeTx.ready(); + assertEquals("canCommit", TRUE, cohort.canCommit().get(7, TimeUnit.SECONDS)); cohort.preCommit().get(5, TimeUnit.SECONDS); cohort.commit().get(5, TimeUnit.SECONDS); @@ -454,99 +323,31 @@ public class ClusterAdminRpcServiceTest { } private static void readCarsNodeAndVerify(final AbstractDataStore readFromStore, - final NormalizedNode expCarsNode) throws Exception { - Optional> optional = readFromStore.newReadOnlyTransaction().read(CarsModel.BASE_PATH) - .get(15, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", expCarsNode, optional.get()); - } - - private static RpcResult getShardRole(final MemberNode memberNode, - final BindingNormalizedNodeSerializer serializer, final String shardName) throws Exception { - - final GetShardRoleInput input = new GetShardRoleInputBuilder() - .setDataStoreType(DataStoreType.Config) - .setShardName(shardName) - .build(); - - final ClusterAdminRpcService service = - new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer); - - return service.getShardRole(input).get(10, TimeUnit.SECONDS); - } - - private static RpcResult getPrefixShardRole( - final MemberNode memberNode, - final InstanceIdentifier identifier, - final BindingNormalizedNodeSerializer serializer) throws Exception { - - final GetPrefixShardRoleInput input = new GetPrefixShardRoleInputBuilder() - .setDataStoreType(DataStoreType.Config) - .setShardPrefix(identifier) - .build(); - - final ClusterAdminRpcService service = - new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer); - - return service.getPrefixShardRole(input).get(10, TimeUnit.SECONDS); - } - - private static void addPrefixShardReplica(final MemberNode memberNode, final InstanceIdentifier identifier, - final BindingNormalizedNodeSerializer serializer, final String shardName, - final String... peerMemberNames) throws Exception { - - final AddPrefixShardReplicaInput input = new AddPrefixShardReplicaInputBuilder() - .setShardPrefix(identifier) - .setDataStoreType(DataStoreType.Config).build(); - - final ClusterAdminRpcService service = - new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer); - - final RpcResult rpcResult = service.addPrefixShardReplica(input) - .get(10, TimeUnit.SECONDS); - verifySuccessfulRpcResult(rpcResult); - - verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames); - Optional optional = memberNode.configDataStore().getActorUtils().findLocalShard(shardName); - assertTrue("Replica shard not present", optional.isPresent()); - } - - private static void removePrefixShardReplica(final MemberNode memberNode, final InstanceIdentifier identifier, - final String removeFromMember, final BindingNormalizedNodeSerializer serializer, final String shardName, - final String... peerMemberNames) throws Exception { - final RemovePrefixShardReplicaInput input = new RemovePrefixShardReplicaInputBuilder() - .setDataStoreType(DataStoreType.Config) - .setShardPrefix(identifier) - .setMemberName(removeFromMember).build(); - - final ClusterAdminRpcService service = - new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer); - - final RpcResult rpcResult = service.removePrefixShardReplica(input) - .get(10, TimeUnit.SECONDS); - verifySuccessfulRpcResult(rpcResult); - - verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames); + final ContainerNode expCarsNode) throws Exception { + assertEquals(Optional.of(expCarsNode), + readFromStore.newReadOnlyTransaction().read(CarsModel.BASE_PATH).get(15, TimeUnit.SECONDS)); } private static void doAddShardReplica(final MemberNode memberNode, final String shardName, final String... peerMemberNames) throws Exception { memberNode.waitForMembersUp(peerMemberNames); - ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(), - memberNode.operDataStore(), null); + final var service = new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), null); - RpcResult rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder() - .setShardName(shardName).setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS); + var rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder() + .setShardName(shardName) + .setDataStoreType(DataStoreType.Config) + .build()).get(10, TimeUnit.SECONDS); verifySuccessfulRpcResult(rpcResult); verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames); - Optional optional = memberNode.operDataStore().getActorUtils().findLocalShard(shardName); - assertFalse("Oper shard present", optional.isPresent()); + assertEquals(Optional.empty(), memberNode.operDataStore().getActorUtils().findLocalShard(shardName)); - rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName) - .setDataStoreType(DataStoreType.Operational).build()).get(10, TimeUnit.SECONDS); + rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder() + .setShardName(shardName) + .setDataStoreType(DataStoreType.Operational) + .build()).get(10, TimeUnit.SECONDS); verifySuccessfulRpcResult(rpcResult); verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames); @@ -554,12 +355,12 @@ public class ClusterAdminRpcServiceTest { private static void doMakeShardLeaderLocal(final MemberNode memberNode, final String shardName, final String newLeader) throws Exception { - ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(), - memberNode.operDataStore(), null); + final var service = new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), null); - final RpcResult rpcResult = service.makeLeaderLocal(new MakeLeaderLocalInputBuilder() - .setDataStoreType(DataStoreType.Config).setShardName(shardName).build()) - .get(10, TimeUnit.SECONDS); + final var rpcResult = service.makeLeaderLocal(new MakeLeaderLocalInputBuilder() + .setDataStoreType(DataStoreType.Config) + .setShardName(shardName) + .build()).get(10, TimeUnit.SECONDS); verifySuccessfulRpcResult(rpcResult); @@ -569,8 +370,9 @@ public class ClusterAdminRpcServiceTest { private static T verifySuccessfulRpcResult(final RpcResult rpcResult) { if (!rpcResult.isSuccessful()) { - if (rpcResult.getErrors().size() > 0) { - RpcError error = Iterables.getFirst(rpcResult.getErrors(), null); + final var errors = rpcResult.getErrors(); + if (errors.size() > 0) { + final var error = errors.get(0); throw new AssertionError("Rpc failed with error: " + error, error.getCause()); } @@ -582,8 +384,9 @@ public class ClusterAdminRpcServiceTest { private static void verifyFailedRpcResult(final RpcResult rpcResult) { assertFalse("RpcResult", rpcResult.isSuccessful()); - assertEquals("RpcResult errors size", 1, rpcResult.getErrors().size()); - RpcError error = Iterables.getFirst(rpcResult.getErrors(), null); + final var errors = rpcResult.getErrors(); + assertEquals("RpcResult errors size", 1, errors.size()); + final var error = errors.get(0); assertNotNull("RpcResult error message null", error.getMessage()); } @@ -591,15 +394,15 @@ public class ClusterAdminRpcServiceTest { public void testRemoveShardReplica() throws Exception { String name = "testRemoveShardReplica"; String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf"; - final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) + final var leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder( DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)) .build(); - final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) + final var replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) .moduleShardsConfig(moduleShardsConfig).build(); - final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name) + final var replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name) .moduleShardsConfig(moduleShardsConfig).build(); leaderNode1.configDataStore().waitTillReady(); @@ -610,12 +413,13 @@ public class ClusterAdminRpcServiceTest { // Invoke RPC service on member-3 to remove it's local shard - ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), - replicaNode3.operDataStore(), null); + final var service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), replicaNode3.operDataStore(), + null); - RpcResult rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder() - .setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build()) - .get(10, TimeUnit.SECONDS); + var rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder() + .setShardName("cars").setMemberName("member-3") + .setDataStoreType(DataStoreType.Config) + .build()).get(10, TimeUnit.SECONDS); verifySuccessfulRpcResult(rpcResult); verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2"); @@ -627,7 +431,7 @@ public class ClusterAdminRpcServiceTest { Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(replicaNode2.kit().getSystem()).selfAddress()); replicaNode2.cleanup(); - MemberNode newPeplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) + final var newPeplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) .moduleShardsConfig(moduleShardsConfig).build(); newPeplicaNode2.configDataStore().waitTillReady(); @@ -635,11 +439,14 @@ public class ClusterAdminRpcServiceTest { // Invoke RPC service on member-1 to remove member-2 - ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(), - leaderNode1.operDataStore(), null); + final var service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(), leaderNode1.operDataStore(), + null); - rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().setShardName("cars") - .setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS); + rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder() + .setShardName("cars") + .setMemberName("member-2") + .setDataStoreType(DataStoreType.Config) + .build()).get(10, TimeUnit.SECONDS); verifySuccessfulRpcResult(rpcResult); verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars"); @@ -650,15 +457,15 @@ public class ClusterAdminRpcServiceTest { public void testRemoveShardLeaderReplica() throws Exception { String name = "testRemoveShardLeaderReplica"; String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf"; - final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) + final var leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder( DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)) .build(); - final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) + final var replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) .moduleShardsConfig(moduleShardsConfig).build(); - final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name) + final var replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name) .moduleShardsConfig(moduleShardsConfig).build(); leaderNode1.configDataStore().waitTillReady(); @@ -671,12 +478,14 @@ public class ClusterAdminRpcServiceTest { // Invoke RPC service on leader member-1 to remove it's local shard - ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(), - leaderNode1.operDataStore(), null); + final var service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(), leaderNode1.operDataStore(), + null); - RpcResult rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder() - .setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build()) - .get(10, TimeUnit.SECONDS); + final var rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder() + .setShardName("cars") + .setMemberName("member-1") + .setDataStoreType(DataStoreType.Config) + .build()).get(10, TimeUnit.SECONDS); verifySuccessfulRpcResult(rpcResult); verifyRaftState(replicaNode2.configDataStore(), "cars", raftState -> @@ -692,18 +501,17 @@ public class ClusterAdminRpcServiceTest { public void testAddReplicasForAllShards() throws Exception { String name = "testAddReplicasForAllShards"; String moduleShardsConfig = "module-shards-member1.conf"; - MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) + final var leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars", "people").build(); - ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module", - "pets", null, - Collections.singletonList(MEMBER_1)); + final var petsModuleConfig = new ModuleShardConfiguration(XMLNamespace.of("pets-ns"), "pets-module", "pets", + null, List.of(MEMBER_1)); leaderNode1.configDataStore().getActorUtils().getShardManager().tell( new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef()); leaderNode1.kit().expectMsgClass(Success.class); leaderNode1.kit().waitUntilLeader(leaderNode1.configDataStore().getActorUtils(), "pets"); - MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) + final var newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) .moduleShardsConfig(moduleShardsConfig).build(); leaderNode1.waitForMembersUp("member-2"); @@ -713,20 +521,18 @@ public class ClusterAdminRpcServiceTest { new CreateShard(petsModuleConfig, Shard.builder(), null), newReplicaNode2.kit().getRef()); newReplicaNode2.kit().expectMsgClass(Success.class); - newReplicaNode2.operDataStore().getActorUtils().getShardManager().tell( - new CreateShard(new ModuleShardConfiguration(URI.create("no-leader-ns"), "no-leader-module", - "no-leader", null, - Collections.singletonList(MEMBER_1)), - Shard.builder(), null), - newReplicaNode2.kit().getRef()); + newReplicaNode2.operDataStore().getActorUtils().getShardManager() + .tell(new CreateShard(new ModuleShardConfiguration(XMLNamespace.of("no-leader-ns"), "no-leader-module", + "no-leader", null, List.of(MEMBER_1)), + Shard.builder(), null), newReplicaNode2.kit().getRef()); newReplicaNode2.kit().expectMsgClass(Success.class); - ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(), - newReplicaNode2.operDataStore(), null); + final var service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(), + newReplicaNode2.operDataStore(), null); - RpcResult rpcResult = service.addReplicasForAllShards( - new AddReplicasForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS); - AddReplicasForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult); + var rpcResult = service.addReplicasForAllShards(new AddReplicasForAllShardsInputBuilder().build()) + .get(10, TimeUnit.SECONDS); + final var result = verifySuccessfulRpcResult(rpcResult); verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), successShardResult("people", DataStoreType.Config), successShardResult("pets", DataStoreType.Config), @@ -745,15 +551,15 @@ public class ClusterAdminRpcServiceTest { public void testRemoveAllShardReplicas() throws Exception { String name = "testRemoveAllShardReplicas"; String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf"; - final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) + final var leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder( DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)) .build(); - final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) + final var replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) .moduleShardsConfig(moduleShardsConfig).build(); - final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name) + final var replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name) .moduleShardsConfig(moduleShardsConfig).build(); leaderNode1.configDataStore().waitTillReady(); @@ -761,8 +567,8 @@ public class ClusterAdminRpcServiceTest { verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3"); verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2"); - ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module", - "pets", null, Arrays.asList(MEMBER_1, MEMBER_2, MEMBER_3)); + final var petsModuleConfig = new ModuleShardConfiguration(XMLNamespace.of("pets-ns"), "pets-module", "pets", + null, List.of(MEMBER_1, MEMBER_2, MEMBER_3)); leaderNode1.configDataStore().getActorUtils().getShardManager().tell( new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef()); leaderNode1.kit().expectMsgClass(Success.class); @@ -779,12 +585,13 @@ public class ClusterAdminRpcServiceTest { verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1", "member-3"); verifyRaftPeersPresent(replicaNode3.configDataStore(), "pets", "member-1", "member-2"); - ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), - replicaNode3.operDataStore(), null); + final var service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), replicaNode3.operDataStore(), + null); - RpcResult rpcResult = service3.removeAllShardReplicas( - new RemoveAllShardReplicasInputBuilder().setMemberName("member-3").build()).get(10, TimeUnit.SECONDS); - RemoveAllShardReplicasOutput result = verifySuccessfulRpcResult(rpcResult); + var rpcResult = service3.removeAllShardReplicas( + new RemoveAllShardReplicasInputBuilder().setMemberName("member-3").build()) + .get(10, TimeUnit.SECONDS); + final var result = verifySuccessfulRpcResult(rpcResult); verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), successShardResult("people", DataStoreType.Config), successShardResult("pets", DataStoreType.Config), @@ -806,15 +613,15 @@ public class ClusterAdminRpcServiceTest { public void testChangeMemberVotingStatesForShard() throws Exception { String name = "testChangeMemberVotingStatusForShard"; String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf"; - final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) + final var leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder( DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)) .build(); - final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) + final var replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) .moduleShardsConfig(moduleShardsConfig).build(); - final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name) + final var replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name) .moduleShardsConfig(moduleShardsConfig).build(); leaderNode1.configDataStore().waitTillReady(); @@ -825,32 +632,31 @@ public class ClusterAdminRpcServiceTest { // Invoke RPC service on member-3 to change voting status - ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), - replicaNode3.operDataStore(), null); + final var service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), replicaNode3.operDataStore(), + null); - RpcResult rpcResult = service3 - .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder() - .setShardName("cars").setDataStoreType(DataStoreType.Config) - .setMemberVotingState(ImmutableList.of( - new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(), - new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build())) - .build()) - .get(10, TimeUnit.SECONDS); + var rpcResult = service3.changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder() + .setShardName("cars").setDataStoreType(DataStoreType.Config) + .setMemberVotingState(List.of( + new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(), + new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build())) + .build()) + .get(10, TimeUnit.SECONDS); verifySuccessfulRpcResult(rpcResult); - verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE), - new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE)); - verifyVotingStates(replicaNode2.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE), - new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE)); - verifyVotingStates(replicaNode3.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE), - new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE)); + verifyVotingStates(leaderNode1.configDataStore(), "cars", + new ExpState("member-1", true), new ExpState("member-2", false), new ExpState("member-3", false)); + verifyVotingStates(replicaNode2.configDataStore(), "cars", + new ExpState("member-1", true), new ExpState("member-2", false), new ExpState("member-3", false)); + verifyVotingStates(replicaNode3.configDataStore(), "cars", + new ExpState("member-1", true), new ExpState("member-2", false), new ExpState("member-3", false)); } @Test public void testChangeMemberVotingStatesForSingleNodeShard() throws Exception { String name = "testChangeMemberVotingStatesForSingleNodeShard"; String moduleShardsConfig = "module-shards-member1.conf"; - MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) + final var leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder( DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)) .build(); @@ -859,34 +665,39 @@ public class ClusterAdminRpcServiceTest { // Invoke RPC service on member-3 to change voting status - ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(), - leaderNode.operDataStore(), null); - - RpcResult rpcResult = service - .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder() - .setShardName("cars").setDataStoreType(DataStoreType.Config) - .setMemberVotingState(ImmutableList - .of(new MemberVotingStateBuilder().setMemberName("member-1").setVoting(FALSE).build())) - .build()) - .get(10, TimeUnit.SECONDS); + final var service = new ClusterAdminRpcService(leaderNode.configDataStore(), leaderNode.operDataStore(), null); + + final var rpcResult = service.changeMemberVotingStatesForShard( + new ChangeMemberVotingStatesForShardInputBuilder() + .setShardName("cars").setDataStoreType(DataStoreType.Config) + .setMemberVotingState(List.of(new MemberVotingStateBuilder() + .setMemberName("member-1") + .setVoting(FALSE) + .build())) + .build()) + .get(10, TimeUnit.SECONDS); verifyFailedRpcResult(rpcResult); - verifyVotingStates(leaderNode.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE)); + verifyVotingStates(leaderNode.configDataStore(), "cars", new ExpState("member-1", true)); } @Test public void testChangeMemberVotingStatesForAllShards() throws Exception { String name = "testChangeMemberVotingStatesForAllShards"; String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf"; - final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) - .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder( - DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)) - .build(); - - final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) + final var leaderNode1 = MemberNode.builder(memberNodes) + .akkaConfig("Member1") + .testName(name) + .moduleShardsConfig(moduleShardsConfig) + .datastoreContextBuilder(DatastoreContext.newBuilder() + .shardHeartbeatIntervalInMillis(300) + .shardElectionTimeoutFactor(1)) + .build(); + + final var replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) .moduleShardsConfig(moduleShardsConfig).build(); - final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name) + final var replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name) .moduleShardsConfig(moduleShardsConfig).build(); leaderNode1.configDataStore().waitTillReady(); @@ -899,75 +710,78 @@ public class ClusterAdminRpcServiceTest { // Invoke RPC service on member-3 to change voting status - ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), + final var service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), replicaNode3.operDataStore(), null); - RpcResult rpcResult = service3.changeMemberVotingStatesForAllShards( - new ChangeMemberVotingStatesForAllShardsInputBuilder().setMemberVotingState(ImmutableList.of( + final var rpcResult = service3.changeMemberVotingStatesForAllShards( + new ChangeMemberVotingStatesForAllShardsInputBuilder() + .setMemberVotingState(List.of( new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(), - new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build())).build()) + new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build())) + .build()) .get(10, TimeUnit.SECONDS); - ChangeMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult); + final var result = verifySuccessfulRpcResult(rpcResult); verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), successShardResult("people", DataStoreType.Config), successShardResult("cars", DataStoreType.Operational), successShardResult("people", DataStoreType.Operational)); - verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(), - replicaNode2.configDataStore(), replicaNode2.operDataStore(), - replicaNode3.configDataStore(), replicaNode3.operDataStore()}, - new String[]{"cars", "people"}, new SimpleEntry<>("member-1", TRUE), - new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE)); + verifyVotingStates(new ClientBackedDataStore[] { + leaderNode1.configDataStore(), leaderNode1.operDataStore(), + replicaNode2.configDataStore(), replicaNode2.operDataStore(), + replicaNode3.configDataStore(), replicaNode3.operDataStore() + }, new String[] { "cars", "people" }, + new ExpState("member-1", true), new ExpState("member-2", false), new ExpState("member-3", false)); } @Test public void testFlipMemberVotingStates() throws Exception { String name = "testFlipMemberVotingStates"; - ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList( - new ServerInfo("member-1", true), new ServerInfo("member-2", true), - new ServerInfo("member-3", false))); + final var persistedServerConfig = new ServerConfigurationPayload(List.of( + new ServerInfo("member-1", true), new ServerInfo("member-2", true), new ServerInfo("member-3", false))); setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people"); setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people"); setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people"); String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf"; - final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) + final var leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder() .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10)) .build(); - final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) + final var replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) .moduleShardsConfig(moduleShardsConfig).build(); - final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name) + final var replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name) .moduleShardsConfig(moduleShardsConfig).build(); leaderNode1.configDataStore().waitTillReady(); leaderNode1.operDataStore().waitTillReady(); replicaNode3.configDataStore().waitTillReady(); replicaNode3.operDataStore().waitTillReady(); - verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE), - new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", FALSE)); + verifyVotingStates(leaderNode1.configDataStore(), "cars", + new ExpState("member-1", true), new ExpState("member-2", true), new ExpState("member-3", false)); - ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), - replicaNode3.operDataStore(), null); + final var service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), replicaNode3.operDataStore(), + null); - RpcResult rpcResult = service3.flipMemberVotingStatesForAllShards( - new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS); - FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult); + var rpcResult = service3.flipMemberVotingStatesForAllShards( + new FlipMemberVotingStatesForAllShardsInputBuilder().build()) + .get(10, TimeUnit.SECONDS); + var result = verifySuccessfulRpcResult(rpcResult); verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), successShardResult("people", DataStoreType.Config), successShardResult("cars", DataStoreType.Operational), successShardResult("people", DataStoreType.Operational)); - verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(), - replicaNode2.configDataStore(), replicaNode2.operDataStore(), - replicaNode3.configDataStore(), replicaNode3.operDataStore()}, - new String[]{"cars", "people"}, - new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE), - new SimpleEntry<>("member-3", TRUE)); + verifyVotingStates(new ClientBackedDataStore[] { + leaderNode1.configDataStore(), leaderNode1.operDataStore(), + replicaNode2.configDataStore(), replicaNode2.operDataStore(), + replicaNode3.configDataStore(), replicaNode3.operDataStore() + }, new String[] { "cars", "people" }, + new ExpState("member-1", false), new ExpState("member-2", false), new ExpState("member-3", true)); // Leadership should have transferred to member 3 since it is the only remaining voting member. verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> { @@ -985,19 +799,20 @@ public class ClusterAdminRpcServiceTest { // Flip the voting states back to the original states. rpcResult = service3.flipMemberVotingStatesForAllShards( - new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS); + new FlipMemberVotingStatesForAllShardsInputBuilder().build()) + .get(10, TimeUnit.SECONDS); result = verifySuccessfulRpcResult(rpcResult); verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), successShardResult("people", DataStoreType.Config), successShardResult("cars", DataStoreType.Operational), successShardResult("people", DataStoreType.Operational)); - verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(), - replicaNode2.configDataStore(), replicaNode2.operDataStore(), - replicaNode3.configDataStore(), replicaNode3.operDataStore()}, - new String[]{"cars", "people"}, - new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE), - new SimpleEntry<>("member-3", FALSE)); + verifyVotingStates(new ClientBackedDataStore[] { + leaderNode1.configDataStore(), leaderNode1.operDataStore(), + replicaNode2.configDataStore(), replicaNode2.operDataStore(), + replicaNode3.configDataStore(), replicaNode3.operDataStore() + }, new String[] { "cars", "people" }, + new ExpState("member-1", true), new ExpState("member-2", true), new ExpState("member-3", false)); // Leadership should have transferred to member 1 or 2. verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> { @@ -1013,7 +828,7 @@ public class ClusterAdminRpcServiceTest { // Members 1, 2, and 3 are initially started up as non-voting. Members 4, 5, and 6 are initially // non-voting and simulated as down by not starting them up. - ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList( + final var persistedServerConfig = new ServerConfigurationPayload(List.of( new ServerInfo("member-1", false), new ServerInfo("member-2", false), new ServerInfo("member-3", false), new ServerInfo("member-4", true), new ServerInfo("member-5", true), new ServerInfo("member-6", true))); @@ -1023,47 +838,47 @@ public class ClusterAdminRpcServiceTest { setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people"); String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf"; - final MemberNode replicaNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) + final var replicaNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder( DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)) .build(); - final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) + final var replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) .moduleShardsConfig(moduleShardsConfig).build(); - final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name) + final var replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name) .moduleShardsConfig(moduleShardsConfig).build(); // Initially there won't be a leader b/c all the up nodes are non-voting. replicaNode1.waitForMembersUp("member-2", "member-3"); - verifyVotingStates(replicaNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", FALSE), - new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE), - new SimpleEntry<>("member-4", TRUE), new SimpleEntry<>("member-5", TRUE), - new SimpleEntry<>("member-6", TRUE)); + verifyVotingStates(replicaNode1.configDataStore(), "cars", + new ExpState("member-1", false), new ExpState("member-2", false), new ExpState("member-3", false), + new ExpState("member-4", true), new ExpState("member-5", true), new ExpState("member-6", true)); verifyRaftState(replicaNode1.configDataStore(), "cars", raftState -> assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState())); - ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(), - replicaNode1.operDataStore(), null); + final var service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(), replicaNode1.operDataStore(), + null); - RpcResult rpcResult = service1.flipMemberVotingStatesForAllShards( - new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS); - FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult); + final var rpcResult = service1.flipMemberVotingStatesForAllShards( + new FlipMemberVotingStatesForAllShardsInputBuilder().build()) + .get(10, TimeUnit.SECONDS); + final var result = verifySuccessfulRpcResult(rpcResult); verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), successShardResult("people", DataStoreType.Config), successShardResult("cars", DataStoreType.Operational), successShardResult("people", DataStoreType.Operational)); - verifyVotingStates(new AbstractDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(), - replicaNode2.configDataStore(), replicaNode2.operDataStore(), - replicaNode3.configDataStore(), replicaNode3.operDataStore()}, - new String[]{"cars", "people"}, - new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE), - new SimpleEntry<>("member-3", TRUE), new SimpleEntry<>("member-4", FALSE), - new SimpleEntry<>("member-5", FALSE), new SimpleEntry<>("member-6", FALSE)); + verifyVotingStates(new ClientBackedDataStore[] { + replicaNode1.configDataStore(), replicaNode1.operDataStore(), + replicaNode2.configDataStore(), replicaNode2.operDataStore(), + replicaNode3.configDataStore(), replicaNode3.operDataStore() + }, new String[] { "cars", "people" }, + new ExpState("member-1", true), new ExpState("member-2", true), new ExpState("member-3", true), + new ExpState("member-4", false), new ExpState("member-5", false), new ExpState("member-6", false)); // Since member 1 was changed to voting and there was no leader, it should've started and election // and become leader @@ -1085,7 +900,7 @@ public class ClusterAdminRpcServiceTest { String name = "testFlipMemberVotingStatesWithVotingMembersDown"; // Members 4, 5, and 6 are initially non-voting and simulated as down by not starting them up. - ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList( + final var persistedServerConfig = new ServerConfigurationPayload(List.of( new ServerInfo("member-1", true), new ServerInfo("member-2", true), new ServerInfo("member-3", true), new ServerInfo("member-4", false), new ServerInfo("member-5", false), new ServerInfo("member-6", false))); @@ -1095,43 +910,43 @@ public class ClusterAdminRpcServiceTest { setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people"); String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf"; - final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) + final var leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder( DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)) .build(); - final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) + final var replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) .moduleShardsConfig(moduleShardsConfig).build(); - final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name) + final var replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name) .moduleShardsConfig(moduleShardsConfig).build(); leaderNode1.configDataStore().waitTillReady(); leaderNode1.operDataStore().waitTillReady(); - verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE), - new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", TRUE), - new SimpleEntry<>("member-4", FALSE), new SimpleEntry<>("member-5", FALSE), - new SimpleEntry<>("member-6", FALSE)); + verifyVotingStates(leaderNode1.configDataStore(), "cars", + new ExpState("member-1", true), new ExpState("member-2", true), new ExpState("member-3", true), + new ExpState("member-4", false), new ExpState("member-5", false), new ExpState("member-6", false)); - ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(), - leaderNode1.operDataStore(), null); + final var service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(), leaderNode1.operDataStore(), + null); - RpcResult rpcResult = service1.flipMemberVotingStatesForAllShards( - new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS); - FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult); + final var rpcResult = service1.flipMemberVotingStatesForAllShards( + new FlipMemberVotingStatesForAllShardsInputBuilder().build()) + .get(10, TimeUnit.SECONDS); + final var result = verifySuccessfulRpcResult(rpcResult); verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), successShardResult("people", DataStoreType.Config), successShardResult("cars", DataStoreType.Operational), successShardResult("people", DataStoreType.Operational)); // Members 2 and 3 are now non-voting but should get replicated with the new new server config. - verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(), - replicaNode2.configDataStore(), replicaNode2.operDataStore(), - replicaNode3.configDataStore(), replicaNode3.operDataStore()}, - new String[]{"cars", "people"}, - new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE), - new SimpleEntry<>("member-3", FALSE), new SimpleEntry<>("member-4", TRUE), - new SimpleEntry<>("member-5", TRUE), new SimpleEntry<>("member-6", TRUE)); + verifyVotingStates(new ClientBackedDataStore[] { + leaderNode1.configDataStore(), leaderNode1.operDataStore(), + replicaNode2.configDataStore(), replicaNode2.operDataStore(), + replicaNode3.configDataStore(), replicaNode3.operDataStore() + }, new String[] { "cars", "people" }, + new ExpState("member-1", false), new ExpState("member-2", false), new ExpState("member-3", false), + new ExpState("member-4", true), new ExpState("member-5", true), new ExpState("member-6", true)); // The leader (member 1) was changed to non-voting but it shouldn't be able to step down as leader yet // b/c it can't get a majority consensus with all voting members down. So verify it remains the leader. @@ -1143,16 +958,16 @@ public class ClusterAdminRpcServiceTest { private static void setupPersistedServerConfigPayload(final ServerConfigurationPayload serverConfig, final String member, final String datastoreTypeSuffix, final String... shards) { - String[] datastoreTypes = {"config_", "oper_"}; + String[] datastoreTypes = { "config_", "oper_" }; for (String type : datastoreTypes) { for (String shard : shards) { - List newServerInfo = new ArrayList<>(serverConfig.getServerConfig().size()); - for (ServerInfo info : serverConfig.getServerConfig()) { - newServerInfo.add(new ServerInfo(ShardIdentifier.create(shard, MemberName.forName(info.getId()), + final var newServerInfo = new ArrayList(serverConfig.getServerConfig().size()); + for (var info : serverConfig.getServerConfig()) { + newServerInfo.add(new ServerInfo(ShardIdentifier.create(shard, MemberName.forName(info.peerId()), type + datastoreTypeSuffix).toString(), info.isVoting())); } - String shardID = ShardIdentifier.create(shard, MemberName.forName(member), + final String shardID = ShardIdentifier.create(shard, MemberName.forName(member), type + datastoreTypeSuffix).toString(); InMemoryJournal.addEntry(shardID, 1, new UpdateElectionTerm(1, null)); InMemoryJournal.addEntry(shardID, 2, new SimpleReplicatedLogEntry(0, 1, @@ -1161,49 +976,47 @@ public class ClusterAdminRpcServiceTest { } } - @SafeVarargs - private static void verifyVotingStates(final AbstractDataStore[] datastores, final String[] shards, - final SimpleEntry... expStates) throws Exception { - for (AbstractDataStore datastore: datastores) { - for (String shard: shards) { + private static void verifyVotingStates(final ClientBackedDataStore[] datastores, final String[] shards, + final ExpState... expStates) throws Exception { + for (var datastore : datastores) { + for (String shard : shards) { verifyVotingStates(datastore, shard, expStates); } } } - @SafeVarargs - private static void verifyVotingStates(final AbstractDataStore datastore, final String shardName, - final SimpleEntry... expStates) throws Exception { + private static void verifyVotingStates(final ClientBackedDataStore datastore, final String shardName, + final ExpState... expStates) throws Exception { String localMemberName = datastore.getActorUtils().getCurrentMemberName().getName(); - Map expStateMap = new HashMap<>(); - for (Entry e: expStates) { - expStateMap.put(ShardIdentifier.create(shardName, MemberName.forName(e.getKey()), - datastore.getActorUtils().getDataStoreName()).toString(), e.getValue()); + var expStateMap = new HashMap(); + for (var expState : expStates) { + expStateMap.put(ShardIdentifier.create(shardName, MemberName.forName(expState.name), + datastore.getActorUtils().getDataStoreName()).toString(), expState.voting); } verifyRaftState(datastore, shardName, raftState -> { String localPeerId = ShardIdentifier.create(shardName, MemberName.forName(localMemberName), datastore.getActorUtils().getDataStoreName()).toString(); assertEquals("Voting state for " + localPeerId, expStateMap.get(localPeerId), raftState.isVoting()); - for (Entry e: raftState.getPeerVotingStates().entrySet()) { - assertEquals("Voting state for " + e.getKey(), expStateMap.get(e.getKey()), e.getValue()); + for (var entry : raftState.getPeerVotingStates().entrySet()) { + assertEquals("Voting state for " + entry.getKey(), expStateMap.get(entry.getKey()), entry.getValue()); } }); } private static void verifyShardResults(final Map shardResults, final ShardResult... expShardResults) { - Map expResultsMap = new HashMap<>(); - for (ShardResult r: expShardResults) { + var expResultsMap = new HashMap(); + for (var r : expShardResults) { expResultsMap.put(r.getShardName() + "-" + r.getDataStoreType(), r); } - for (ShardResult result: shardResults.values()) { - ShardResult exp = expResultsMap.remove(result.getShardName() + "-" + result.getDataStoreType()); + for (var result : shardResults.values()) { + var exp = expResultsMap.remove(result.getShardName() + "-" + result.getDataStoreType()); assertNotNull(String.format("Unexpected result for shard %s, type %s", result.getShardName(), result.getDataStoreType()), exp); - assertEquals("isSucceeded", exp.isSucceeded(), result.isSucceeded()); - if (exp.isSucceeded()) { + assertEquals("isSucceeded", exp.getSucceeded(), result.getSucceeded()); + if (exp.getSucceeded()) { assertNull("Expected null error message", result.getErrorMessage()); } else { assertNotNull("Expected error message", result.getErrorMessage());