Bug 2187: Implement add-replicas-for-all-shards RPC 22/31022/4
authorTom Pantelis <tpanteli@brocade.com>
Tue, 8 Dec 2015 13:38:47 +0000 (08:38 -0500)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 14 Dec 2015 22:21:01 +0000 (22:21 +0000)
Implemented the add-replicas-for-all-shards RPC. The yang RPC definition
was changed to return a list of results for each shard.

In the unit test I added a shard config dynamically to the new replica
member that doesn't exist in the leader to test a replica failure case.
This revealed an issue for FindPrimary where both member nodes forwarded the
RemoteFindPrimary message back and forth to each other as neither had
a local replica. To fix this, I added a visitedAddresses set to the
RemoteFindPrimary message to prevent the endless mutual recursion.

Change-Id: Icb0329db2c935f9825b81f593b83bdab13fa6b52
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardPeerAddressResolver.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemoteFindPrimary.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-admin.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/MemberNode.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java

index d61e12e..c6d7d82 100644 (file)
@@ -797,12 +797,25 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return;
         }
 
+        Collection<String> visitedAddresses;
+        if(message instanceof RemoteFindPrimary) {
+            visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
+        } else {
+            visitedAddresses = new ArrayList<>();
+        }
+
+        visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
+
         for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
+            if(visitedAddresses.contains(address)) {
+                continue;
+            }
+
             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
                     shardName, address);
 
             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
-                    message.isWaitUntilReady()), getContext());
+                    message.isWaitUntilReady(), visitedAddresses), getContext());
             return;
         }
 
index 464fc7f..40596f5 100644 (file)
@@ -74,7 +74,7 @@ class ShardPeerAddressResolver implements PeerAddressResolver {
         return null;
     }
 
