Migrate AbstractProxyTransactionTest to MockitoJUnitRunner
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManagerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.shardmanager;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.mockito.ArgumentMatchers.anyString;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.reset;
21 import static org.mockito.Mockito.timeout;
22 import static org.mockito.Mockito.verify;
23 import static org.mockito.Mockito.verifyNoMoreInteractions;
24
25 import akka.actor.ActorRef;
26 import akka.actor.ActorSystem;
27 import akka.actor.AddressFromURIString;
28 import akka.actor.Props;
29 import akka.actor.Status;
30 import akka.actor.Status.Failure;
31 import akka.actor.Status.Success;
32 import akka.cluster.Cluster;
33 import akka.cluster.ClusterEvent;
34 import akka.cluster.Member;
35 import akka.dispatch.Dispatchers;
36 import akka.dispatch.OnComplete;
37 import akka.japi.Creator;
38 import akka.pattern.Patterns;
39 import akka.persistence.RecoveryCompleted;
40 import akka.serialization.Serialization;
41 import akka.testkit.TestActorRef;
42 import akka.testkit.javadsl.TestKit;
43 import akka.util.Timeout;
44 import com.google.common.base.Stopwatch;
45 import com.google.common.collect.ImmutableMap;
46 import com.google.common.collect.Lists;
47 import com.google.common.collect.Sets;
48 import com.google.common.util.concurrent.Uninterruptibles;
49 import java.time.Duration;
50 import java.util.AbstractMap;
51 import java.util.Arrays;
52 import java.util.Collection;
53 import java.util.Collections;
54 import java.util.HashMap;
55 import java.util.List;
56 import java.util.Map;
57 import java.util.Map.Entry;
58 import java.util.Set;
59 import java.util.concurrent.CountDownLatch;
60 import java.util.concurrent.TimeUnit;
61 import java.util.concurrent.TimeoutException;
62 import java.util.function.Consumer;
63 import java.util.function.Function;
64 import java.util.stream.Collectors;
65 import org.junit.AfterClass;
66 import org.junit.BeforeClass;
67 import org.junit.Test;
68 import org.opendaylight.controller.cluster.access.concepts.MemberName;
69 import org.opendaylight.controller.cluster.datastore.AbstractShardManagerTest;
70 import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
71 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
72 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
73 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
74 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
75 import org.opendaylight.controller.cluster.datastore.Shard;
76 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
77 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
78 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
79 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
80 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
81 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
82 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
83 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
84 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
85 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
86 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
87 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
88 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
89 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
90 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
91 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
92 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
93 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
94 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
95 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
96 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
97 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
98 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
99 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
100 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
101 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
102 import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor;
103 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
104 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
105 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
106 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
107 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
108 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
109 import org.opendaylight.controller.cluster.raft.RaftState;
110 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
111 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
112 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
113 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
114 import org.opendaylight.controller.cluster.raft.messages.AddServer;
115 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
116 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
117 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
118 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
119 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
120 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
121 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
122 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
123 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
124 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
125 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
126 import org.opendaylight.yangtools.concepts.Registration;
127 import org.opendaylight.yangtools.yang.common.XMLNamespace;
128 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
129 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
130 import org.slf4j.Logger;
131 import org.slf4j.LoggerFactory;
132 import scala.concurrent.Await;
133 import scala.concurrent.Future;
134 import scala.concurrent.duration.FiniteDuration;
135
136 public class ShardManagerTest extends AbstractShardManagerTest {
137     private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
138     private static final MemberName MEMBER_2 = MemberName.forName("member-2");
139     private static final MemberName MEMBER_3 = MemberName.forName("member-3");
140
141     private static EffectiveModelContext TEST_SCHEMA_CONTEXT;
142
143     private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
144
145     @BeforeClass
146     public static void beforeClass() {
147         TEST_SCHEMA_CONTEXT = TestModel.createTestContext();
148     }
149
150     @AfterClass
151     public static void afterClass() {
152         TEST_SCHEMA_CONTEXT = null;
153     }
154
155     private ActorSystem newActorSystem(final String config) {
156         return newActorSystem("cluster-test", config);
157     }
158
159     private ActorRef newMockShardActor(final ActorSystem system, final String shardName, final String memberName) {
160         String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
161         if (system == getSystem()) {
162             return actorFactory.createActor(MessageCollectorActor.props(), name);
163         }
164
165         return system.actorOf(MessageCollectorActor.props(), name);
166     }
167
168     private Props newShardMgrProps() {
169         return newShardMgrProps(new MockConfiguration());
170     }
171
172     private static DatastoreContextFactory newDatastoreContextFactory(final DatastoreContext datastoreContext) {
173         DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
174         doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
175         doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(anyString());
176         return mockFactory;
177     }
178
179     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() {
180         return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
181     }
182
183     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(final ActorRef shardActor) {
184         return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor)
185                 .distributedDataStore(mock(DistributedDataStore.class));
186     }
187
188
189     private Props newPropsShardMgrWithMockShardActor() {
190         return newTestShardMgrBuilderWithMockShardActor().props().withDispatcher(
191                 Dispatchers.DefaultDispatcherId());
192     }
193
194     private Props newPropsShardMgrWithMockShardActor(final ActorRef shardActor) {
195         return newTestShardMgrBuilderWithMockShardActor(shardActor).props()
196                 .withDispatcher(Dispatchers.DefaultDispatcherId());
197     }
198
199
200     private TestShardManager newTestShardManager() {
201         return newTestShardManager(newShardMgrProps());
202     }
203
204     private TestShardManager newTestShardManager(final Props props) {
205         TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
206         TestShardManager shardManager = shardManagerActor.underlyingActor();
207         shardManager.waitForRecoveryComplete();
208         return shardManager;
209     }
210
211     private static void waitForShardInitialized(final ActorRef shardManager, final String shardName,
212             final TestKit kit) {
213         AssertionError last = null;
214         Stopwatch sw = Stopwatch.createStarted();
215         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
216             try {
217                 shardManager.tell(new FindLocalShard(shardName, true), kit.getRef());
218                 kit.expectMsgClass(LocalShardFound.class);
219                 return;
220             } catch (AssertionError e) {
221                 last = e;
222             }
223
224             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
225         }
226
227         throw last;
228     }
229
230     @SuppressWarnings("unchecked")
231     private static <T> T expectMsgClassOrFailure(final Class<T> msgClass, final TestKit kit, final String msg) {
232         Object reply = kit.expectMsgAnyClassOf(kit.duration("5 sec"), msgClass, Failure.class);
233         if (reply instanceof Failure) {
234             throw new AssertionError(msg + " failed", ((Failure)reply).cause());
235         }
236
237         return (T)reply;
238     }
239
240     @Test
241     public void testPerShardDatastoreContext() throws Exception {
242         LOG.info("testPerShardDatastoreContext starting");
243         final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
244                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
245
246         doReturn(
247                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(6).build())
248                 .when(mockFactory).getShardDatastoreContext("default");
249
250         doReturn(
251                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(7).build())
252                 .when(mockFactory).getShardDatastoreContext("topology");
253
254         final MockConfiguration mockConfig = new MockConfiguration() {
255             @Override
256             public Collection<String> getMemberShardNames(final MemberName memberName) {
257                 return Arrays.asList("default", "topology");
258             }
259
260             @Override
261             public Collection<MemberName> getMembersFromShardName(final String shardName) {
262                 return members("member-1");
263             }
264         };
265
266         final ActorRef defaultShardActor = actorFactory.createActor(
267                 MessageCollectorActor.props(), actorFactory.generateActorId("default"));
268         final ActorRef topologyShardActor = actorFactory.createActor(
269                 MessageCollectorActor.props(), actorFactory.generateActorId("topology"));
270
271         final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
272                 new HashMap<String, Entry<ActorRef, DatastoreContext>>());
273         shardInfoMap.put("default", new AbstractMap.SimpleEntry<>(defaultShardActor, null));
274         shardInfoMap.put("topology", new AbstractMap.SimpleEntry<>(topologyShardActor, null));
275
276         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
277         final CountDownLatch newShardActorLatch = new CountDownLatch(2);
278         class LocalShardManager extends ShardManager {
279             LocalShardManager(final AbstractShardManagerCreator<?> creator) {
280                 super(creator);
281             }
282
283             @Override
284             protected ActorRef newShardActor(final ShardInformation info) {
285                 Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
286                 ActorRef ref = null;
287                 if (entry != null) {
288                     ref = entry.getKey();
289                     entry.setValue(info.getDatastoreContext());
290                 }
291
292                 newShardActorLatch.countDown();
293                 return ref;
294             }
295         }
296
297         final Creator<ShardManager> creator = new Creator<>() {
298             private static final long serialVersionUID = 1L;
299             @Override
300             public ShardManager create() {
301                 return new LocalShardManager(
302                         new GenericCreator<>(LocalShardManager.class).datastoreContextFactory(mockFactory)
303                                 .primaryShardInfoCache(primaryShardInfoCache).configuration(mockConfig));
304             }
305         };
306
307         final TestKit kit = new TestKit(getSystem());
308
309         final ActorRef shardManager = actorFactory.createActor(Props.create(ShardManager.class,
310                 new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()));
311
312         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
313
314         assertTrue("Shard actors created", newShardActorLatch.await(5, TimeUnit.SECONDS));
315         assertEquals("getShardElectionTimeoutFactor", 6,
316                 shardInfoMap.get("default").getValue().getShardElectionTimeoutFactor());
317         assertEquals("getShardElectionTimeoutFactor", 7,
318                 shardInfoMap.get("topology").getValue().getShardElectionTimeoutFactor());
319
320         DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
321                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
322         doReturn(
323                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(66).build())
324                 .when(newMockFactory).getShardDatastoreContext("default");
325
326         doReturn(
327                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(77).build())
328                 .when(newMockFactory).getShardDatastoreContext("topology");
329
330         shardManager.tell(newMockFactory, kit.getRef());
331
332         DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor,
333                 DatastoreContext.class);
334         assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
335
336         newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
337         assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
338
339         LOG.info("testPerShardDatastoreContext ending");
340     }
341
342     @Test
343     public void testOnReceiveFindPrimaryForNonExistentShard() {
344         final TestKit kit = new TestKit(getSystem());
345         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
346
347         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
348
349         shardManager.tell(new FindPrimary("non-existent", false), kit.getRef());
350
351         kit.expectMsgClass(Duration.ofSeconds(5), PrimaryNotFoundException.class);
352     }
353
354     @Test
355     public void testOnReceiveFindPrimaryForLocalLeaderShard() {
356         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting");
357         final TestKit kit = new TestKit(getSystem());
358         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
359
360         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
361
362         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
363         shardManager.tell(new ActorInitialized(), mockShardActor);
364
365         DataTree mockDataTree = mock(DataTree.class);
366         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
367             DataStoreVersions.CURRENT_VERSION), kit.getRef());
368
369         MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
370         shardManager.tell(
371             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
372             mockShardActor);
373
374         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
375
376         LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5),
377             LocalPrimaryShardFound.class);
378         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
379             primaryFound.getPrimaryPath().contains("member-1-shard-default"));
380         assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
381
382         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending");
383     }
384
385     @Test
386     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() {
387         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp starting");
388         final TestKit kit = new TestKit(getSystem());
389         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
390
391         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
392         shardManager.tell(new ActorInitialized(), mockShardActor);
393
394         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
395         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
396         shardManager.tell(
397             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
398             mockShardActor);
399         shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION),
400             mockShardActor);
401
402         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
403
404         kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
405
406         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending");
407     }
408
409     @Test
410     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() {
411         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard starting");
412         final TestKit kit = new TestKit(getSystem());
413         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
414
415         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
416         shardManager.tell(new ActorInitialized(), mockShardActor);
417
418         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
419         MockClusterWrapper.sendMemberUp(shardManager, "member-2", kit.getRef().path().toString());
420
421         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
422         shardManager.tell(
423             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
424             mockShardActor);
425         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
426         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, leaderVersion), mockShardActor);
427
428         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
429
430         RemotePrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
431         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
432             primaryFound.getPrimaryPath().contains("member-2-shard-default"));
433         assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
434
435         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard ending");
436     }
437
438     @Test
439     public void testOnReceiveFindPrimaryForUninitializedShard() {
440         final TestKit kit = new TestKit(getSystem());
441         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
442
443         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
444
445         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
446     }
447
448     @Test
449     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() {
450         final TestKit kit = new TestKit(getSystem());
451         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
452
453         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
454         shardManager.tell(new ActorInitialized(), mockShardActor);
455
456         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
457
458         kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
459     }
460
461     @Test
462     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() {
463         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
464         final TestKit kit = new TestKit(getSystem());
465         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
466
467         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
468         shardManager.tell(new ActorInitialized(), mockShardActor);
469
470         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
471         shardManager.tell(
472             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Follower.name()),
473             mockShardActor);
474
475         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
476
477         kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
478
479         DataTree mockDataTree = mock(DataTree.class);
480         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
481             DataStoreVersions.CURRENT_VERSION), mockShardActor);
482
483         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
484
485         LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5),
486             LocalPrimaryShardFound.class);
487         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
488             primaryFound.getPrimaryPath().contains("member-1-shard-default"));
489         assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
490
491         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
492     }
493
494     @Test
495     public void testOnReceiveFindPrimaryWaitForShardLeader() {
496         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting");
497         datastoreContextBuilder.shardInitializationTimeout(10, TimeUnit.SECONDS);
498         final TestKit kit = new TestKit(getSystem());
499         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
500
501         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
502
503         // We're passing waitUntilInitialized = true to FindPrimary so
504         // the response should be
505         // delayed until we send ActorInitialized and
506         // RoleChangeNotification.
507         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
508
509         kit.expectNoMessage(Duration.ofMillis(150));
510
511         shardManager.tell(new ActorInitialized(), mockShardActor);
512
513         kit.expectNoMessage(Duration.ofMillis(150));
514
515         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
516         shardManager.tell(
517             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
518             mockShardActor);
519
520         kit.expectNoMessage(Duration.ofMillis(150));
521
522         DataTree mockDataTree = mock(DataTree.class);
523         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
524             DataStoreVersions.CURRENT_VERSION), mockShardActor);
525
526         LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
527         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
528             primaryFound.getPrimaryPath().contains("member-1-shard-default"));
529         assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
530
531         kit.expectNoMessage(Duration.ofMillis(200));
532
533         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending");
534     }
535
536     @Test
537     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() {
538         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard starting");
539         final TestKit kit = new TestKit(getSystem());
540         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
541
542         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
543
544         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
545
546         kit.expectMsgClass(Duration.ofSeconds(2), NotInitializedException.class);
547
548         shardManager.tell(new ActorInitialized(), mockShardActor);
549
550         kit.expectNoMessage(Duration.ofMillis(200));
551
552         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending");
553     }
554
555     @Test
556     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() {
557         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard starting");
558         final TestKit kit = new TestKit(getSystem());
559         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
560
561         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
562         shardManager.tell(new ActorInitialized(), mockShardActor);
563         shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
564             RaftState.Candidate.name()), mockShardActor);
565
566         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
567
568         kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
569
570         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending");
571     }
572
573     @Test
574     public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() {
575         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard starting");
576         final TestKit kit = new TestKit(getSystem());
577         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
578
579         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
580         shardManager.tell(new ActorInitialized(), mockShardActor);
581         shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
582             RaftState.IsolatedLeader.name()), mockShardActor);
583
584         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true),kit. getRef());
585
586         kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
587
588         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending");
589     }
590
591     @Test
592     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() {
593         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard starting");
594         final TestKit kit = new TestKit(getSystem());
595         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
596
597         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
598         shardManager.tell(new ActorInitialized(), mockShardActor);
599
600         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
601
602         kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
603
604         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending");
605     }
606
607     @Test
608     public void testOnReceiveFindPrimaryForRemoteShard() {
609         LOG.info("testOnReceiveFindPrimaryForRemoteShard starting");
610         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
611
612         // Create an ActorSystem ShardManager actor for member-1.
613
614         final ActorSystem system1 = newActorSystem("Member1");
615         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
616
617         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
618                 newTestShardMgrBuilderWithMockShardActor().cluster(
619                         new ClusterWrapperImpl(system1)).props().withDispatcher(
620                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
621
622         // Create an ActorSystem ShardManager actor for member-2.
623
624         final ActorSystem system2 = newActorSystem("Member2");
625
626         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
627
628         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
629
630         MockConfiguration mockConfig2 = new MockConfiguration(
631                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
632                         .put("astronauts", Arrays.asList("member-2")).build());
633
634         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
635                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
636                         new ClusterWrapperImpl(system2)).props().withDispatcher(
637                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
638
639         final TestKit kit = new TestKit(system1);
640         shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
641         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
642
643         shardManager2.tell(new ActorInitialized(), mockShardActor2);
644
645         String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
646         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
647         shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
648             mockShardActor2);
649         shardManager2.tell(new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
650             mockShardActor2);
651
652         shardManager1.underlyingActor().waitForMemberUp();
653         shardManager1.tell(new FindPrimary("astronauts", false), kit.getRef());
654
655         RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
656         String path = found.getPrimaryPath();
657         assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
658         assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
659
660         shardManager2.underlyingActor().verifyFindPrimary();
661
662         // This part times out quite a bit on jenkins for some reason
663
664 //                Cluster.get(system2).down(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
665 //
666 //                shardManager1.underlyingActor().waitForMemberRemoved();
667 //
668 //                shardManager1.tell(new FindPrimary("astronauts", false), getRef());
669 //
670 //                expectMsgClass(Duration.ofSeconds(5), PrimaryNotFoundException.class);
671
672         LOG.info("testOnReceiveFindPrimaryForRemoteShard ending");
673     }
674
675     @Test
676     public void testShardAvailabilityOnChangeOfMemberReachability() {
677         LOG.info("testShardAvailabilityOnChangeOfMemberReachability starting");
678         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
679
680         // Create an ActorSystem ShardManager actor for member-1.
681
682         final ActorSystem system1 = newActorSystem("Member1");
683         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
684
685         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
686
687         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
688                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
689                         new ClusterWrapperImpl(system1)).props().withDispatcher(
690                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
691
692         // Create an ActorSystem ShardManager actor for member-2.
693
694         final ActorSystem system2 = newActorSystem("Member2");
695
696         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
697
698         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
699
700         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
701                 .put("default", Arrays.asList("member-1", "member-2")).build());
702
703         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
704                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
705                         new ClusterWrapperImpl(system2)).props().withDispatcher(
706                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
707
708         final TestKit kit = new TestKit(system1);
709         shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
710         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
711         shardManager1.tell(new ActorInitialized(), mockShardActor1);
712         shardManager2.tell(new ActorInitialized(), mockShardActor2);
713
714         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
715         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
716         shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
717             DataStoreVersions.CURRENT_VERSION), mockShardActor1);
718         shardManager1.tell(
719             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
720             mockShardActor1);
721         shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
722             DataStoreVersions.CURRENT_VERSION), mockShardActor2);
723         shardManager2.tell(
724             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
725             mockShardActor2);
726         shardManager1.underlyingActor().waitForMemberUp();
727
728         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
729
730         RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
731         String path = found.getPrimaryPath();
732         assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
733
734         shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
735             kit.getRef());
736
737         shardManager1.underlyingActor().waitForUnreachableMember();
738         MessageCollectorActor.clearMessages(mockShardActor1);
739
740         shardManager1.tell(MockClusterWrapper.createMemberRemoved("member-2", "akka://cluster-test@127.0.0.1:2558"),
741             kit.getRef());
742
743         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
744
745         kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
746
747         shardManager1.tell(MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
748             kit.getRef());
749
750         shardManager1.underlyingActor().waitForReachableMember();
751
752         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
753
754         RemotePrimaryShardFound found1 = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
755         String path1 = found1.getPrimaryPath();
756         assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
757
758         shardManager1.tell(MockClusterWrapper.createMemberUp("member-2", "akka://cluster-test@127.0.0.1:2558"),
759             kit.getRef());
760
761         // Test FindPrimary wait succeeds after reachable member event.
762
763         shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
764                 "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
765         shardManager1.underlyingActor().waitForUnreachableMember();
766
767         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
768
769         shardManager1.tell(
770             MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
771
772         RemotePrimaryShardFound found2 = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
773         String path2 = found2.getPrimaryPath();
774         assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config"));
775
776         LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending");
777     }
778
779     @Test
780     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() {
781         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange starting");
782         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
783
784         // Create an ActorSystem ShardManager actor for member-1.
785
786         final ActorSystem system1 = newActorSystem("Member1");
787         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
788
789         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
790
791         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
792         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
793                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(new ClusterWrapperImpl(system1))
794                         .primaryShardInfoCache(primaryShardInfoCache).props()
795                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
796                 shardManagerID);
797
798         // Create an ActorSystem ShardManager actor for member-2.
799
800         final ActorSystem system2 = newActorSystem("Member2");
801
802         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
803
804         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
805
806         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
807                 .put("default", Arrays.asList("member-1", "member-2")).build());
808
809         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
810                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
811                         new ClusterWrapperImpl(system2)).props().withDispatcher(
812                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
813
814         final TestKit kit = new TestKit(system1);
815         shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
816         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
817         shardManager1.tell(new ActorInitialized(), mockShardActor1);
818         shardManager2.tell(new ActorInitialized(), mockShardActor2);
819
820         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
821         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
822         shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
823             DataStoreVersions.CURRENT_VERSION), mockShardActor1);
824         shardManager1.tell(
825             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
826             mockShardActor1);
827         shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
828             DataStoreVersions.CURRENT_VERSION), mockShardActor2);
829         shardManager2.tell(
830             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
831             mockShardActor2);
832         shardManager1.underlyingActor().waitForMemberUp();
833
834         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
835
836         RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
837         String path = found.getPrimaryPath();
838         assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
839
840         primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(
841             system1.actorSelection(mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION));
842
843         shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
844                 "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
845
846         shardManager1.underlyingActor().waitForUnreachableMember();
847
848         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
849
850         kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
851
852         assertNull("Expected primaryShardInfoCache entry removed",
853             primaryShardInfoCache.getIfPresent("default"));
854
855         shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, mock(DataTree.class),
856             DataStoreVersions.CURRENT_VERSION), mockShardActor1);
857         shardManager1.tell(
858             new RoleChangeNotification(memberId1, RaftState.Follower.name(), RaftState.Leader.name()),
859             mockShardActor1);
860
861         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
862
863         LocalPrimaryShardFound found1 = kit.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
864         String path1 = found1.getPrimaryPath();
865         assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
866
867         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange ending");
868     }
869
870     @Test
871     public void testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable() {
872         LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable starting");
873         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
874
875         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
876                 .put("default", Arrays.asList("member-256", "member-2")).build());
877
878         // Create an ActorSystem, ShardManager and actor for member-256.
879
880         final ActorSystem system256 = newActorSystem("Member256");
881         // 2562 is the tcp port of Member256 in src/test/resources/application.conf.
882         Cluster.get(system256).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2562"));
883
884         final ActorRef mockShardActor256 = newMockShardActor(system256, Shard.DEFAULT_NAME, "member-256");
885
886         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
887
888         // ShardManager must be created with shard configuration to let its localShards has shards.
889         final TestActorRef<TestShardManager> shardManager256 = TestActorRef.create(system256,
890                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor256)
891                         .cluster(new ClusterWrapperImpl(system256))
892                         .primaryShardInfoCache(primaryShardInfoCache).props()
893                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
894                 shardManagerID);
895
896         // Create an ActorSystem, ShardManager and actor for member-2 whose name is contained in member-256.
897
898         final ActorSystem system2 = newActorSystem("Member2");
899
900         // Join member-2 into the cluster of member-256.
901         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2562"));
902
903         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
904
905         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
906                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor2).cluster(
907                         new ClusterWrapperImpl(system2)).props().withDispatcher(
908                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
909
910         final TestKit kit256 = new TestKit(system256);
911         shardManager256.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef());
912         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef());
913         shardManager256.tell(new ActorInitialized(), mockShardActor256);
914         shardManager2.tell(new ActorInitialized(), mockShardActor2);
915
916         String memberId256 = "member-256-shard-default-" + shardMrgIDSuffix;
917         String memberId2   = "member-2-shard-default-"   + shardMrgIDSuffix;
918         shardManager256.tell(new ShardLeaderStateChanged(memberId256, memberId256, mock(DataTree.class),
919             DataStoreVersions.CURRENT_VERSION), mockShardActor256);
920         shardManager256.tell(
921             new RoleChangeNotification(memberId256, RaftState.Candidate.name(), RaftState.Leader.name()),
922             mockShardActor256);
923         shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId256, mock(DataTree.class),
924             DataStoreVersions.CURRENT_VERSION), mockShardActor2);
925         shardManager2.tell(
926             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Follower.name()),
927             mockShardActor2);
928         shardManager256.underlyingActor().waitForMemberUp();
929
930         shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
931
932         LocalPrimaryShardFound found = kit256.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
933         String path = found.getPrimaryPath();
934         assertTrue("Unexpected primary path " + path + " which must on member-256",
935             path.contains("member-256-shard-default-config"));
936
937         PrimaryShardInfo primaryShardInfo = new PrimaryShardInfo(
938             system256.actorSelection(mockShardActor256.path()), DataStoreVersions.CURRENT_VERSION);
939         primaryShardInfoCache.putSuccessful("default", primaryShardInfo);
940
941         // Simulate member-2 become unreachable.
942         shardManager256.tell(MockClusterWrapper.createUnreachableMember("member-2",
943                 "akka://cluster-test@127.0.0.1:2558"), kit256.getRef());
944         shardManager256.underlyingActor().waitForUnreachableMember();
945
946         // Make sure leader shard on member-256 is still leader and still in the cache.
947         shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
948         found = kit256.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
949         path = found.getPrimaryPath();
950         assertTrue("Unexpected primary path " + path + " which must still not on member-256",
951             path.contains("member-256-shard-default-config"));
952         Future<PrimaryShardInfo> futurePrimaryShard = primaryShardInfoCache.getIfPresent("default");
953         futurePrimaryShard.onComplete(new OnComplete<PrimaryShardInfo>() {
954             @Override
955             public void onComplete(final Throwable failure, final PrimaryShardInfo futurePrimaryShardInfo) {
956                 if (failure != null) {
957                     assertTrue("Primary shard info is unexpectedly removed from primaryShardInfoCache", false);
958                 } else {
959                     assertEquals("Expected primaryShardInfoCache entry",
960                         primaryShardInfo, futurePrimaryShardInfo);
961                 }
962             }
963         }, system256.dispatchers().defaultGlobalDispatcher());
964
965         LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable ending");
966     }
967
968     @Test
969     public void testOnReceiveFindLocalShardForNonExistentShard() {
970         final TestKit kit = new TestKit(getSystem());
971         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
972
973         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
974
975         shardManager.tell(new FindLocalShard("non-existent", false), kit.getRef());
976
977         LocalShardNotFound notFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
978
979         assertEquals("getShardName", "non-existent", notFound.getShardName());
980     }
981
982     @Test
983     public void testOnReceiveFindLocalShardForExistentShard() {
984         final TestKit kit = new TestKit(getSystem());
985         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
986
987         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
988         shardManager.tell(new ActorInitialized(), mockShardActor);
989
990         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
991
992         LocalShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
993
994         assertTrue("Found path contains " + found.getPath().path().toString(),
995             found.getPath().path().toString().contains("member-1-shard-default-config"));
996     }
997
998     @Test
999     public void testOnReceiveFindLocalShardForNotInitializedShard() {
1000         final TestKit kit = new TestKit(getSystem());
1001         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1002
1003         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1004
1005         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1006     }
1007
1008     @Test
1009     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
1010         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
1011         final TestKit kit = new TestKit(getSystem());
1012         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1013
1014         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1015
1016         // We're passing waitUntilInitialized = true to FindLocalShard
1017         // so the response should be
1018         // delayed until we send ActorInitialized.
1019         Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
1020             new Timeout(5, TimeUnit.SECONDS));
1021
1022         shardManager.tell(new ActorInitialized(), mockShardActor);
1023
1024         Object resp = Await.result(future, kit.duration("5 seconds"));
1025         assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
1026
1027         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
1028     }
1029
1030     @Test
1031     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
1032         TestShardManager shardManager = newTestShardManager();
1033
1034         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1035         shardManager.handleCommand(new RoleChangeNotification(
1036                 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
1037         assertFalse(ready.isDone());
1038
1039         shardManager.handleCommand(new ShardLeaderStateChanged(memberId, memberId,
1040                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1041         assertTrue(ready.isDone());
1042     }
1043
1044     @Test
1045     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
1046         final TestKit kit = new TestKit(getSystem());
1047         TestShardManager shardManager = newTestShardManager();
1048
1049         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1050         shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1051         assertFalse(ready.isDone());
1052
1053         shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
1054
1055         shardManager.handleCommand(
1056             new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1057                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1058         assertTrue(ready.isDone());
1059     }
1060
1061     @Test
1062     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
1063         final TestKit kit = new TestKit(getSystem());
1064         TestShardManager shardManager = newTestShardManager();
1065
1066         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1067         shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1068         assertFalse(ready.isDone());
1069
1070         shardManager.handleCommand(
1071             new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1072                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1073
1074         shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
1075         assertTrue(ready.isDone());
1076     }
1077
1078     @Test
1079     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
1080         TestShardManager shardManager = newTestShardManager();
1081
1082         shardManager.handleCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(),
1083             RaftState.Leader.name()));
1084         assertFalse(ready.isDone());
1085     }
1086
1087     @Test
1088     public void testByDefaultSyncStatusIsFalse() {
1089         TestShardManager shardManager = newTestShardManager();
1090
1091         assertFalse(shardManager.getMBean().getSyncStatus());
1092     }
1093
1094     @Test
1095     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception {
1096         TestShardManager shardManager = newTestShardManager();
1097
1098         shardManager.handleCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
1099                 RaftState.Follower.name(), RaftState.Leader.name()));
1100
1101         assertTrue(shardManager.getMBean().getSyncStatus());
1102     }
1103
1104     @Test
1105     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception {
1106         TestShardManager shardManager = newTestShardManager();
1107
1108         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1109         shardManager.handleCommand(new RoleChangeNotification(shardId,
1110                 RaftState.Follower.name(), RaftState.Candidate.name()));
1111
1112         assertFalse(shardManager.getMBean().getSyncStatus());
1113
1114         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
1115         shardManager.handleCommand(new FollowerInitialSyncUpStatus(
1116                 true, shardId));
1117
1118         assertFalse(shardManager.getMBean().getSyncStatus());
1119     }
1120
1121     @Test
1122     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception {
1123         TestShardManager shardManager = newTestShardManager();
1124
1125         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1126         shardManager.handleCommand(new RoleChangeNotification(shardId,
1127                 RaftState.Candidate.name(), RaftState.Follower.name()));
1128
1129         // Initially will be false
1130         assertFalse(shardManager.getMBean().getSyncStatus());
1131
1132         // Send status true will make sync status true
1133         shardManager.handleCommand(new FollowerInitialSyncUpStatus(true, shardId));
1134
1135         assertTrue(shardManager.getMBean().getSyncStatus());
1136
1137         // Send status false will make sync status false
1138         shardManager.handleCommand(new FollowerInitialSyncUpStatus(false, shardId));
1139
1140         assertFalse(shardManager.getMBean().getSyncStatus());
1141     }
1142
1143     @Test
1144     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception {
1145         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
1146         TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
1147             @Override
1148             public List<String> getMemberShardNames(final MemberName memberName) {
1149                 return Arrays.asList("default", "astronauts");
1150             }
1151         }));
1152
1153         // Initially will be false
1154         assertFalse(shardManager.getMBean().getSyncStatus());
1155
1156         // Make default shard leader
1157         String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
1158         shardManager.handleCommand(new RoleChangeNotification(defaultShardId,
1159                 RaftState.Follower.name(), RaftState.Leader.name()));
1160
1161         // default = Leader, astronauts is unknown so sync status remains false
1162         assertFalse(shardManager.getMBean().getSyncStatus());
1163
1164         // Make astronauts shard leader as well
1165         String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1166         shardManager.handleCommand(new RoleChangeNotification(astronautsShardId,
1167                 RaftState.Follower.name(), RaftState.Leader.name()));
1168
1169         // Now sync status should be true
1170         assertTrue(shardManager.getMBean().getSyncStatus());
1171
1172         // Make astronauts a Follower
1173         shardManager.handleCommand(new RoleChangeNotification(astronautsShardId,
1174                 RaftState.Leader.name(), RaftState.Follower.name()));
1175
1176         // Sync status is not true
1177         assertFalse(shardManager.getMBean().getSyncStatus());
1178
1179         // Make the astronauts follower sync status true
1180         shardManager.handleCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
1181
1182         // Sync status is now true
1183         assertTrue(shardManager.getMBean().getSyncStatus());
1184
1185         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending");
1186     }
1187
1188     @Test
1189     public void testOnReceiveSwitchShardBehavior() {
1190         final TestKit kit = new TestKit(getSystem());
1191         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1192
1193         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1194         shardManager.tell(new ActorInitialized(), mockShardActor);
1195
1196         shardManager.tell(new SwitchShardBehavior(mockShardName, RaftState.Leader, 1000), kit.getRef());
1197
1198         SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor,
1199             SwitchBehavior.class);
1200
1201         assertEquals(RaftState.Leader, switchBehavior.getNewState());
1202         assertEquals(1000, switchBehavior.getNewTerm());
1203     }
1204
1205     private static List<MemberName> members(final String... names) {
1206         return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList());
1207     }
1208
1209     @Test
1210     public void testOnCreateShard() {
1211         LOG.info("testOnCreateShard starting");
1212         final TestKit kit = new TestKit(getSystem());
1213         datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1214
1215         ActorRef shardManager = actorFactory
1216                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1217                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1218
1219         EffectiveModelContext schemaContext = TEST_SCHEMA_CONTEXT;
1220         shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1221
1222         DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100)
1223                 .persistent(false).build();
1224         Shard.Builder shardBuilder = Shard.builder();
1225
1226         ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module",
1227             "foo", null, members("member-1", "member-5", "member-6"));
1228         shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), kit.getRef());
1229
1230         kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1231
1232         shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1233
1234         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1235
1236         assertFalse("isRecoveryApplicable", shardBuilder.getDatastoreContext().isPersistent());
1237         assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig()
1238             .getPeerAddressResolver() instanceof ShardPeerAddressResolver);
1239         assertEquals("peerMembers", Sets.newHashSet(
1240             ShardIdentifier.create("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(),
1241             ShardIdentifier.create("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()),
1242             shardBuilder.getPeerAddresses().keySet());
1243         assertEquals("ShardIdentifier", ShardIdentifier.create("foo", MEMBER_1, shardMrgIDSuffix),
1244             shardBuilder.getId());
1245         assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1246
1247         // Send CreateShard with same name - should return Success with
1248         // a message.
1249
1250         shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1251
1252         Success success = kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1253         assertNotNull("Success status is null", success.status());
1254
1255         LOG.info("testOnCreateShard ending");
1256     }
1257
1258     @Test
1259     public void testOnCreateShardWithLocalMemberNotInShardConfig() {
1260         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting");
1261         final TestKit kit = new TestKit(getSystem());
1262         datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1263
1264         ActorRef shardManager = actorFactory
1265                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1266                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1267
1268         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1269
1270         Shard.Builder shardBuilder = Shard.builder();
1271         ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module",
1272             "foo", null, members("member-5", "member-6"));
1273
1274         shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1275         kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1276
1277         shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1278         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1279
1280         assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
1281         assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(), shardBuilder
1282             .getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
1283
1284         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending");
1285     }
1286
1287     @Test
1288     public void testOnCreateShardWithNoInitialSchemaContext() {
1289         LOG.info("testOnCreateShardWithNoInitialSchemaContext starting");
1290         final TestKit kit = new TestKit(getSystem());
1291         ActorRef shardManager = actorFactory
1292                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1293                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1294
1295         Shard.Builder shardBuilder = Shard.builder();
1296
1297         ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module",
1298             "foo", null, members("member-1"));
1299         shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1300
1301         kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1302
1303         EffectiveModelContext schemaContext = TEST_SCHEMA_CONTEXT;
1304         shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1305
1306         shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1307
1308         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1309
1310         assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1311         assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
1312
1313         LOG.info("testOnCreateShardWithNoInitialSchemaContext ending");
1314     }
1315
1316     @Test
1317     public void testGetSnapshot() {
1318         LOG.info("testGetSnapshot starting");
1319         TestKit kit = new TestKit(getSystem());
1320
1321         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1322                 .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1"))
1323                 .put("astronauts", Collections.<String>emptyList()).build());
1324
1325         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig)
1326                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1327
1328         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1329         Failure failure = kit.expectMsgClass(Failure.class);
1330         assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
1331
1332         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1333
1334         waitForShardInitialized(shardManager, "shard1", kit);
1335         waitForShardInitialized(shardManager, "shard2", kit);
1336
1337         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1338
1339         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1340
1341         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1342         assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1343
1344         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
1345             datastoreSnapshot.getShardSnapshots().stream().map(ShardSnapshot::getName).collect(Collectors.toSet())));
1346
1347         // Add a new replica
1348
1349         TestKit mockShardLeaderKit = new TestKit(getSystem());
1350
1351         TestShardManager shardManagerInstance = shardManager.underlyingActor();
1352         shardManagerInstance.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1353
1354         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1355         mockShardLeaderKit.expectMsgClass(AddServer.class);
1356         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.OK, ""));
1357         kit.expectMsgClass(Status.Success.class);
1358         waitForShardInitialized(shardManager, "astronauts", kit);
1359
1360         // Send another GetSnapshot and verify
1361
1362         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1363         datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1364
1365         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
1366                 Lists.transform(datastoreSnapshot.getShardSnapshots(), ShardSnapshot::getName)));
1367
1368         ShardManagerSnapshot snapshot = datastoreSnapshot.getShardManagerSnapshot();
1369         assertNotNull("Expected ShardManagerSnapshot", snapshot);
1370         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1371                 Sets.newHashSet(snapshot.getShardList()));
1372
1373         LOG.info("testGetSnapshot ending");
1374     }
1375
1376     @Test
1377     public void testRestoreFromSnapshot() {
1378         LOG.info("testRestoreFromSnapshot starting");
1379
1380         datastoreContextBuilder.shardInitializationTimeout(3, TimeUnit.SECONDS);
1381
1382         TestKit kit = new TestKit(getSystem());
1383
1384         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1385                 .put("shard1", Collections.<String>emptyList()).put("shard2", Collections.<String>emptyList())
1386                 .put("astronauts", Collections.<String>emptyList()).build());
1387
1388         ShardManagerSnapshot snapshot =
1389                 new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"));
1390         DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix, snapshot,
1391                 Collections.<ShardSnapshot>emptyList());
1392         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig)
1393                 .restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
1394
1395         shardManager.underlyingActor().waitForRecoveryComplete();
1396
1397         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1398
1399         waitForShardInitialized(shardManager, "shard1", kit);
1400         waitForShardInitialized(shardManager, "shard2", kit);
1401         waitForShardInitialized(shardManager, "astronauts", kit);
1402
1403         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1404
1405         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1406
1407         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1408
1409         assertNotNull("Expected ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1410         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1411                 Sets.newHashSet(datastoreSnapshot.getShardManagerSnapshot().getShardList()));
1412
1413         LOG.info("testRestoreFromSnapshot ending");
1414     }
1415
1416     @Test
1417     public void testAddShardReplicaForNonExistentShardConfig() {
1418         final TestKit kit = new TestKit(getSystem());
1419         ActorRef shardManager = actorFactory
1420                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1421                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1422
1423         shardManager.tell(new AddShardReplica("model-inventory"), kit.getRef());
1424         Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(2), Status.Failure.class);
1425
1426         assertTrue("Failure obtained", resp.cause() instanceof IllegalArgumentException);
1427     }
1428
1429     @Test
1430     public void testAddShardReplica() {
1431         LOG.info("testAddShardReplica starting");
1432         MockConfiguration mockConfig = new MockConfiguration(
1433                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1434                         .put("astronauts", Arrays.asList("member-2")).build());
1435
1436         final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1437         datastoreContextBuilder.shardManagerPersistenceId(shardManagerID);
1438
1439         // Create an ActorSystem ShardManager actor for member-1.
1440         final ActorSystem system1 = newActorSystem("Member1");
1441         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1442         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1443         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1444                 newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor)
1445                         .cluster(new ClusterWrapperImpl(system1)).props()
1446                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1447                 shardManagerID);
1448
1449         // Create an ActorSystem ShardManager actor for member-2.
1450         final ActorSystem system2 = newActorSystem("Member2");
1451         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1452
1453         String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
1454         String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString();
1455         final TestActorRef<MockRespondActor> mockShardLeaderActor = TestActorRef.create(system2,
1456                 Props.create(MockRespondActor.class, AddServer.class,
1457                         new AddServerReply(ServerChangeStatus.OK, memberId2))
1458                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1459                 name);
1460         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1461                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor)
1462                         .cluster(new ClusterWrapperImpl(system2)).props()
1463                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1464                 shardManagerID);
1465
1466         final TestKit kit = new TestKit(getSystem());
1467         newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1468         leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1469
1470         leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1471
1472         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1473         leaderShardManager.tell(
1474             new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1475             mockShardLeaderActor);
1476         leaderShardManager.tell(
1477             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1478             mockShardLeaderActor);
1479
1480         newReplicaShardManager.underlyingActor().waitForMemberUp();
1481         leaderShardManager.underlyingActor().waitForMemberUp();
1482
1483         // Have a dummy snapshot to be overwritten by the new data
1484         // persisted.
1485         String[] restoredShards = { "default", "people" };
1486         ShardManagerSnapshot snapshot =
1487                 new ShardManagerSnapshot(Arrays.asList(restoredShards));
1488         InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
1489         Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
1490
1491         InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID);
1492         InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
1493
1494         // construct a mock response message
1495         newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1496         AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1497             AddServer.class);
1498         String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1499         assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1500         kit.expectMsgClass(Duration.ofSeconds(5), Status.Success.class);
1501
1502         InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
1503         InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
1504         List<ShardManagerSnapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(shardManagerID,
1505             ShardManagerSnapshot.class);
1506         assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
1507         ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
1508         assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
1509             Sets.newHashSet(shardManagerSnapshot.getShardList()));
1510         LOG.info("testAddShardReplica ending");
1511     }
1512
1513     @Test
1514     public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() {
1515         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting");
1516         final TestKit kit = new TestKit(getSystem());
1517         TestActorRef<TestShardManager> shardManager = actorFactory
1518                 .createTestActor(newPropsShardMgrWithMockShardActor(), shardMgrID);
1519
1520         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1521         shardManager.tell(new ActorInitialized(), mockShardActor);
1522
1523         String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
1524         AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
1525         ActorRef leaderShardActor = shardManager.underlyingActor().getContext()
1526                 .actorOf(Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId);
1527
1528         MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
1529
1530         String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
1531         shardManager.tell(
1532             new RoleChangeNotification(newReplicaId, RaftState.Candidate.name(), RaftState.Follower.name()),
1533             mockShardActor);
1534         shardManager.tell(
1535             new ShardLeaderStateChanged(newReplicaId, leaderId, DataStoreVersions.CURRENT_VERSION),
1536             mockShardActor);
1537
1538         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1539
1540         MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
1541
1542         Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1543         assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1544
1545         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1546         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1547
1548         // Send message again to verify previous in progress state is
1549         // cleared
1550
1551         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1552         resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1553         assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1554
1555         // Send message again with an AddServer timeout to verify the
1556         // pre-existing shard actor isn't terminated.
1557
1558         shardManager.tell(
1559             newDatastoreContextFactory(
1560                 datastoreContextBuilder.shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), kit.getRef());
1561         leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
1562         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1563         kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1564
1565         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1566         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1567
1568         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
1569     }
1570
1571     @Test
1572     public void testAddShardReplicaWithPreExistingLocalReplicaLeader() {
1573         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting");
1574         final TestKit kit = new TestKit(getSystem());
1575         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1576         ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1577
1578         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1579         shardManager.tell(new ActorInitialized(), mockShardActor);
1580         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1581             DataStoreVersions.CURRENT_VERSION), kit.getRef());
1582         shardManager.tell(
1583             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1584             mockShardActor);
1585
1586         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1587         Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1588         assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1589
1590         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1591         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1592
1593         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
1594     }
1595
1596     @Test
1597     public void testAddShardReplicaWithAddServerReplyFailure() {
1598         LOG.info("testAddShardReplicaWithAddServerReplyFailure starting");
1599         final TestKit kit = new TestKit(getSystem());
1600         final TestKit mockShardLeaderKit = new TestKit(getSystem());
1601
1602         MockConfiguration mockConfig = new MockConfiguration(
1603             ImmutableMap.of("astronauts", Arrays.asList("member-2")));
1604
1605         ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
1606         final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1607             newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props()
1608             .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1609         shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1610
1611         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1612
1613         TestKit terminateWatcher = new TestKit(getSystem());
1614         terminateWatcher.watch(mockNewReplicaShardActor);
1615
1616         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1617
1618         AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
1619         assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
1620             addServerMsg.getNewServerId());
1621         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
1622
1623         Failure failure = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1624         assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
1625
1626         shardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
1627         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
1628
1629         terminateWatcher.expectTerminated(mockNewReplicaShardActor);
1630
1631         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1632         mockShardLeaderKit.expectMsgClass(AddServer.class);
1633         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
1634         failure = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1635         assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
1636
1637         LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
1638     }
1639
1640     @Test
1641     public void testAddShardReplicaWithAlreadyInProgress() {
1642         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1643                 AddServer.class, new AddShardReplica("astronauts"));
1644     }
1645
1646     @Test
1647     public void testAddShardReplicaWithFindPrimaryTimeout() {
1648         LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting");
1649         datastoreContextBuilder.shardInitializationTimeout(100, TimeUnit.MILLISECONDS);
1650         final TestKit kit = new TestKit(getSystem());
1651         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.of("astronauts", Arrays.asList("member-2")));
1652
1653         final ActorRef newReplicaShardManager = actorFactory
1654                 .createActor(newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props()
1655                     .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1656
1657         newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1658         MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2",
1659             AddressFromURIString.parse("akka://non-existent@127.0.0.1:5").toString());
1660
1661         newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1662         Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Status.Failure.class);
1663         assertTrue("Failure obtained", resp.cause() instanceof RuntimeException);
1664
1665         LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending");
1666     }
1667
1668     @Test
1669     public void testRemoveShardReplicaForNonExistentShard() {
1670         final TestKit kit = new TestKit(getSystem());
1671         ActorRef shardManager = actorFactory
1672                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1673                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1674
1675         shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), kit.getRef());
1676         Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(10), Status.Failure.class);
1677         assertTrue("Failure obtained", resp.cause() instanceof PrimaryNotFoundException);
1678     }
1679
1680     @Test
1681     /**
1682      * Primary is Local.
1683      */
1684     public void testRemoveShardReplicaLocal() {
1685         final TestKit kit = new TestKit(getSystem());
1686         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1687
1688         final ActorRef respondActor = actorFactory.createActor(Props.create(MockRespondActor.class,
1689             RemoveServer.class, new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
1690
1691         ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
1692
1693         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1694         shardManager.tell(new ActorInitialized(), respondActor);
1695         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1696             DataStoreVersions.CURRENT_VERSION), kit.getRef());
1697         shardManager.tell(
1698             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1699             respondActor);
1700
1701         shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), kit.getRef());
1702         final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor,
1703             RemoveServer.class);
1704         assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(),
1705             removeServer.getServerId());
1706         kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1707     }
1708
1709     @Test
1710     public void testRemoveShardReplicaRemote() {
1711         MockConfiguration mockConfig = new MockConfiguration(
1712                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1713                         .put("astronauts", Arrays.asList("member-1")).build());
1714
1715         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1716
1717         // Create an ActorSystem ShardManager actor for member-1.
1718         final ActorSystem system1 = newActorSystem("Member1");
1719         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1720         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1721
1722         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1723                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster(
1724                         new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1725                 shardManagerID);
1726
1727         // Create an ActorSystem ShardManager actor for member-2.
1728         final ActorSystem system2 = newActorSystem("Member2");
1729         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1730
1731         String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString();
1732         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
1733         final TestActorRef<MockRespondActor> mockShardLeaderActor =
1734                 TestActorRef.create(system2, Props.create(MockRespondActor.class, RemoveServer.class,
1735                         new RemoveServerReply(ServerChangeStatus.OK, memberId2)), name);
1736
1737         LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor);
1738
1739         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1740                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster(
1741                         new ClusterWrapperImpl(system2)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1742                 shardManagerID);
1743
1744         // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so,
1745         //    akka://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1
1746         // However when a shard manager has a local shard which is a follower and a leader that is remote it will
1747         // try to compute an address for the remote shard leader using the ShardPeerAddressResolver. This address will
1748         // look like so,
1749         //    akka://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1
1750         // In this specific case if we did a FindPrimary for shard default from member-1 we would come up
1751         // with the address of an actor which does not exist, therefore any message sent to that actor would go to
1752         // dead letters.
1753         // To work around this problem we create a ForwardingActor with the right address and pass to it the
1754         // mockShardLeaderActor. The ForwardingActor simply forwards all messages to the mockShardLeaderActor and every
1755         // thing works as expected
1756         final ActorRef actorRef = leaderShardManager.underlyingActor().context()
1757                 .actorOf(Props.create(ForwardingActor.class, mockShardLeaderActor),
1758                         "member-2-shard-default-" + shardMrgIDSuffix);
1759
1760         LOG.error("Forwarding actor : {}", actorRef);
1761
1762         final TestKit kit = new TestKit(getSystem());
1763         newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1764         leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1765
1766         leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1767         newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1768
1769         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1770         leaderShardManager.tell(
1771             new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1772             mockShardLeaderActor);
1773         leaderShardManager.tell(
1774             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1775             mockShardLeaderActor);
1776
1777         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
1778         newReplicaShardManager.tell(
1779             new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class), leaderVersion),
1780             mockShardActor);
1781         newReplicaShardManager.tell(
1782             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
1783             mockShardActor);
1784
1785         newReplicaShardManager.underlyingActor().waitForMemberUp();
1786         leaderShardManager.underlyingActor().waitForMemberUp();
1787
1788         // construct a mock response message
1789         newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), kit.getRef());
1790         RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1791             RemoveServer.class);
1792         String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1793         assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
1794         kit.expectMsgClass(Duration.ofSeconds(5), Status.Success.class);
1795     }
1796
1797     @Test
1798     public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() {
1799         testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", MEMBER_2),
1800                 RemoveServer.class, new RemoveShardReplica("astronauts", MEMBER_3));
1801     }
1802
1803     @Test
1804     public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() {
1805         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1806                 AddServer.class, new RemoveShardReplica("astronauts", MEMBER_2));
1807     }
1808
1809
1810     public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange,
1811                                                       final Class<?> firstForwardedServerChangeClass,
1812                                                       final Object secondServerChange) {
1813         final TestKit kit = new TestKit(getSystem());
1814         final TestKit mockShardLeaderKit = new TestKit(getSystem());
1815         final TestKit secondRequestKit = new TestKit(getSystem());
1816
1817         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1818             .put(shardName, Arrays.asList("member-2")).build());
1819
1820         final TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
1821             newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor)
1822             .cluster(new MockClusterWrapper()).props()
1823             .withDispatcher(Dispatchers.DefaultDispatcherId()),
1824             shardMgrID);
1825
1826         shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1827
1828         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1829
1830         shardManager.tell(firstServerChange, kit.getRef());
1831
1832         mockShardLeaderKit.expectMsgClass(firstForwardedServerChangeClass);
1833
1834         shardManager.tell(secondServerChange, secondRequestKit.getRef());
1835
1836         secondRequestKit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1837     }
1838
1839     @Test
1840     public void testServerRemovedShardActorNotRunning() {
1841         LOG.info("testServerRemovedShardActorNotRunning starting");
1842         final TestKit kit = new TestKit(getSystem());
1843         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1844             .put("default", Arrays.asList("member-1", "member-2"))
1845             .put("astronauts", Arrays.asList("member-2"))
1846             .put("people", Arrays.asList("member-1", "member-2")).build());
1847
1848         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1849             newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1850
1851         shardManager.underlyingActor().waitForRecoveryComplete();
1852         shardManager.tell(new FindLocalShard("people", false), kit.getRef());
1853         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1854
1855         shardManager.tell(new FindLocalShard("default", false), kit.getRef());
1856         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1857
1858         // Removed the default shard replica from member-1
1859         ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
1860         ShardIdentifier shardId = builder.shardName("default").memberName(MEMBER_1).type(shardMrgIDSuffix)
1861                 .build();
1862         shardManager.tell(new ServerRemoved(shardId.toString()), kit.getRef());
1863
1864         shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1865
1866         LOG.info("testServerRemovedShardActorNotRunning ending");
1867     }
1868
1869     @Test
1870     public void testServerRemovedShardActorRunning() {
1871         LOG.info("testServerRemovedShardActorRunning starting");
1872         final TestKit kit = new TestKit(getSystem());
1873         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1874             .put("default", Arrays.asList("member-1", "member-2"))
1875             .put("astronauts", Arrays.asList("member-2"))
1876             .put("people", Arrays.asList("member-1", "member-2")).build());
1877
1878         String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1879         ActorRef shard = actorFactory.createActor(MessageCollectorActor.props(), shardId);
1880
1881         TestActorRef<TestShardManager> shardManager = actorFactory
1882                 .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props()
1883                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1884
1885         shardManager.underlyingActor().waitForRecoveryComplete();
1886
1887         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1888         shardManager.tell(new ActorInitialized(), shard);
1889
1890         waitForShardInitialized(shardManager, "people", kit);
1891         waitForShardInitialized(shardManager, "default", kit);
1892
1893         // Removed the default shard replica from member-1
1894         shardManager.tell(new ServerRemoved(shardId), kit.getRef());
1895
1896         shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1897
1898         MessageCollectorActor.expectFirstMatching(shard, Shutdown.class);
1899
1900         LOG.info("testServerRemovedShardActorRunning ending");
1901     }
1902
1903     @Test
1904     public void testShardPersistenceWithRestoredData() {
1905         LOG.info("testShardPersistenceWithRestoredData starting");
1906         final TestKit kit = new TestKit(getSystem());
1907         MockConfiguration mockConfig =
1908                 new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1909                     .put("default", Arrays.asList("member-1", "member-2"))
1910                     .put("astronauts", Arrays.asList("member-2"))
1911                     .put("people", Arrays.asList("member-1", "member-2")).build());
1912         String[] restoredShards = {"default", "astronauts"};
1913         ShardManagerSnapshot snapshot =
1914                 new ShardManagerSnapshot(Arrays.asList(restoredShards));
1915         InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
1916
1917         // create shardManager to come up with restored data
1918         TestActorRef<TestShardManager> newRestoredShardManager = actorFactory.createTestActor(
1919             newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1920
1921         newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
1922
1923         newRestoredShardManager.tell(new FindLocalShard("people", false), kit.getRef());
1924         LocalShardNotFound notFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
1925         assertEquals("for uninitialized shard", "people", notFound.getShardName());
1926
1927         // Verify a local shard is created for the restored shards,
1928         // although we expect a NotInitializedException for the shards
1929         // as the actor initialization
1930         // message is not sent for them
1931         newRestoredShardManager.tell(new FindLocalShard("default", false), kit.getRef());
1932         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1933
1934         newRestoredShardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
1935         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1936
1937         LOG.info("testShardPersistenceWithRestoredData ending");
1938     }
1939
1940     @Test
1941     public void testShutDown() throws Exception {
1942         LOG.info("testShutDown starting");
1943         final TestKit kit = new TestKit(getSystem());
1944         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1945             .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1")).build());
1946
1947         String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString();
1948         ActorRef shard1 = actorFactory.createActor(MessageCollectorActor.props(), shardId1);
1949
1950         String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString();
1951         ActorRef shard2 = actorFactory.createActor(MessageCollectorActor.props(), shardId2);
1952
1953         ActorRef shardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig)
1954             .addShardActor("shard1", shard1).addShardActor("shard2", shard2).props());
1955
1956         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1957         shardManager.tell(new ActorInitialized(), shard1);
1958         shardManager.tell(new ActorInitialized(), shard2);
1959
1960         FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
1961         Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE);
1962
1963         MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class);
1964         MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class);
1965
1966         try {
1967             Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS));
1968             fail("ShardManager actor stopped without waiting for the Shards to be stopped");
1969         } catch (TimeoutException e) {
1970             // expected
1971         }
1972
1973         actorFactory.killActor(shard1, kit);
1974         actorFactory.killActor(shard2, kit);
1975
1976         Boolean stopped = Await.result(stopFuture, duration);
1977         assertEquals("Stopped", Boolean.TRUE, stopped);
1978
1979         LOG.info("testShutDown ending");
1980     }
1981
1982     @Test
1983     public void testChangeServersVotingStatus() {
1984         final TestKit kit = new TestKit(getSystem());
1985         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1986
1987         ActorRef respondActor = actorFactory
1988                 .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
1989                     new ServerChangeReply(ServerChangeStatus.OK, null)), memberId);
1990
1991         ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
1992
1993         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1994         shardManager.tell(new ActorInitialized(), respondActor);
1995         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1996             DataStoreVersions.CURRENT_VERSION), kit.getRef());
1997         shardManager.tell(
1998             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1999             respondActor);
2000
2001         shardManager.tell(
2002             new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), kit.getRef());
2003
2004         ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor
2005                 .expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2006         assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(),
2007             ImmutableMap.of(ShardIdentifier
2008                 .create("default", MemberName.forName("member-2"), shardMrgIDSuffix).toString(),
2009                 Boolean.TRUE));
2010
2011         kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
2012     }
2013
2014     @Test
2015     public void testChangeServersVotingStatusWithNoLeader() {
2016         final TestKit kit = new TestKit(getSystem());
2017         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2018
2019         ActorRef respondActor = actorFactory
2020                 .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2021                     new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);
2022
2023         ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2024
2025         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2026         shardManager.tell(new ActorInitialized(), respondActor);
2027         shardManager.tell(new RoleChangeNotification(memberId, null, RaftState.Follower.name()), respondActor);
2028
2029         shardManager.tell(
2030             new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), kit.getRef());
2031
2032         MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2033
2034         Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Status.Failure.class);
2035         assertTrue("Failure resposnse", resp.cause() instanceof NoShardLeaderException);
2036     }
2037
2038     @SuppressWarnings("unchecked")
2039     @Test
2040     public void testRegisterForShardLeaderChanges() {
2041         LOG.info("testRegisterForShardLeaderChanges starting");
2042
2043         final String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
2044         final String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
2045         final TestKit kit = new TestKit(getSystem());
2046         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
2047
2048         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2049         shardManager.tell(new ActorInitialized(), mockShardActor);
2050
2051         final Consumer<String> mockCallback = mock(Consumer.class);
2052         shardManager.tell(new RegisterForShardAvailabilityChanges(mockCallback), kit.getRef());
2053
2054         final Success reply = kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
2055         final Registration reg = (Registration) reply.status();
2056
2057         final DataTree mockDataTree = mock(DataTree.class);
2058         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2059             DataStoreVersions.CURRENT_VERSION), mockShardActor);
2060
2061         verify(mockCallback, timeout(5000)).accept("default");
2062
2063         reset(mockCallback);
2064         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2065                 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2066
2067         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
2068         verifyNoMoreInteractions(mockCallback);
2069
2070         shardManager.tell(new ShardLeaderStateChanged(memberId1, null, mockDataTree,
2071                 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2072
2073         verify(mockCallback, timeout(5000)).accept("default");
2074
2075         reset(mockCallback);
2076         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, mockDataTree,
2077                 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2078
2079         verify(mockCallback, timeout(5000)).accept("default");
2080
2081         reset(mockCallback);
2082         reg.close();
2083
2084         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2085                 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2086
2087         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
2088         verifyNoMoreInteractions(mockCallback);
2089
2090         LOG.info("testRegisterForShardLeaderChanges ending");
2091     }
2092
2093     public static class TestShardManager extends ShardManager {
2094         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
2095         private final CountDownLatch snapshotPersist = new CountDownLatch(1);
2096         private ShardManagerSnapshot snapshot;
2097         private final Map<String, ActorRef> shardActors;
2098         private final ActorRef shardActor;
2099         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
2100         private CountDownLatch memberUpReceived = new CountDownLatch(1);
2101         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
2102         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
2103         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
2104         private volatile MessageInterceptor messageInterceptor;
2105
2106         TestShardManager(final Builder builder) {
2107             super(builder);
2108             shardActor = builder.shardActor;
2109             shardActors = builder.shardActors;
2110         }
2111
2112         @Override
2113         protected void handleRecover(final Object message) throws Exception {
2114             try {
2115                 super.handleRecover(message);
2116             } finally {
2117                 if (message instanceof RecoveryCompleted) {
2118                     recoveryComplete.countDown();
2119                 }
2120             }
2121         }
2122
2123         private void countDownIfOther(final Member member, final CountDownLatch latch) {
2124             if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
2125                 latch.countDown();
2126             }
2127         }
2128
2129         @Override
2130         public void handleCommand(final Object message) throws Exception {
2131             try {
2132                 if (messageInterceptor != null && messageInterceptor.canIntercept(message)) {
2133                     getSender().tell(messageInterceptor.apply(message), getSelf());
2134                 } else {
2135                     super.handleCommand(message);
2136                 }
2137             } finally {
2138                 if (message instanceof FindPrimary) {
2139                     findPrimaryMessageReceived.countDown();
2140                 } else if (message instanceof ClusterEvent.MemberUp) {
2141                     countDownIfOther(((ClusterEvent.MemberUp) message).member(), memberUpReceived);
2142                 } else if (message instanceof ClusterEvent.MemberRemoved) {
2143                     countDownIfOther(((ClusterEvent.MemberRemoved) message).member(), memberRemovedReceived);
2144                 } else if (message instanceof ClusterEvent.UnreachableMember) {
2145                     countDownIfOther(((ClusterEvent.UnreachableMember) message).member(), memberUnreachableReceived);
2146                 } else if (message instanceof ClusterEvent.ReachableMember) {
2147                     countDownIfOther(((ClusterEvent.ReachableMember) message).member(), memberReachableReceived);
2148                 }
2149             }
2150         }
2151
2152         void setMessageInterceptor(final MessageInterceptor messageInterceptor) {
2153             this.messageInterceptor = messageInterceptor;
2154         }
2155
2156         void waitForRecoveryComplete() {
2157             assertTrue("Recovery complete",
2158                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
2159         }
2160
2161         public void waitForMemberUp() {
2162             assertTrue("MemberUp received",
2163                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
2164             memberUpReceived = new CountDownLatch(1);
2165         }
2166
2167         void waitForMemberRemoved() {
2168             assertTrue("MemberRemoved received",
2169                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
2170             memberRemovedReceived = new CountDownLatch(1);
2171         }
2172
2173         void waitForUnreachableMember() {
2174             assertTrue("UnreachableMember received",
2175                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS));
2176             memberUnreachableReceived = new CountDownLatch(1);
2177         }
2178
2179         void waitForReachableMember() {
2180             assertTrue("ReachableMember received",
2181                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
2182             memberReachableReceived = new CountDownLatch(1);
2183         }
2184
2185         void verifyFindPrimary() {
2186             assertTrue("FindPrimary received",
2187                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
2188             findPrimaryMessageReceived = new CountDownLatch(1);
2189         }
2190
2191         public static Builder builder(final DatastoreContext.Builder datastoreContextBuilder) {
2192             return new Builder(datastoreContextBuilder);
2193         }
2194
2195         public static class Builder extends AbstractGenericCreator<Builder, TestShardManager> {
2196             private ActorRef shardActor;
2197             private final Map<String, ActorRef> shardActors = new HashMap<>();
2198
2199             Builder(final DatastoreContext.Builder datastoreContextBuilder) {
2200                 super(TestShardManager.class);
2201                 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
2202             }
2203
2204             Builder shardActor(final ActorRef newShardActor) {
2205                 this.shardActor = newShardActor;
2206                 return this;
2207             }
2208
2209             Builder addShardActor(final String shardName, final ActorRef actorRef) {
2210                 shardActors.put(shardName, actorRef);
2211                 return this;
2212             }
2213         }
2214
2215         @Override
2216         public void saveSnapshot(final Object obj) {
2217             snapshot = (ShardManagerSnapshot) obj;
2218             snapshotPersist.countDown();
2219             super.saveSnapshot(obj);
2220         }
2221
2222         void verifySnapshotPersisted(final Set<String> shardList) {
2223             assertTrue("saveSnapshot invoked",
2224                     Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
2225             assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
2226         }
2227
2228         @Override
2229         protected ActorRef newShardActor(final ShardInformation info) {
2230             if (shardActors.get(info.getShardName()) != null) {
2231                 return shardActors.get(info.getShardName());
2232             }
2233
2234             if (shardActor != null) {
2235                 return shardActor;
2236             }
2237
2238             return super.newShardActor(info);
2239         }
2240     }
2241
2242     private abstract static class AbstractGenericCreator<T extends AbstractGenericCreator<T, ?>, C extends ShardManager>
2243                                                      extends AbstractShardManagerCreator<T> {
2244         private final Class<C> shardManagerClass;
2245
2246         AbstractGenericCreator(final Class<C> shardManagerClass) {
2247             this.shardManagerClass = shardManagerClass;
2248             cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).readinessFuture(ready)
2249                     .primaryShardInfoCache(new PrimaryShardInfoFutureCache());
2250         }
2251
2252         @Override
2253         public Props props() {
2254             verify();
2255             return Props.create(shardManagerClass, this);
2256         }
2257     }
2258
2259     private static class GenericCreator<C extends ShardManager> extends AbstractGenericCreator<GenericCreator<C>, C> {
2260         GenericCreator(final Class<C> shardManagerClass) {
2261             super(shardManagerClass);
2262         }
2263     }
2264
2265     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
2266         private static final long serialVersionUID = 1L;
2267         private final Creator<ShardManager> delegate;
2268
2269         DelegatingShardManagerCreator(final Creator<ShardManager> delegate) {
2270             this.delegate = delegate;
2271         }
2272
2273         @Override
2274         public ShardManager create() throws Exception {
2275             return delegate.create();
2276         }
2277     }
2278
2279     interface MessageInterceptor extends Function<Object, Object> {
2280         boolean canIntercept(Object message);
2281     }
2282
2283     private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
2284         return new MessageInterceptor() {
2285             @Override
2286             public Object apply(final Object message) {
2287                 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
2288             }
2289
2290             @Override
2291             public boolean canIntercept(final Object message) {
2292                 return message instanceof FindPrimary;
2293             }
2294         };
2295     }
2296
2297     private static class MockRespondActor extends MessageCollectorActor {
2298         static final String CLEAR_RESPONSE = "clear-response";
2299
2300         private Object responseMsg;
2301         private final Class<?> requestClass;
2302
2303         @SuppressWarnings("unused")
2304         MockRespondActor(final Class<?> requestClass, final Object responseMsg) {
2305             this.requestClass = requestClass;
2306             this.responseMsg = responseMsg;
2307         }
2308
2309         @Override
2310         public void onReceive(final Object message) throws Exception {
2311             if (message.equals(CLEAR_RESPONSE)) {
2312                 responseMsg = null;
2313             } else {
2314                 super.onReceive(message);
2315                 if (message.getClass().equals(requestClass) && responseMsg != null) {
2316                     getSender().tell(responseMsg, getSelf());
2317                 }
2318             }
2319         }
2320     }
2321 }