From: Tom Pantelis
Date: Tue, 8 Dec 2015 13:38:47 +0000 (-0500)
Subject: Bug 2187: Implement add-replicas-for-all-shards RPC
X-Git-Tag: release/beryllium~86
X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=01be99539d7b19743a237b6e72d2d870491daf7a

Bug 2187: Implement add-replicas-for-all-shards RPC

Implemented the add-replicas-for-all-shards RPC. The yang RPC definition
was changed to return a list of results, one per shard.

In the unit test, I dynamically added a shard config to the new replica
member that doesn't exist on the leader, to test a replica failure case.
This revealed an issue with FindPrimary: both member nodes forwarded the
RemoteFindPrimary message back and forth to each other, as neither had a
local replica. To fix this, I added a visitedAddresses set to the
RemoteFindPrimary message to prevent the endless mutual recursion.

Change-Id: Icb0329db2c935f9825b81f593b83bdab13fa6b52
Signed-off-by: Tom Pantelis
---

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
index d61e12e1cb..c6d7d82c34 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
@@ -797,12 +797,25 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return;
         }
 
+        Collection<String> visitedAddresses;
+        if(message instanceof RemoteFindPrimary) {
+            visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
+        } else {
+            visitedAddresses = new ArrayList<>();
+        }
+
+        visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
+
         for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
+            if(visitedAddresses.contains(address)) {
+                continue;
+            }
+
             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
                     shardName, address);
 
             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
-                    message.isWaitUntilReady()), getContext());
+                    message.isWaitUntilReady(), visitedAddresses), getContext());
             return;
         }

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardPeerAddressResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardPeerAddressResolver.java
index 464fc7f53a..40596f5037 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardPeerAddressResolver.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardPeerAddressResolver.java
@@ -74,7 +74,7 @@ class ShardPeerAddressResolver implements PeerAddressResolver {
         return null;
     }
 
-    private StringBuilder getShardManagerActorPathBuilder(Address address) {
+    StringBuilder getShardManagerActorPathBuilder(Address address) {
        return new StringBuilder().append(address.toString()).append("/user/").append(shardManagerIdentifier);
     }
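The visitedAddresses check in the ShardManager hunk above is a standard visited-set guard for peer-to-peer request forwarding: each node records its own address in the message before forwarding, so the request can never bounce back to a node that already handled it. A self-contained sketch of the pattern, with hypothetical class names and no Akka dependency (an illustration of the technique, not code from this change):

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    class FindRequest {
        final String shardName;
        final Set<String> visitedAddresses;

        FindRequest(String shardName, Collection<String> visitedAddresses) {
            this.shardName = shardName;
            // Copy defensively, as RemoteFindPrimary does with a HashSet.
            this.visitedAddresses = new HashSet<>(visitedAddresses);
        }
    }

    class Node {
        final String address;
        final List<Node> peers = new ArrayList<>();
        final Set<String> localShards = new HashSet<>();

        Node(String address) {
            this.address = address;
        }

        // Returns the address of a node hosting the shard, or null once every
        // reachable node has been visited without finding one.
        String findPrimary(FindRequest request) {
            if(localShards.contains(request.shardName)) {
                return address;
            }

            // Record this node before forwarding so no peer sends the request back.
            request.visitedAddresses.add(address);

            for(Node peer: peers) {
                if(request.visitedAddresses.contains(peer.address)) {
                    continue; // already asked - this breaks the mutual recursion
                }
                return peer.findPrimary(request);
            }
            return null;
        }
    }

    public class VisitedSetDemo {
        public static void main(String[] args) {
            Node member1 = new Node("member-1");
            Node member2 = new Node("member-2");
            member1.peers.add(member2);
            member2.peers.add(member1);

            // Neither member hosts "pets". Without the visited set, member-1 and
            // member-2 would forward the request to each other forever; with it,
            // the search terminates and reports failure.
            System.out.println(member1.findPrimary(new FindRequest("pets", new ArrayList<String>())));
            // prints: null
        }
    }

Carrying the set inside the forwarded message, as the patch does, keeps the guard stateless on each node: nothing needs to be cleaned up when a search ends or a node restarts.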
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java
index 007b347558..976443fe20 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java
@@ -12,13 +12,19 @@ import akka.actor.Status.Success;
 import akka.dispatch.OnComplete;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
+import com.google.common.base.Function;
 import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.io.FileOutputStream;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.SerializationUtils;
@@ -26,9 +32,12 @@ import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshotList;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService;
@@ -36,6 +45,8 @@
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToVotingForAllShardsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResult;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResultBuilder;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
@@ -48,6 +59,8 @@ import org.slf4j.LoggerFactory;
  * @author Thomas Pantelis
  */
 public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseable {
+    private static final Timeout SHARD_MGR_TIMEOUT = new Timeout(1, TimeUnit.MINUTES);
+
     private static final Logger LOG = LoggerFactory.getLogger(ClusterAdminRpcService.class);
 
     private final DistributedDataStore configDataStore;
@@ -113,12 +126,29 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
     }
 
     @Override
-    public Future<RpcResult<Void>> addReplicasForAllShards() {
-        // TODO implement
-        return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
-                "Not implemented yet").buildFuture();
+    public Future<RpcResult<AddReplicasForAllShardsOutput>> addReplicasForAllShards() {
+        LOG.info("Adding replicas for all shards");
+
+        final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
+        Function<String, Object> messageSupplier = new Function<String, Object>() {
+            @Override
+            public Object apply(String shardName) {
+                return new AddShardReplica(shardName);
+            }
+        };
+
+        sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
+        sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
+
+        return waitForShardResults(shardResultData, new Function<List<ShardResult>, AddReplicasForAllShardsOutput>() {
+            @Override
+            public AddReplicasForAllShardsOutput apply(List<ShardResult> shardResults) {
+                return new AddReplicasForAllShardsOutputBuilder().setShardResult(shardResults).build();
+            }
+        }, "Failed to add replica");
     }
 
+    @Override
     public Future<RpcResult<Void>> removeAllShardReplicas() {
         // TODO implement
@@ -166,9 +196,68 @@
         return returnFuture;
     }
 
+    private static <T> SettableFuture<RpcResult<T>> waitForShardResults(
+            final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData,
+            final Function<List<ShardResult>, T> resultDataSupplier,
+            final String failureLogMsgPrefix) {
+        final SettableFuture<RpcResult<T>> returnFuture = SettableFuture.create();
+        final List<ShardResult> shardResults = new ArrayList<>();
+        for(final Entry<ListenableFuture<Success>, ShardResultBuilder> entry: shardResultData) {
+            Futures.addCallback(entry.getKey(), new FutureCallback<Success>() {
+                @Override
+                public void onSuccess(Success result) {
+                    synchronized(shardResults) {
+                        ShardResultBuilder shardResult = entry.getValue();
+                        LOG.debug("onSuccess for shard {}, type {}", shardResult.getShardName(),
+                                shardResult.getDataStoreType());
+                        shardResults.add(shardResult.setSucceeded(true).build());
+                        checkIfComplete();
+                    }
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                    synchronized(shardResults) {
+                        ShardResultBuilder shardResult = entry.getValue();
+                        LOG.warn("{} for shard {}, type {}", failureLogMsgPrefix, shardResult.getShardName(),
+                                shardResult.getDataStoreType(), t);
+                        shardResults.add(shardResult.setSucceeded(false).setErrorMessage(
+                                Throwables.getRootCause(t).getMessage()).build());
+                        checkIfComplete();
+                    }
+                }
+
+                void checkIfComplete() {
+                    LOG.debug("checkIfComplete: expected {}, actual {}", shardResultData.size(), shardResults.size());
+                    if(shardResults.size() == shardResultData.size()) {
+                        returnFuture.set(newSuccessfulResult(resultDataSupplier.apply(shardResults)));
+                    }
+                }
+            });
+        }
+        return returnFuture;
+    }
+
+    private void sendMessageToManagerForConfiguredShards(DataStoreType dataStoreType,
+            List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData,
+            Function<String, Object> messageSupplier) {
+        ActorContext actorContext = dataStoreType == DataStoreType.Config ?
+                configDataStore.getActorContext() : operDataStore.getActorContext();
+        Set<String> allShardNames = actorContext.getConfiguration().getAllShardNames();
+
+        LOG.debug("Sending message to all shards {} for data store {}", allShardNames, actorContext.getDataStoreType());
+
+        for(String shardName: allShardNames) {
+            ListenableFuture<Success> future = this.<Success>ask(actorContext.getShardManager(), messageSupplier.apply(shardName),
+                    SHARD_MGR_TIMEOUT);
+            shardResultData.add(new SimpleEntry<ListenableFuture<Success>, ShardResultBuilder>(future,
+                    new ShardResultBuilder().setShardName(shardName).setDataStoreType(dataStoreType)));
+        }
+    }
+
     @SuppressWarnings("unchecked")
     private ListenableFuture<List<DatastoreSnapshot>> sendMessageToShardManagers(Object message) {
-        Timeout timeout = new Timeout(1, TimeUnit.MINUTES);
+        Timeout timeout = SHARD_MGR_TIMEOUT;
         ListenableFuture<DatastoreSnapshot> configFuture = ask(configDataStore.getActorContext().getShardManager(),
                 message, timeout);
         ListenableFuture<DatastoreSnapshot> operFuture = ask(operDataStore.getActorContext().getShardManager(),
                 message, timeout);
@@ -178,7 +267,7 @@
     private <T> ListenableFuture<T> sendMessageToShardManager(DataStoreType dataStoreType, Object message) {
         ActorRef shardManager = dataStoreType == DataStoreType.Config ?
                 configDataStore.getActorContext().getShardManager() : operDataStore.getActorContext().getShardManager();
-        return ask(shardManager, message, new Timeout(1, TimeUnit.MINUTES));
+        return ask(shardManager, message, SHARD_MGR_TIMEOUT);
     }
 
     private static void saveSnapshotsToFile(DatastoreSnapshotList snapshots, String fileName,
@@ -232,6 +321,10 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
     }
 
     private static RpcResult<Void> newSuccessfulResult() {
-        return RpcResultBuilder.<Void>success().build();
+        return newSuccessfulResult((Void)null);
+    }
+
+    private static <T> RpcResult<T> newSuccessfulResult(T data) {
+        return RpcResultBuilder.success(data).build();
     }
 }
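The waitForShardResults() helper above is a scatter-gather reduction: one ask() future per shard, folded into a single future that completes only when the last per-shard callback has fired, with per-shard failures recorded rather than propagated. The same shape reduced to its essentials, as a self-contained sketch (hypothetical names; assumes only Guava, which this code already uses, including the two-argument Futures.addCallback overload seen in the patch):

    import com.google.common.util.concurrent.FutureCallback;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.SettableFuture;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class FanOutSketch {
        // Folds one future per shard into a single future of per-shard outcomes.
        static SettableFuture<List<String>> aggregate(Map<String, ListenableFuture<Void>> perShard) {
            final SettableFuture<List<String>> returnFuture = SettableFuture.create();
            final List<String> outcomes = new ArrayList<>();
            final int expected = perShard.size();

            for(final Map.Entry<String, ListenableFuture<Void>> entry: perShard.entrySet()) {
                Futures.addCallback(entry.getValue(), new FutureCallback<Void>() {
                    @Override
                    public void onSuccess(Void notUsed) {
                        record(entry.getKey() + ": succeeded");
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        // A failed shard becomes a failed entry in the output,
                        // not a failure of the aggregate future.
                        record(entry.getKey() + ": failed - " + t.getMessage());
                    }

                    private void record(String outcome) {
                        // Callbacks may run on different threads; guard the list and
                        // complete the aggregate only when the last shard reports,
                        // mirroring checkIfComplete() in the patch.
                        synchronized(outcomes) {
                            outcomes.add(outcome);
                            if(outcomes.size() == expected) {
                                returnFuture.set(new ArrayList<>(outcomes));
                            }
                        }
                    }
                });
            }
            return returnFuture;
        }

        public static void main(String[] args) throws Exception {
            Map<String, ListenableFuture<Void>> futures = new HashMap<>();
            futures.put("cars", Futures.<Void>immediateFuture(null));
            futures.put("no-leader", Futures.<Void>immediateFailedFuture(new IllegalStateException("no shard leader")));
            System.out.println(aggregate(futures).get());
        }
    }

Because the aggregate future is only completed under the same lock that grows the outcome list, the completion check cannot miss the final callback even when two shards finish concurrently.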
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemoteFindPrimary.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemoteFindPrimary.java
index 820512e096..041085fe15 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemoteFindPrimary.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemoteFindPrimary.java
@@ -7,6 +7,12 @@
  */
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import javax.annotation.Nonnull;
+
 /**
  * A remote message sent to locate the primary shard.
  *
@@ -15,7 +21,15 @@ package org.opendaylight.controller.cluster.datastore.messages;
 public class RemoteFindPrimary extends FindPrimary {
     private static final long serialVersionUID = 1L;
 
-    public RemoteFindPrimary(String shardName, boolean waitUntilReady) {
+    private final Set<String> visitedAddresses;
+
+    public RemoteFindPrimary(String shardName, boolean waitUntilReady, @Nonnull Collection<String> visitedAddresses) {
         super(shardName, waitUntilReady);
+        this.visitedAddresses = new HashSet<>(Preconditions.checkNotNull(visitedAddresses));
+    }
+
+    @Nonnull
+    public Set<String> getVisitedAddresses() {
+        return visitedAddresses;
     }
 }

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-admin.yang b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-admin.yang
index 940741b720..a532895098 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-admin.yang
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-admin.yang
@@ -21,13 +21,31 @@ module cluster-admin {
         }
     }
 
+    grouping shard-operation-result {
+        leaf shard-name {
+            type string;
+        }
+
+        leaf data-store-type {
+            type data-store-type;
+        }
+
+        leaf succeeded {
+            type boolean;
+        }
+
+        leaf error-message {
+            type string;
+        }
+    }
+
     rpc add-shard-replica {
         input {
             leaf shard-name {
                 type string;
                 description "The name of the shard for which to create a replica.";
             }
-
+
             leaf data-store-type {
                 type data-store-type;
                 description "The type of the data store to which the replica belongs";
@@ -64,6 +82,15 @@
     }
 
     rpc add-replicas-for-all-shards {
+        output {
+            list shard-result {
+                key "shard-name data-store-type";
+                uses shard-operation-result;
+
+                description "The list of results, one per shard";
+            }
+        }
+
         description "Adds replicas on this node for all currently defined shards. This is equivalent to issuing
             an add-shard-replica RPC for all shards.";
     }
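With the output grouping above, per-shard failures are reported in-band (the succeeded and error-message leafs) instead of failing the whole RPC. A sketch of a Java caller consuming the RPC through the service class (the two datastore parameters are assumed to be supplied by the runtime, as in the service's constructor; everything else follows the generated bindings shown in this patch):

    import java.util.concurrent.TimeUnit;
    import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
    import org.opendaylight.controller.cluster.datastore.admin.ClusterAdminRpcService;
    import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
    import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResult;
    import org.opendaylight.yangtools.yang.common.RpcResult;

    public class AddReplicasCaller {
        // The datastore instances are assumed to be wired in by the runtime.
        public static void addReplicas(DistributedDataStore configDataStore,
                DistributedDataStore operDataStore) throws Exception {
            ClusterAdminRpcService service = new ClusterAdminRpcService(configDataStore, operDataStore);
            try {
                RpcResult<AddReplicasForAllShardsOutput> rpcResult =
                        service.addReplicasForAllShards().get(10, TimeUnit.SECONDS);
                for(ShardResult r: rpcResult.getResult().getShardResult()) {
                    // Per the yang grouping, a shard that could not be replicated
                    // reports succeeded=false plus an error-message; it does not
                    // fail the RPC as a whole.
                    System.out.println(r.getShardName() + "/" + r.getDataStoreType() + ": "
                            + (r.isSucceeded() ? "replica added" : "failed: " + r.getErrorMessage()));
                }
            } finally {
                service.close();
            }
        }
    }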
Actual: %s", peerIds, shardName, + raftState.getPeerAddresses().keySet()), raftState.getPeerAddresses().keySet().containsAll(peerIds)); } }); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java index 765d8a1033..e117dcc4b6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java @@ -9,11 +9,13 @@ package org.opendaylight.controller.cluster.datastore.admin; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent; import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState; import akka.actor.ActorRef; import akka.actor.PoisonPill; +import akka.actor.Status.Success; import akka.cluster.Cluster; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; @@ -21,9 +23,13 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import java.io.File; import java.io.FileInputStream; +import java.net.URI; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.SerializationUtils; @@ -33,6 +39,9 @@ import org.junit.Test; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; import org.opendaylight.controller.cluster.datastore.MemberNode; import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier; +import org.opendaylight.controller.cluster.datastore.Shard; +import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; +import org.opendaylight.controller.cluster.datastore.messages.CreateShard; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; @@ -40,9 +49,12 @@ import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType; +import 
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResult;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResultBuilder;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -260,7 +272,7 @@ public class ClusterAdminRpcServiceTest {
         service.close();
     }
 
-    private void verifySuccessfulRpcResult(RpcResult<Void> rpcResult) {
+    private <T> T verifySuccessfulRpcResult(RpcResult<T> rpcResult) {
         if(!rpcResult.isSuccessful()) {
             if(rpcResult.getErrors().size() > 0) {
                 RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
@@ -269,6 +281,8 @@
             fail("Rpc failed with no error");
         }
+
+        return rpcResult.getResult();
     }
 
     private void verifyFailedRpcResult(RpcResult<Void> rpcResult) {
@@ -284,8 +298,54 @@
     }
 
     @Test
-    public void testAddReplicasForAllShards() {
-        // TODO implement
+    public void testAddReplicasForAllShards() throws Exception {
+        String name = "testAddReplicasForAllShards";
+        String moduleShardsConfig = "module-shards-member1.conf";
+        MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name).
+                moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars", "people").build();
+
+        ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
+                "pets", null, Arrays.asList("member-1"));
+        leaderNode1.configDataStore().getActorContext().getShardManager().tell(
+                new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
+        leaderNode1.kit().expectMsgClass(Success.class);
+        leaderNode1.kit().waitUntilLeader(leaderNode1.configDataStore().getActorContext(), "pets");
+
+        MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        leaderNode1.waitForMembersUp("member-2");
+        newReplicaNode2.waitForMembersUp("member-1");
+
+        newReplicaNode2.configDataStore().getActorContext().getShardManager().tell(
+                new CreateShard(petsModuleConfig, Shard.builder(), null), newReplicaNode2.kit().getRef());
+        newReplicaNode2.kit().expectMsgClass(Success.class);
+
+        newReplicaNode2.operDataStore().getActorContext().getShardManager().tell(
+                new CreateShard(new ModuleShardConfiguration(URI.create("no-leader-ns"), "no-leader-module",
+                        "no-leader", null, Arrays.asList("member-1")), Shard.builder(), null),
+                newReplicaNode2.kit().getRef());
+        newReplicaNode2.kit().expectMsgClass(Success.class);
+
+        ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(),
+                newReplicaNode2.operDataStore());
+
+        RpcResult<AddReplicasForAllShardsOutput> rpcResult = service.addReplicasForAllShards().get(10, TimeUnit.SECONDS);
+        AddReplicasForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
+        verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+                successShardResult("people", DataStoreType.Config),
+                successShardResult("pets", DataStoreType.Config),
+                successShardResult("cars", DataStoreType.Operational),
+                successShardResult("people", DataStoreType.Operational),
+                failedShardResult("no-leader", DataStoreType.Operational));
+
+        verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1");
+        verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "people", "member-1");
+        verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1");
+        verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1");
+        verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1");
+
+        service.close();
     }
 
     @Test
@@ -302,4 +362,35 @@
     public void testConvertMembersToNonvotingForAllShards() {
         // TODO implement
     }
+
+    private void verifyShardResults(List<ShardResult> shardResults, ShardResult... expShardResults) {
+        Map<String, ShardResult> expResultsMap = new HashMap<>();
+        for(ShardResult r: expShardResults) {
+            expResultsMap.put(r.getShardName() + "-" + r.getDataStoreType(), r);
+        }
+
+        for(ShardResult result: shardResults) {
+            ShardResult exp = expResultsMap.remove(result.getShardName() + "-" + result.getDataStoreType());
+            assertNotNull(String.format("Unexpected result for shard %s, type %s", result.getShardName(),
+                    result.getDataStoreType()), exp);
+            assertEquals("isSucceeded", exp.isSucceeded(), result.isSucceeded());
+            if(exp.isSucceeded()) {
+                assertNull("Expected null error message", result.getErrorMessage());
+            } else {
+                assertNotNull("Expected error message", result.getErrorMessage());
+            }
+        }
+
+        if(!expResultsMap.isEmpty()) {
+            fail("Missing shard results for " + expResultsMap.keySet());
+        }
+    }
+
+    private ShardResult successShardResult(String shardName, DataStoreType type) {
+        return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(true).build();
+    }
+
+    private ShardResult failedShardResult(String shardName, DataStoreType type) {
+        return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(false).build();
+    }
 }