2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.admin;
10 import static org.hamcrest.CoreMatchers.anyOf;
11 import static org.hamcrest.CoreMatchers.containsString;
12 import static org.junit.Assert.assertEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertThat;
16 import static org.junit.Assert.fail;
17 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent;
18 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
19 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
20 import akka.actor.ActorRef;
21 import akka.actor.PoisonPill;
22 import akka.actor.Status.Success;
23 import akka.cluster.Cluster;
24 import com.google.common.base.Optional;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.collect.Iterables;
27 import com.google.common.collect.Sets;
29 import java.io.FileInputStream;
31 import java.util.ArrayList;
32 import java.util.Arrays;
33 import java.util.HashMap;
34 import java.util.HashSet;
35 import java.util.List;
38 import java.util.concurrent.TimeUnit;
39 import org.apache.commons.lang3.SerializationUtils;
40 import org.junit.After;
41 import org.junit.Before;
42 import org.junit.Test;
43 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
44 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
45 import org.opendaylight.controller.cluster.datastore.MemberNode;
46 import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
47 import org.opendaylight.controller.cluster.datastore.Shard;
48 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
49 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
50 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
51 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
52 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
53 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
54 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
55 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
56 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResult;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResultBuilder;
64 import org.opendaylight.yangtools.yang.common.RpcError;
65 import org.opendaylight.yangtools.yang.common.RpcResult;
66 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
69 * Unit tests for ClusterAdminRpcService.
71 * @author Thomas Pantelis
73 public class ClusterAdminRpcServiceTest {
74 private final List<MemberNode> memberNodes = new ArrayList<>();
78 InMemoryJournal.clear();
79 InMemorySnapshotStore.clear();
83 public void tearDown() {
84 for(MemberNode m: memberNodes) {
90 public void testBackupDatastore() throws Exception {
91 MemberNode node = MemberNode.builder(memberNodes).akkaConfig("Member1").
92 moduleShardsConfig("module-shards-member1.conf").
93 waitForShardLeader("cars", "people").testName("testBackupDatastore").build();
95 String fileName = "target/testBackupDatastore";
96 new File(fileName).delete();
98 ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore());
100 RpcResult<Void> rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder().
101 setFilePath(fileName).build()).get(5, TimeUnit.SECONDS);
102 verifySuccessfulRpcResult(rpcResult);
104 try(FileInputStream fis = new FileInputStream(fileName)) {
105 List<DatastoreSnapshot> snapshots = SerializationUtils.deserialize(fis);
106 assertEquals("DatastoreSnapshot size", 2, snapshots.size());
108 ImmutableMap<String, DatastoreSnapshot> map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0),
109 snapshots.get(1).getType(), snapshots.get(1));
110 verifyDatastoreSnapshot(node.configDataStore().getActorContext().getDataStoreType(),
111 map.get(node.configDataStore().getActorContext().getDataStoreType()), "cars", "people");
113 new File(fileName).delete();
116 // Test failure by killing a shard.
118 node.configDataStore().getActorContext().getShardManager().tell(node.datastoreContextBuilder().
119 shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender());
121 ActorRef carsShardActor = node.configDataStore().getActorContext().findLocalShard("cars").get();
122 node.kit().watch(carsShardActor);
123 carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
124 node.kit().expectTerminated(carsShardActor);
126 rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build()).
127 get(5, TimeUnit.SECONDS);
128 assertEquals("isSuccessful", false, rpcResult.isSuccessful());
129 assertEquals("getErrors", 1, rpcResult.getErrors().size());
134 private void verifyDatastoreSnapshot(String type, DatastoreSnapshot datastoreSnapshot, String... expShardNames) {
135 assertNotNull("Missing DatastoreSnapshot for type " + type, datastoreSnapshot);
136 Set<String> shardNames = new HashSet<>();
137 for(DatastoreSnapshot.ShardSnapshot s: datastoreSnapshot.getShardSnapshots()) {
138 shardNames.add(s.getName());
141 assertEquals("DatastoreSnapshot shard names", Sets.newHashSet(expShardNames), shardNames);
145 public void testAddShardReplica() throws Exception {
146 String name = "testAddShardReplica";
147 String moduleShardsConfig = "module-shards-cars-member-1.conf";
148 MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
149 moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars").build();
151 MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
152 moduleShardsConfig(moduleShardsConfig).build();
154 leaderNode1.waitForMembersUp("member-2");
156 doAddShardReplica(newReplicaNode2, "cars", "member-1");
158 MemberNode newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
159 moduleShardsConfig(moduleShardsConfig).build();
161 leaderNode1.waitForMembersUp("member-3");
162 newReplicaNode2.waitForMembersUp("member-3");
164 doAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2");
166 verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1", "member-3");
167 verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1", "member-3");
169 // Write data to member-2's config datastore and read/verify via member-3
170 NormalizedNode<?, ?> configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore(),
171 newReplicaNode3.configDataStore());
173 // Write data to member-3's oper datastore and read/verify via member-2
174 writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore());
176 // Verify all data has been replicated. We expect 3 log entries and thus last applied index of 2 -
177 // 2 ServerConfigurationPayload entries and the transaction payload entry.
179 RaftStateVerifier verifier = new RaftStateVerifier() {
181 public void verify(OnDemandRaftState raftState) {
182 assertEquals("Commit index", 2, raftState.getCommitIndex());
183 assertEquals("Last applied index", 2, raftState.getLastApplied());
187 verifyRaftState(leaderNode1.configDataStore(), "cars", verifier);
188 verifyRaftState(leaderNode1.operDataStore(), "cars", verifier);
190 verifyRaftState(newReplicaNode2.configDataStore(), "cars", verifier);
191 verifyRaftState(newReplicaNode2.operDataStore(), "cars", verifier);
193 verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
194 verifyRaftState(newReplicaNode3.operDataStore(), "cars", verifier);
196 // Restart member-3 and verify the cars config shard is re-instated.
198 Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(newReplicaNode3.kit().getSystem()).selfAddress());
199 newReplicaNode3.cleanup();
201 newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
202 moduleShardsConfig(moduleShardsConfig).createOperDatastore(false).build();
204 verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
205 readCarsNodeAndVerify(newReplicaNode3.configDataStore(), configCarsNode);
209 public void testAddShardReplicaFailures() throws Exception {
210 String name = "testAddShardReplicaFailures";
211 MemberNode memberNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name).
212 moduleShardsConfig("module-shards-cars-member-1.conf").build();
214 ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
215 memberNode.operDataStore());
217 RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().
218 setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
219 verifyFailedRpcResult(rpcResult);
221 rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("cars").
222 build()).get(10, TimeUnit.SECONDS);
223 verifyFailedRpcResult(rpcResult);
225 rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people").
226 setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
227 verifyFailedRpcResult(rpcResult);
232 private NormalizedNode<?, ?> writeCarsNodeAndVerify(DistributedDataStore writeToStore,
233 DistributedDataStore readFromStore) throws Exception {
234 DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction();
235 NormalizedNode<?, ?> carsNode = CarsModel.create();
236 writeTx.write(CarsModel.BASE_PATH, carsNode);
238 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
239 Boolean canCommit = cohort .canCommit().get(7, TimeUnit.SECONDS);
240 assertEquals("canCommit", true, canCommit);
241 cohort.preCommit().get(5, TimeUnit.SECONDS);
242 cohort.commit().get(5, TimeUnit.SECONDS);
244 readCarsNodeAndVerify(readFromStore, carsNode);
248 private void readCarsNodeAndVerify(DistributedDataStore readFromStore,
249 NormalizedNode<?, ?> expCarsNode) throws Exception {
250 Optional<NormalizedNode<?, ?>> optional = readFromStore.newReadOnlyTransaction().
251 read(CarsModel.BASE_PATH).get(15, TimeUnit.SECONDS);
252 assertEquals("isPresent", true, optional.isPresent());
253 assertEquals("Data node", expCarsNode, optional.get());
256 private void doAddShardReplica(MemberNode memberNode, String shardName, String... peerMemberNames)
258 memberNode.waitForMembersUp(peerMemberNames);
260 ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
261 memberNode.operDataStore());
263 RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName).
264 setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
265 verifySuccessfulRpcResult(rpcResult);
267 verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
269 Optional<ActorRef> optional = memberNode.operDataStore().getActorContext().findLocalShard(shardName);
270 assertEquals("Oper shard present", false, optional.isPresent());
272 rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName).
273 setDataStoreType(DataStoreType.Operational).build()).get(10, TimeUnit.SECONDS);
274 verifySuccessfulRpcResult(rpcResult);
276 verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames);
281 private <T> T verifySuccessfulRpcResult(RpcResult<T> rpcResult) {
282 if(!rpcResult.isSuccessful()) {
283 if(rpcResult.getErrors().size() > 0) {
284 RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
285 throw new AssertionError("Rpc failed with error: " + error, error.getCause());
288 fail("Rpc failed with no error");
291 return rpcResult.getResult();
294 private void verifyFailedRpcResult(RpcResult<Void> rpcResult) {
295 assertEquals("RpcResult", false, rpcResult.isSuccessful());
296 assertEquals("RpcResult errors size", 1, rpcResult.getErrors().size());
297 RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
298 assertNotNull("RpcResult error message null", error.getMessage());
302 public void testRemoveShardReplica() throws Exception {
303 String name = "testRemoveShardReplicaLocal";
304 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
305 MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
306 moduleShardsConfig(moduleShardsConfig).
307 datastoreContextBuilder(DatastoreContext.newBuilder().
308 shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
310 MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
311 moduleShardsConfig(moduleShardsConfig).build();
313 MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
314 moduleShardsConfig(moduleShardsConfig).build();
316 leaderNode1.configDataStore().waitTillReady();
317 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
318 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
319 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
321 // Invoke RPC service on member-3 to remove it's local shard
323 ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
324 replicaNode3.operDataStore());
326 RpcResult<Void> rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder().
327 setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build()).
328 get(10, TimeUnit.SECONDS);
329 verifySuccessfulRpcResult(rpcResult);
332 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
333 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
334 verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
336 // Restart member-2 and verify member-3 isn't present.
338 Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(replicaNode2.kit().getSystem()).selfAddress());
339 replicaNode2.cleanup();
341 replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
342 moduleShardsConfig(moduleShardsConfig).build();
344 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
346 // Invoke RPC service on member-1 to remove member-2
348 ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
349 leaderNode1.operDataStore());
351 rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().
352 setShardName("cars").setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).
353 get(10, TimeUnit.SECONDS);
354 verifySuccessfulRpcResult(rpcResult);
357 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars");
358 verifyNoShardPresent(replicaNode2.configDataStore(), "cars");
362 public void testRemoveShardLeaderReplica() throws Exception {
363 String name = "testRemoveShardLeaderReplica";
364 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
365 MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
366 moduleShardsConfig(moduleShardsConfig).
367 datastoreContextBuilder(DatastoreContext.newBuilder().
368 shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
370 MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
371 moduleShardsConfig(moduleShardsConfig).build();
373 MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
374 moduleShardsConfig(moduleShardsConfig).build();
376 leaderNode1.configDataStore().waitTillReady();
377 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
378 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
379 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
381 replicaNode2.waitForMembersUp("member-1", "member-3");
382 replicaNode2.waitForMembersUp("member-1", "member-2");
384 // Invoke RPC service on leader member-1 to remove it's local shard
386 ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
387 leaderNode1.operDataStore());
389 RpcResult<Void> rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().
390 setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build()).
391 get(10, TimeUnit.SECONDS);
392 verifySuccessfulRpcResult(rpcResult);
395 verifyRaftState(replicaNode2.configDataStore(), "cars", new RaftStateVerifier() {
397 public void verify(OnDemandRaftState raftState) {
398 assertThat("Leader Id", raftState.getLeader(), anyOf(containsString("member-2"),
399 containsString("member-3")));
403 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-3");
404 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-2");
405 verifyNoShardPresent(leaderNode1.configDataStore(), "cars");
409 public void testAddReplicasForAllShards() throws Exception {
410 String name = "testAddReplicasForAllShards";
411 String moduleShardsConfig = "module-shards-member1.conf";
412 MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
413 moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars", "people").build();
415 ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
416 "pets", null, Arrays.asList("member-1"));
417 leaderNode1.configDataStore().getActorContext().getShardManager().tell(
418 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
419 leaderNode1.kit().expectMsgClass(Success.class);
420 leaderNode1.kit().waitUntilLeader(leaderNode1.configDataStore().getActorContext(), "pets");
422 MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
423 moduleShardsConfig(moduleShardsConfig).build();
425 leaderNode1.waitForMembersUp("member-2");
426 newReplicaNode2.waitForMembersUp("member-1");
428 newReplicaNode2.configDataStore().getActorContext().getShardManager().tell(
429 new CreateShard(petsModuleConfig, Shard.builder(), null), newReplicaNode2.kit().getRef());
430 newReplicaNode2.kit().expectMsgClass(Success.class);
432 newReplicaNode2.operDataStore().getActorContext().getShardManager().tell(
433 new CreateShard(new ModuleShardConfiguration(URI.create("no-leader-ns"), "no-leader-module",
434 "no-leader", null, Arrays.asList("member-1")), Shard.builder(), null),
435 newReplicaNode2.kit().getRef());
436 newReplicaNode2.kit().expectMsgClass(Success.class);
438 ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(),
439 newReplicaNode2.operDataStore());
441 RpcResult<AddReplicasForAllShardsOutput> rpcResult = service.addReplicasForAllShards().get(10, TimeUnit.SECONDS);
442 AddReplicasForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
443 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
444 successShardResult("people", DataStoreType.Config),
445 successShardResult("pets", DataStoreType.Config),
446 successShardResult("cars", DataStoreType.Operational),
447 successShardResult("people", DataStoreType.Operational),
448 failedShardResult("no-leader", DataStoreType.Operational));
450 verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1");
451 verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "people", "member-1");
452 verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1");
453 verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1");
454 verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1");
460 public void testRemoveAllShardReplicas() {
465 public void testConvertMembersToVotingForAllShards() {
470 public void testConvertMembersToNonvotingForAllShards() {
474 private void verifyShardResults(List<ShardResult> shardResults, ShardResult... expShardResults) {
475 Map<String, ShardResult> expResultsMap = new HashMap<>();
476 for(ShardResult r: expShardResults) {
477 expResultsMap.put(r.getShardName() + "-" + r.getDataStoreType(), r);
480 for(ShardResult result: shardResults) {
481 ShardResult exp = expResultsMap.remove(result.getShardName() + "-" + result.getDataStoreType());
482 assertNotNull(String.format("Unexpected result for shard %s, type %s", result.getShardName(),
483 result.getDataStoreType()), exp);
484 assertEquals("isSucceeded", exp.isSucceeded(), result.isSucceeded());
485 if(exp.isSucceeded()) {
486 assertNull("Expected null error message", result.getErrorMessage());
488 assertNotNull("Expected error message", result.getErrorMessage());
492 if(!expResultsMap.isEmpty()) {
493 fail("Missing shard results for " + expResultsMap.keySet());
497 private ShardResult successShardResult(String shardName, DataStoreType type) {
498 return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(true).build();
501 private ShardResult failedShardResult(String shardName, DataStoreType type) {
502 return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(false).build();