-    private StringBuilder getShardManagerActorPathBuilder(Address address) {
+    StringBuilder getShardManagerActorPathBuilder(Address address) {
         return new StringBuilder().append(address.toString()).append("/user/").append(shardManagerIdentifier);
     }
 
index 007b347..976443f 100644 (file)
@@ -12,13 +12,19 @@ import akka.actor.Status.Success;
 import akka.dispatch.OnComplete;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
+import com.google.common.base.Function;
 import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.io.FileOutputStream;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.SerializationUtils;
@@ -26,9 +32,12 @@ import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshotList;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService;
@@ -36,6 +45,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToVotingForAllShardsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResult;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResultBuilder;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
@@ -48,6 +59,8 @@ import org.slf4j.LoggerFactory;
  * @author Thomas Pantelis
  */
 public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseable {
+    private static final Timeout SHARD_MGR_TIMEOUT = new Timeout(1, TimeUnit.MINUTES);
+
     private static final Logger LOG = LoggerFactory.getLogger(ClusterAdminRpcService.class);
 
     private final DistributedDataStore configDataStore;
@@ -113,12 +126,29 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
     }
 
     @Override
-    public Future<RpcResult<Void>> addReplicasForAllShards() {
-        // TODO implement
-        return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
-                "Not implemented yet").buildFuture();
+    public Future<RpcResult<AddReplicasForAllShardsOutput>> addReplicasForAllShards() {
+        LOG.info("Adding replicas for all shards");
+
+        final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
+        Function<String, Object> messageSupplier = new Function<String, Object>() {
+            @Override
+            public Object apply(String shardName) {
+                return new AddShardReplica(shardName);
+            }
+        };
+
+        sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
+        sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
+
+        return waitForShardResults(shardResultData, new Function<List<ShardResult>, AddReplicasForAllShardsOutput>() {
+            @Override
+            public AddReplicasForAllShardsOutput apply(List<ShardResult> shardResults) {
+                return new AddReplicasForAllShardsOutputBuilder().setShardResult(shardResults).build();
+            }
+        }, "Failed to add replica");
     }
 
+
     @Override
     public Future<RpcResult<Void>> removeAllShardReplicas() {
         // TODO implement
@@ -166,9 +196,68 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
         return returnFuture;
     }
 
+    private static <T> SettableFuture<RpcResult<T>> waitForShardResults(
+            final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData,
+            final Function<List<ShardResult>, T> resultDataSupplier,
+            final String failureLogMsgPrefix) {
+        final SettableFuture<RpcResult<T>> returnFuture = SettableFuture.create();
+        final List<ShardResult> shardResults = new ArrayList<>();
+        for(final Entry<ListenableFuture<Success>, ShardResultBuilder> entry: shardResultData) {
+            Futures.addCallback(entry.getKey(), new FutureCallback<Success>() {
+                @Override
+                public void onSuccess(Success result) {
+                    synchronized(shardResults) {
+                        ShardResultBuilder shardResult = entry.getValue();
+                        LOG.debug("onSuccess for shard {}, type {}", shardResult.getShardName(),
+                                shardResult.getDataStoreType());
+                        shardResults.add(shardResult.setSucceeded(true).build());
+                        checkIfComplete();
+                    }
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                    synchronized(shardResults) {
+                        ShardResultBuilder shardResult = entry.getValue();
+                        LOG.warn("{} for shard {}, type {}", failureLogMsgPrefix, shardResult.getShardName(),
+                                shardResult.getDataStoreType(), t);
+                        shardResults.add(shardResult.setSucceeded(false).setErrorMessage(
+                                Throwables.getRootCause(t).getMessage()).build());
+                        checkIfComplete();
+                    }
+                }
+
+                void checkIfComplete() {
+                    LOG.debug("checkIfComplete: expected {}, actual {}", shardResultData.size(), shardResults.size());
+                    if(shardResults.size() == shardResultData.size()) {
+                        returnFuture.set(newSuccessfulResult(resultDataSupplier.apply(shardResults)));
+                    }
+                }
+            });
+        }
+        return returnFuture;
+    }
+
+    private <T> void sendMessageToManagerForConfiguredShards(DataStoreType dataStoreType,
+            List<Entry<ListenableFuture<T>, ShardResultBuilder>> shardResultData,
+            Function<String, Object> messageSupplier) {
+        ActorContext actorContext = dataStoreType == DataStoreType.Config ?
+                configDataStore.getActorContext() : operDataStore.getActorContext();
+        Set<String> allShardNames = actorContext.getConfiguration().getAllShardNames();
+
+        LOG.debug("Sending message to all shards {} for data store {}", allShardNames, actorContext.getDataStoreType());
+
+        for(String shardName: allShardNames) {
+            ListenableFuture<T> future = this.<T>ask(actorContext.getShardManager(), messageSupplier.apply(shardName),
+                    SHARD_MGR_TIMEOUT);
+            shardResultData.add(new SimpleEntry<ListenableFuture<T>, ShardResultBuilder>(future,
+                    new ShardResultBuilder().setShardName(shardName).setDataStoreType(dataStoreType)));
+        }
+    }
+
     @SuppressWarnings("unchecked")
     private <T> ListenableFuture<List<T>> sendMessageToShardManagers(Object message) {
-        Timeout timeout = new Timeout(1, TimeUnit.MINUTES);
+        Timeout timeout = SHARD_MGR_TIMEOUT;
         ListenableFuture<T> configFuture = ask(configDataStore.getActorContext().getShardManager(), message, timeout);
         ListenableFuture<T> operFuture = ask(operDataStore.getActorContext().getShardManager(), message, timeout);
 
@@ -178,7 +267,7 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
     private <T> ListenableFuture<T> sendMessageToShardManager(DataStoreType dataStoreType, Object message) {
         ActorRef shardManager = dataStoreType == DataStoreType.Config ?
                 configDataStore.getActorContext().getShardManager() : operDataStore.getActorContext().getShardManager();
-        return ask(shardManager, message, new Timeout(1, TimeUnit.MINUTES));
+        return ask(shardManager, message, SHARD_MGR_TIMEOUT);
     }
 
     private static void saveSnapshotsToFile(DatastoreSnapshotList snapshots, String fileName,
@@ -232,6 +321,10 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
     }
 
     private static RpcResult<Void> newSuccessfulResult() {
-        return RpcResultBuilder.<Void>success().build();
+        return newSuccessfulResult((Void)null);
+    }
+
+    private static <T> RpcResult<T> newSuccessfulResult(T data) {
+        return RpcResultBuilder.<T>success(data).build();
     }
 }
index 820512e..041085f 100644 (file)
@@ -7,6 +7,12 @@
  */
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import javax.annotation.Nonnull;
+
 /**
  * A remote message sent to locate the primary shard.
  *
@@ -15,7 +21,15 @@ package org.opendaylight.controller.cluster.datastore.messages;
 public class RemoteFindPrimary extends FindPrimary {
     private static final long serialVersionUID = 1L;
 
-    public RemoteFindPrimary(String shardName, boolean waitUntilReady) {
+    private final Set<String> visitedAddresses;
+
+    public RemoteFindPrimary(String shardName, boolean waitUntilReady, @Nonnull Collection<String> visitedAddresses) {
         super(shardName, waitUntilReady);
+        this.visitedAddresses = new HashSet<>(Preconditions.checkNotNull(visitedAddresses));
+    }
+
+    @Nonnull
+    public Set<String> getVisitedAddresses() {
+        return visitedAddresses;
     }
 }
index 940741b..a532895 100644 (file)
@@ -21,13 +21,31 @@ module cluster-admin {
         }
     }
 
+    grouping shard-operation-result {
+        leaf shard-name {
+            type string;
+        }
+
+        leaf data-store-type {
+            type data-store-type;
+        }
+
+        leaf succeeded {
+            type boolean;
+        }
+
+        leaf error-message {
+            type string;
+        }
+    }
+
     rpc add-shard-replica {
         input {
             leaf shard-name {
                 type string;
                 description "The name of the shard for which to create a replica.";
             }
-            
+
             leaf data-store-type {
                 type data-store-type;
                 description "The type of the data store to which the replica belongs";
@@ -64,6 +82,15 @@
     }
 
     rpc add-replicas-for-all-shards {
+        output {
+            list shard-result {
+                key "shard-name data-store-type";
+                uses shard-operation-result;
+
+                description "The list of results, one per shard";
+            }
+        }
+
         description "Adds replicas on this node for all currently defined shards. This is equivalent to issuing
             an add-shard-replica RPC for all shards.";
     }
index 33be6be..e966c95 100644 (file)
@@ -144,7 +144,8 @@ public class MemberNode {
         verifyRaftState(datastore, shardName, new RaftStateVerifier() {
             @Override
             public void verify(OnDemandRaftState raftState) {
-                assertTrue("Peer(s) " + peerIds + " not found for shard " + shardName,
+                assertTrue(String.format("Peer(s) %s not found for shard %s. Actual: %s", peerIds, shardName,
+                        raftState.getPeerAddresses().keySet()),
                         raftState.getPeerAddresses().keySet().containsAll(peerIds));
             }
         });
index 765d8a1..e117dcc 100644 (file)
@@ -9,11 +9,13 @@ package org.opendaylight.controller.cluster.datastore.admin;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
+import akka.actor.Status.Success;
 import akka.cluster.Cluster;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
@@ -21,9 +23,13 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.FileInputStream;
+import java.net.URI;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.SerializationUtils;
@@ -33,6 +39,9 @@ import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.MemberNode;
 import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
+import org.opendaylight.controller.cluster.datastore.Shard;
+import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
@@ -40,9 +49,12 @@ import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResult;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResultBuilder;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -260,7 +272,7 @@ public class ClusterAdminRpcServiceTest {
         service.close();
     }
 
-    private void verifySuccessfulRpcResult(RpcResult<Void> rpcResult) {
+    private <T> T verifySuccessfulRpcResult(RpcResult<T> rpcResult) {
         if(!rpcResult.isSuccessful()) {
             if(rpcResult.getErrors().size() > 0) {
                 RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
@@ -269,6 +281,8 @@ public class ClusterAdminRpcServiceTest {
 
             fail("Rpc failed with no error");
         }
+
+        return rpcResult.getResult();
     }
 
     private void verifyFailedRpcResult(RpcResult<Void> rpcResult) {
@@ -284,8 +298,54 @@ public class ClusterAdminRpcServiceTest {
     }
 
     @Test
-    public void testAddReplicasForAllShards() {
-        // TODO implement
+    public void testAddReplicasForAllShards() throws Exception {
+        String name = "testAddReplicasForAllShards";
+        String moduleShardsConfig = "module-shards-member1.conf";
+        MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name).
+                moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars", "people").build();
+
+        ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
+                "pets", null, Arrays.asList("member-1"));
+        leaderNode1.configDataStore().getActorContext().getShardManager().tell(
+                new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
+        leaderNode1.kit().expectMsgClass(Success.class);
+        leaderNode1.kit().waitUntilLeader(leaderNode1.configDataStore().getActorContext(), "pets");
+
+        MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        leaderNode1.waitForMembersUp("member-2");
+        newReplicaNode2.waitForMembersUp("member-1");
+
+        newReplicaNode2.configDataStore().getActorContext().getShardManager().tell(
+                new CreateShard(petsModuleConfig, Shard.builder(), null), newReplicaNode2.kit().getRef());
+        newReplicaNode2.kit().expectMsgClass(Success.class);
+
+        newReplicaNode2.operDataStore().getActorContext().getShardManager().tell(
+                new CreateShard(new ModuleShardConfiguration(URI.create("no-leader-ns"), "no-leader-module",
+                        "no-leader", null, Arrays.asList("member-1")), Shard.builder(), null),
+                                newReplicaNode2.kit().getRef());
+        newReplicaNode2.kit().expectMsgClass(Success.class);
+
+        ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(),
+                newReplicaNode2.operDataStore());
+
+        RpcResult<AddReplicasForAllShardsOutput> rpcResult = service.addReplicasForAllShards().get(10, TimeUnit.SECONDS);
+        AddReplicasForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
+        verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+                successShardResult("people", DataStoreType.Config),
+                successShardResult("pets", DataStoreType.Config),
+                successShardResult("cars", DataStoreType.Operational),
+                successShardResult("people", DataStoreType.Operational),
+                failedShardResult("no-leader", DataStoreType.Operational));
+
+        verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1");
+        verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "people", "member-1");
+        verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1");
+        verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1");
+        verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1");
+
+        service.close();
     }
 
     @Test
@@ -302,4 +362,35 @@ public class ClusterAdminRpcServiceTest {
     public void testConvertMembersToNonvotingForAllShards() {
         // TODO implement
     }
+
+    private void verifyShardResults(List<ShardResult> shardResults, ShardResult... expShardResults) {
+        Map<String, ShardResult> expResultsMap = new HashMap<>();
+        for(ShardResult r: expShardResults) {
+            expResultsMap.put(r.getShardName() + "-" + r.getDataStoreType(), r);
+        }
+
+        for(ShardResult result: shardResults) {
+            ShardResult exp = expResultsMap.remove(result.getShardName() + "-" + result.getDataStoreType());
+            assertNotNull(String.format("Unexpected result for shard %s, type %s", result.getShardName(),
+                    result.getDataStoreType()), exp);
+            assertEquals("isSucceeded", exp.isSucceeded(), result.isSucceeded());
+            if(exp.isSucceeded()) {
+                assertNull("Expected null error message", result.getErrorMessage());
+            } else {
+                assertNotNull("Expected error message", result.getErrorMessage());
+            }
+        }
+
+        if(!expResultsMap.isEmpty()) {
+            fail("Missing shard results for " + expResultsMap.keySet());
+        }
+    }
+
+    private ShardResult successShardResult(String shardName, DataStoreType type) {
+        return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(true).build();
+    }
+
+    private ShardResult failedShardResult(String shardName, DataStoreType type) {
+        return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(false).build();
+    }
 }