Fold AbstractShardManagerTest
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManagerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.shardmanager;
9
10 import static org.awaitility.Awaitility.await;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.junit.Assert.fail;
18 import static org.mockito.ArgumentMatchers.anyString;
19 import static org.mockito.Mockito.doReturn;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.reset;
22 import static org.mockito.Mockito.timeout;
23 import static org.mockito.Mockito.verify;
24 import static org.mockito.Mockito.verifyNoMoreInteractions;
25
26 import akka.actor.ActorRef;
27 import akka.actor.ActorSystem;
28 import akka.actor.AddressFromURIString;
29 import akka.actor.PoisonPill;
30 import akka.actor.Props;
31 import akka.actor.Status;
32 import akka.actor.Status.Failure;
33 import akka.actor.Status.Success;
34 import akka.cluster.Cluster;
35 import akka.cluster.ClusterEvent;
36 import akka.cluster.Member;
37 import akka.dispatch.Dispatchers;
38 import akka.dispatch.OnComplete;
39 import akka.japi.Creator;
40 import akka.pattern.Patterns;
41 import akka.persistence.RecoveryCompleted;
42 import akka.serialization.Serialization;
43 import akka.testkit.TestActorRef;
44 import akka.testkit.javadsl.TestKit;
45 import akka.util.Timeout;
46 import com.google.common.base.Stopwatch;
47 import com.google.common.collect.ImmutableMap;
48 import com.google.common.collect.Lists;
49 import com.google.common.collect.Sets;
50 import com.google.common.util.concurrent.SettableFuture;
51 import com.google.common.util.concurrent.Uninterruptibles;
52 import java.time.Duration;
53 import java.util.AbstractMap;
54 import java.util.Arrays;
55 import java.util.Collection;
56 import java.util.Collections;
57 import java.util.HashMap;
58 import java.util.List;
59 import java.util.Map;
60 import java.util.Map.Entry;
61 import java.util.Set;
62 import java.util.concurrent.CountDownLatch;
63 import java.util.concurrent.TimeUnit;
64 import java.util.concurrent.TimeoutException;
65 import java.util.function.Consumer;
66 import java.util.function.Function;
67 import java.util.stream.Collectors;
68 import org.junit.After;
69 import org.junit.AfterClass;
70 import org.junit.Before;
71 import org.junit.BeforeClass;
72 import org.junit.Test;
73 import org.junit.runner.RunWith;
74 import org.mockito.junit.MockitoJUnitRunner;
75 import org.opendaylight.controller.cluster.access.concepts.MemberName;
76 import org.opendaylight.controller.cluster.datastore.AbstractClusterRefActorTest;
77 import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
78 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
79 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
80 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
81 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
82 import org.opendaylight.controller.cluster.datastore.Shard;
83 import org.opendaylight.controller.cluster.datastore.config.Configuration;
84 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
85 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
86 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
87 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
88 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
89 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
90 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
91 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
92 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
93 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
94 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
95 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
96 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
97 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
98 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
99 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
100 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
101 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
102 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
103 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
104 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
105 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
106 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
107 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
108 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
109 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
110 import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor;
111 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
112 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
113 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
114 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
115 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
116 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
117 import org.opendaylight.controller.cluster.raft.RaftState;
118 import org.opendaylight.controller.cluster.raft.TestActorFactory;
119 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
120 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
121 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
122 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
123 import org.opendaylight.controller.cluster.raft.messages.AddServer;
124 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
125 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
126 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
127 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
128 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
129 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
130 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
131 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
132 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
133 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
134 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
135 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
136 import org.opendaylight.yangtools.concepts.Registration;
137 import org.opendaylight.yangtools.yang.common.XMLNamespace;
138 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
139 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
140 import org.slf4j.Logger;
141 import org.slf4j.LoggerFactory;
142 import scala.concurrent.Await;
143 import scala.concurrent.Future;
144 import scala.concurrent.duration.FiniteDuration;
145
146 @RunWith(MockitoJUnitRunner.StrictStubs.class)
147 public class ShardManagerTest extends AbstractClusterRefActorTest {
    private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
    // Member names used by the tests.
    private static final MemberName MEMBER_1 = MemberName.forName("member-1");
    private static final MemberName MEMBER_2 = MemberName.forName("member-2");
    private static final MemberName MEMBER_3 = MemberName.forName("member-3");

    // Incremented once per test instance so that every instance gets a distinct "config<N>" suffix.
    private static int ID_COUNTER = 1;
    // Shared mock shard actor; created in setUp() when absent and stopped/reset to null in tearDown().
    private static ActorRef mockShardActor;
    // Identifier under which the shared mock shard actor is registered.
    private static ShardIdentifier mockShardName;
    // Re-created in setUp(); handed to the builder as the readiness future by newShardMgrProps(Configuration).
    private static SettableFuture<Void> ready;
    // Assigned in beforeClass() and released in afterClass().
    private static EffectiveModelContext TEST_SCHEMA_CONTEXT;

    // Per-instance data store name suffix; also used to compose the shard member ids in the tests.
    private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
    // Creates the actors of a single test; closed in tearDown().
    private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
    // Base datastore context settings; individual tests may adjust this builder before creating a manager.
    private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
            .dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
            .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);

    // String form of the shard manager identifier derived from the per-instance suffix.
    private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
166
167     @BeforeClass
168     public static void beforeClass() {
169         TEST_SCHEMA_CONTEXT = TestModel.createTestContext();
170     }
171
172     @AfterClass
173     public static void afterClass() {
174         TEST_SCHEMA_CONTEXT = null;
175     }
176
177     @Before
178     public void setUp() {
179         ready = SettableFuture.create();
180
181         InMemoryJournal.clear();
182         InMemorySnapshotStore.clear();
183
184         if (mockShardActor == null) {
185             mockShardName = ShardIdentifier.create(Shard.DEFAULT_NAME, MEMBER_1, "config");
186             mockShardActor = getSystem().actorOf(MessageCollectorActor.props(), mockShardName.toString());
187         }
188
189         MessageCollectorActor.clearMessages(mockShardActor);
190     }
191
192     @After
193     public void tearDown() {
194         InMemoryJournal.clear();
195         InMemorySnapshotStore.clear();
196
197         mockShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
198         await().atMost(Duration.ofSeconds(10)).until(mockShardActor::isTerminated);
199         mockShardActor = null;
200
201         actorFactory.close();
202     }
203
204     private TestShardManager.Builder newTestShardMgrBuilder() {
205         return TestShardManager.builder(datastoreContextBuilder).distributedDataStore(mock(DistributedDataStore.class));
206     }
207
208     private TestShardManager.Builder newTestShardMgrBuilder(final Configuration config) {
209         return TestShardManager.builder(datastoreContextBuilder).configuration(config)
210                 .distributedDataStore(mock(DistributedDataStore.class));
211     }
212
213     private Props newShardMgrProps() {
214         return newShardMgrProps(new MockConfiguration());
215     }
216
217     private Props newShardMgrProps(final Configuration config) {
218         return newTestShardMgrBuilder(config).readinessFuture(ready).props();
219     }
220
221     private ActorSystem newActorSystem(final String config) {
222         return newActorSystem("cluster-test", config);
223     }
224
225     private ActorRef newMockShardActor(final ActorSystem system, final String shardName, final String memberName) {
226         String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
227         if (system == getSystem()) {
228             return actorFactory.createActor(MessageCollectorActor.props(), name);
229         }
230
231         return system.actorOf(MessageCollectorActor.props(), name);
232     }
233
234     private static DatastoreContextFactory newDatastoreContextFactory(final DatastoreContext datastoreContext) {
235         DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
236         doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
237         doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(anyString());
238         return mockFactory;
239     }
240
241     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() {
242         return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
243     }
244
245     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(final ActorRef shardActor) {
246         return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor)
247                 .distributedDataStore(mock(DistributedDataStore.class));
248     }
249
250
251     private Props newPropsShardMgrWithMockShardActor() {
252         return newTestShardMgrBuilderWithMockShardActor().props().withDispatcher(
253                 Dispatchers.DefaultDispatcherId());
254     }
255
256     private Props newPropsShardMgrWithMockShardActor(final ActorRef shardActor) {
257         return newTestShardMgrBuilderWithMockShardActor(shardActor).props()
258                 .withDispatcher(Dispatchers.DefaultDispatcherId());
259     }
260
261
262     private TestShardManager newTestShardManager() {
263         return newTestShardManager(newShardMgrProps());
264     }
265
266     private TestShardManager newTestShardManager(final Props props) {
267         TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
268         TestShardManager shardManager = shardManagerActor.underlyingActor();
269         shardManager.waitForRecoveryComplete();
270         return shardManager;
271     }
272
273     private static void waitForShardInitialized(final ActorRef shardManager, final String shardName,
274             final TestKit kit) {
275         AssertionError last = null;
276         Stopwatch sw = Stopwatch.createStarted();
277         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
278             try {
279                 shardManager.tell(new FindLocalShard(shardName, true), kit.getRef());
280                 kit.expectMsgClass(LocalShardFound.class);
281                 return;
282             } catch (AssertionError e) {
283                 last = e;
284             }
285
286             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
287         }
288
289         throw last;
290     }
291
292     @SuppressWarnings("unchecked")
293     private static <T> T expectMsgClassOrFailure(final Class<T> msgClass, final TestKit kit, final String msg) {
294         Object reply = kit.expectMsgAnyClassOf(kit.duration("5 sec"), msgClass, Failure.class);
295         if (reply instanceof Failure) {
296             throw new AssertionError(msg + " failed", ((Failure)reply).cause());
297         }
298
299         return (T)reply;
300     }
301
302     @Test
303     public void testPerShardDatastoreContext() throws Exception {
304         LOG.info("testPerShardDatastoreContext starting");
305         final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
306                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
307
308         doReturn(
309                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(6).build())
310                 .when(mockFactory).getShardDatastoreContext("default");
311
312         doReturn(
313                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(7).build())
314                 .when(mockFactory).getShardDatastoreContext("topology");
315
316         final MockConfiguration mockConfig = new MockConfiguration() {
317             @Override
318             public Collection<String> getMemberShardNames(final MemberName memberName) {
319                 return Arrays.asList("default", "topology");
320             }
321
322             @Override
323             public Collection<MemberName> getMembersFromShardName(final String shardName) {
324                 return members("member-1");
325             }
326         };
327
328         final ActorRef defaultShardActor = actorFactory.createActor(
329                 MessageCollectorActor.props(), actorFactory.generateActorId("default"));
330         final ActorRef topologyShardActor = actorFactory.createActor(
331                 MessageCollectorActor.props(), actorFactory.generateActorId("topology"));
332
333         final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
334                 new HashMap<String, Entry<ActorRef, DatastoreContext>>());
335         shardInfoMap.put("default", new AbstractMap.SimpleEntry<>(defaultShardActor, null));
336         shardInfoMap.put("topology", new AbstractMap.SimpleEntry<>(topologyShardActor, null));
337
338         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
339         final CountDownLatch newShardActorLatch = new CountDownLatch(2);
340         class LocalShardManager extends ShardManager {
341             LocalShardManager(final AbstractShardManagerCreator<?> creator) {
342                 super(creator);
343             }
344
345             @Override
346             protected ActorRef newShardActor(final ShardInformation info) {
347                 Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
348                 ActorRef ref = null;
349                 if (entry != null) {
350                     ref = entry.getKey();
351                     entry.setValue(info.getDatastoreContext());
352                 }
353
354                 newShardActorLatch.countDown();
355                 return ref;
356             }
357         }
358
359         final Creator<ShardManager> creator = new Creator<>() {
360             private static final long serialVersionUID = 1L;
361             @Override
362             public ShardManager create() {
363                 return new LocalShardManager(
364                         new GenericCreator<>(LocalShardManager.class).datastoreContextFactory(mockFactory)
365                                 .primaryShardInfoCache(primaryShardInfoCache).configuration(mockConfig));
366             }
367         };
368
369         final TestKit kit = new TestKit(getSystem());
370
371         final ActorRef shardManager = actorFactory.createActor(Props.create(ShardManager.class,
372                 new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()));
373
374         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
375
376         assertTrue("Shard actors created", newShardActorLatch.await(5, TimeUnit.SECONDS));
377         assertEquals("getShardElectionTimeoutFactor", 6,
378                 shardInfoMap.get("default").getValue().getShardElectionTimeoutFactor());
379         assertEquals("getShardElectionTimeoutFactor", 7,
380                 shardInfoMap.get("topology").getValue().getShardElectionTimeoutFactor());
381
382         DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
383                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
384         doReturn(
385                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(66).build())
386                 .when(newMockFactory).getShardDatastoreContext("default");
387
388         doReturn(
389                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(77).build())
390                 .when(newMockFactory).getShardDatastoreContext("topology");
391
392         shardManager.tell(newMockFactory, kit.getRef());
393
394         DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor,
395                 DatastoreContext.class);
396         assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
397
398         newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
399         assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
400
401         LOG.info("testPerShardDatastoreContext ending");
402     }
403
404     @Test
405     public void testOnReceiveFindPrimaryForNonExistentShard() {
406         final TestKit kit = new TestKit(getSystem());
407         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
408
409         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
410
411         shardManager.tell(new FindPrimary("non-existent", false), kit.getRef());
412
413         kit.expectMsgClass(Duration.ofSeconds(5), PrimaryNotFoundException.class);
414     }
415
416     @Test
417     public void testOnReceiveFindPrimaryForLocalLeaderShard() {
418         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting");
419         final TestKit kit = new TestKit(getSystem());
420         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
421
422         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
423
424         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
425         shardManager.tell(new ActorInitialized(), mockShardActor);
426
427         DataTree mockDataTree = mock(DataTree.class);
428         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
429             DataStoreVersions.CURRENT_VERSION), kit.getRef());
430
431         MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
432         shardManager.tell(
433             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
434             mockShardActor);
435
436         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
437
438         LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5),
439             LocalPrimaryShardFound.class);
440         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
441             primaryFound.getPrimaryPath().contains("member-1-shard-default"));
442         assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
443
444         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending");
445     }
446
447     @Test
448     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() {
449         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp starting");
450         final TestKit kit = new TestKit(getSystem());
451         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
452
453         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
454         shardManager.tell(new ActorInitialized(), mockShardActor);
455
456         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
457         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
458         shardManager.tell(
459             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
460             mockShardActor);
461         shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION),
462             mockShardActor);
463
464         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
465
466         kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
467
468         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending");
469     }
470
471     @Test
472     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() {
473         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard starting");
474         final TestKit kit = new TestKit(getSystem());
475         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
476
477         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
478         shardManager.tell(new ActorInitialized(), mockShardActor);
479
480         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
481         MockClusterWrapper.sendMemberUp(shardManager, "member-2", kit.getRef().path().toString());
482
483         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
484         shardManager.tell(
485             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
486             mockShardActor);
487         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
488         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, leaderVersion), mockShardActor);
489
490         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
491
492         RemotePrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
493         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
494             primaryFound.getPrimaryPath().contains("member-2-shard-default"));
495         assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
496
497         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard ending");
498     }
499
500     @Test
501     public void testOnReceiveFindPrimaryForUninitializedShard() {
502         final TestKit kit = new TestKit(getSystem());
503         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
504
505         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
506
507         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
508     }
509
510     @Test
511     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() {
512         final TestKit kit = new TestKit(getSystem());
513         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
514
515         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
516         shardManager.tell(new ActorInitialized(), mockShardActor);
517
518         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
519
520         kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
521     }
522
523     @Test
524     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() {
525         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
526         final TestKit kit = new TestKit(getSystem());
527         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
528
529         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
530         shardManager.tell(new ActorInitialized(), mockShardActor);
531
532         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
533         shardManager.tell(
534             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Follower.name()),
535             mockShardActor);
536
537         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
538
539         kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
540
541         DataTree mockDataTree = mock(DataTree.class);
542         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
543             DataStoreVersions.CURRENT_VERSION), mockShardActor);
544
545         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
546
547         LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5),
548             LocalPrimaryShardFound.class);
549         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
550             primaryFound.getPrimaryPath().contains("member-1-shard-default"));
551         assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
552
553         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
554     }
555
556     @Test
557     public void testOnReceiveFindPrimaryWaitForShardLeader() {
558         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting");
559         datastoreContextBuilder.shardInitializationTimeout(10, TimeUnit.SECONDS);
560         final TestKit kit = new TestKit(getSystem());
561         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
562
563         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
564
565         // We're passing waitUntilInitialized = true to FindPrimary so
566         // the response should be
567         // delayed until we send ActorInitialized and
568         // RoleChangeNotification.
569         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
570
571         kit.expectNoMessage(Duration.ofMillis(150));
572
573         shardManager.tell(new ActorInitialized(), mockShardActor);
574
575         kit.expectNoMessage(Duration.ofMillis(150));
576
577         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
578         shardManager.tell(
579             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
580             mockShardActor);
581
582         kit.expectNoMessage(Duration.ofMillis(150));
583
584         DataTree mockDataTree = mock(DataTree.class);
585         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
586             DataStoreVersions.CURRENT_VERSION), mockShardActor);
587
588         LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
589         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
590             primaryFound.getPrimaryPath().contains("member-1-shard-default"));
591         assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
592
593         kit.expectNoMessage(Duration.ofMillis(200));
594
595         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending");
596     }
597
598     @Test
599     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() {
600         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard starting");
601         final TestKit kit = new TestKit(getSystem());
602         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
603
604         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
605
606         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
607
608         kit.expectMsgClass(Duration.ofSeconds(2), NotInitializedException.class);
609
610         shardManager.tell(new ActorInitialized(), mockShardActor);
611
612         kit.expectNoMessage(Duration.ofMillis(200));
613
614         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending");
615     }
616
617     @Test
618     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() {
619         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard starting");
620         final TestKit kit = new TestKit(getSystem());
621         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
622
623         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
624         shardManager.tell(new ActorInitialized(), mockShardActor);
625         shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
626             RaftState.Candidate.name()), mockShardActor);
627
628         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
629
630         kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
631
632         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending");
633     }
634
635     @Test
636     public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() {
637         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard starting");
638         final TestKit kit = new TestKit(getSystem());
639         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
640
641         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
642         shardManager.tell(new ActorInitialized(), mockShardActor);
643         shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
644             RaftState.IsolatedLeader.name()), mockShardActor);
645
646         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true),kit. getRef());
647
648         kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
649
650         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending");
651     }
652
653     @Test
654     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() {
655         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard starting");
656         final TestKit kit = new TestKit(getSystem());
657         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
658
659         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
660         shardManager.tell(new ActorInitialized(), mockShardActor);
661
662         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
663
664         kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
665
666         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending");
667     }
668
669     @Test
670     public void testOnReceiveFindPrimaryForRemoteShard() {
671         LOG.info("testOnReceiveFindPrimaryForRemoteShard starting");
672         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
673
674         // Create an ActorSystem ShardManager actor for member-1.
675
676         final ActorSystem system1 = newActorSystem("Member1");
677         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
678
679         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
680                 newTestShardMgrBuilderWithMockShardActor().cluster(
681                         new ClusterWrapperImpl(system1)).props().withDispatcher(
682                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
683
684         // Create an ActorSystem ShardManager actor for member-2.
685
686         final ActorSystem system2 = newActorSystem("Member2");
687
688         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
689
690         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
691
692         MockConfiguration mockConfig2 = new MockConfiguration(
693                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
694                         .put("astronauts", Arrays.asList("member-2")).build());
695
696         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
697                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
698                         new ClusterWrapperImpl(system2)).props().withDispatcher(
699                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
700
701         final TestKit kit = new TestKit(system1);
702         shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
703         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
704
705         shardManager2.tell(new ActorInitialized(), mockShardActor2);
706
707         String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
708         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
709         shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
710             mockShardActor2);
711         shardManager2.tell(new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
712             mockShardActor2);
713
714         shardManager1.underlyingActor().waitForMemberUp();
715         shardManager1.tell(new FindPrimary("astronauts", false), kit.getRef());
716
717         RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
718         String path = found.getPrimaryPath();
719         assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
720         assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
721
722         shardManager2.underlyingActor().verifyFindPrimary();
723
724         // This part times out quite a bit on jenkins for some reason
725
726 //                Cluster.get(system2).down(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
727 //
728 //                shardManager1.underlyingActor().waitForMemberRemoved();
729 //
730 //                shardManager1.tell(new FindPrimary("astronauts", false), getRef());
731 //
732 //                expectMsgClass(Duration.ofSeconds(5), PrimaryNotFoundException.class);
733
734         LOG.info("testOnReceiveFindPrimaryForRemoteShard ending");
735     }
736
737     @Test
738     public void testShardAvailabilityOnChangeOfMemberReachability() {
739         LOG.info("testShardAvailabilityOnChangeOfMemberReachability starting");
740         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
741
742         // Create an ActorSystem ShardManager actor for member-1.
743
744         final ActorSystem system1 = newActorSystem("Member1");
745         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
746
747         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
748
749         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
750                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
751                         new ClusterWrapperImpl(system1)).props().withDispatcher(
752                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
753
754         // Create an ActorSystem ShardManager actor for member-2.
755
756         final ActorSystem system2 = newActorSystem("Member2");
757
758         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
759
760         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
761
762         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
763                 .put("default", Arrays.asList("member-1", "member-2")).build());
764
765         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
766                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
767                         new ClusterWrapperImpl(system2)).props().withDispatcher(
768                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
769
770         final TestKit kit = new TestKit(system1);
771         shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
772         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
773         shardManager1.tell(new ActorInitialized(), mockShardActor1);
774         shardManager2.tell(new ActorInitialized(), mockShardActor2);
775
776         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
777         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
778         shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
779             DataStoreVersions.CURRENT_VERSION), mockShardActor1);
780         shardManager1.tell(
781             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
782             mockShardActor1);
783         shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
784             DataStoreVersions.CURRENT_VERSION), mockShardActor2);
785         shardManager2.tell(
786             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
787             mockShardActor2);
788         shardManager1.underlyingActor().waitForMemberUp();
789
790         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
791
792         RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
793         String path = found.getPrimaryPath();
794         assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
795
796         shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
797             kit.getRef());
798
799         shardManager1.underlyingActor().waitForUnreachableMember();
800         MessageCollectorActor.clearMessages(mockShardActor1);
801
802         shardManager1.tell(MockClusterWrapper.createMemberRemoved("member-2", "akka://cluster-test@127.0.0.1:2558"),
803             kit.getRef());
804
805         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
806
807         kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
808
809         shardManager1.tell(MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
810             kit.getRef());
811
812         shardManager1.underlyingActor().waitForReachableMember();
813
814         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
815
816         RemotePrimaryShardFound found1 = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
817         String path1 = found1.getPrimaryPath();
818         assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
819
820         shardManager1.tell(MockClusterWrapper.createMemberUp("member-2", "akka://cluster-test@127.0.0.1:2558"),
821             kit.getRef());
822
823         // Test FindPrimary wait succeeds after reachable member event.
824
825         shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
826                 "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
827         shardManager1.underlyingActor().waitForUnreachableMember();
828
829         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
830
831         shardManager1.tell(
832             MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
833
834         RemotePrimaryShardFound found2 = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
835         String path2 = found2.getPrimaryPath();
836         assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config"));
837
838         LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending");
839     }
840
841     @Test
842     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() {
843         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange starting");
844         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
845
846         // Create an ActorSystem ShardManager actor for member-1.
847
848         final ActorSystem system1 = newActorSystem("Member1");
849         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
850
851         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
852
853         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
854         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
855                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(new ClusterWrapperImpl(system1))
856                         .primaryShardInfoCache(primaryShardInfoCache).props()
857                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
858                 shardManagerID);
859
860         // Create an ActorSystem ShardManager actor for member-2.
861
862         final ActorSystem system2 = newActorSystem("Member2");
863
864         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
865
866         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
867
868         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
869                 .put("default", Arrays.asList("member-1", "member-2")).build());
870
871         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
872                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
873                         new ClusterWrapperImpl(system2)).props().withDispatcher(
874                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
875
876         final TestKit kit = new TestKit(system1);
877         shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
878         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
879         shardManager1.tell(new ActorInitialized(), mockShardActor1);
880         shardManager2.tell(new ActorInitialized(), mockShardActor2);
881
882         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
883         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
884         shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
885             DataStoreVersions.CURRENT_VERSION), mockShardActor1);
886         shardManager1.tell(
887             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
888             mockShardActor1);
889         shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
890             DataStoreVersions.CURRENT_VERSION), mockShardActor2);
891         shardManager2.tell(
892             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
893             mockShardActor2);
894         shardManager1.underlyingActor().waitForMemberUp();
895
896         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
897
898         RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
899         String path = found.getPrimaryPath();
900         assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
901
902         primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(
903             system1.actorSelection(mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION));
904
905         shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
906                 "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
907
908         shardManager1.underlyingActor().waitForUnreachableMember();
909
910         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
911
912         kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
913
914         assertNull("Expected primaryShardInfoCache entry removed",
915             primaryShardInfoCache.getIfPresent("default"));
916
917         shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, mock(DataTree.class),
918             DataStoreVersions.CURRENT_VERSION), mockShardActor1);
919         shardManager1.tell(
920             new RoleChangeNotification(memberId1, RaftState.Follower.name(), RaftState.Leader.name()),
921             mockShardActor1);
922
923         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
924
925         LocalPrimaryShardFound found1 = kit.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
926         String path1 = found1.getPrimaryPath();
927         assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
928
929         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange ending");
930     }
931
932     @Test
933     public void testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable() {
934         LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable starting");
935         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
936
937         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
938                 .put("default", Arrays.asList("member-256", "member-2")).build());
939
940         // Create an ActorSystem, ShardManager and actor for member-256.
941
942         final ActorSystem system256 = newActorSystem("Member256");
943         // 2562 is the tcp port of Member256 in src/test/resources/application.conf.
944         Cluster.get(system256).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2562"));
945
946         final ActorRef mockShardActor256 = newMockShardActor(system256, Shard.DEFAULT_NAME, "member-256");
947
948         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
949
950         // ShardManager must be created with shard configuration to let its localShards has shards.
951         final TestActorRef<TestShardManager> shardManager256 = TestActorRef.create(system256,
952                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor256)
953                         .cluster(new ClusterWrapperImpl(system256))
954                         .primaryShardInfoCache(primaryShardInfoCache).props()
955                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
956                 shardManagerID);
957
958         // Create an ActorSystem, ShardManager and actor for member-2 whose name is contained in member-256.
959
960         final ActorSystem system2 = newActorSystem("Member2");
961
962         // Join member-2 into the cluster of member-256.
963         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2562"));
964
965         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
966
967         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
968                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor2).cluster(
969                         new ClusterWrapperImpl(system2)).props().withDispatcher(
970                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
971
972         final TestKit kit256 = new TestKit(system256);
973         shardManager256.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef());
974         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef());
975         shardManager256.tell(new ActorInitialized(), mockShardActor256);
976         shardManager2.tell(new ActorInitialized(), mockShardActor2);
977
978         String memberId256 = "member-256-shard-default-" + shardMrgIDSuffix;
979         String memberId2   = "member-2-shard-default-"   + shardMrgIDSuffix;
980         shardManager256.tell(new ShardLeaderStateChanged(memberId256, memberId256, mock(DataTree.class),
981             DataStoreVersions.CURRENT_VERSION), mockShardActor256);
982         shardManager256.tell(
983             new RoleChangeNotification(memberId256, RaftState.Candidate.name(), RaftState.Leader.name()),
984             mockShardActor256);
985         shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId256, mock(DataTree.class),
986             DataStoreVersions.CURRENT_VERSION), mockShardActor2);
987         shardManager2.tell(
988             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Follower.name()),
989             mockShardActor2);
990         shardManager256.underlyingActor().waitForMemberUp();
991
992         shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
993
994         LocalPrimaryShardFound found = kit256.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
995         String path = found.getPrimaryPath();
996         assertTrue("Unexpected primary path " + path + " which must on member-256",
997             path.contains("member-256-shard-default-config"));
998
999         PrimaryShardInfo primaryShardInfo = new PrimaryShardInfo(
1000             system256.actorSelection(mockShardActor256.path()), DataStoreVersions.CURRENT_VERSION);
1001         primaryShardInfoCache.putSuccessful("default", primaryShardInfo);
1002
1003         // Simulate member-2 become unreachable.
1004         shardManager256.tell(MockClusterWrapper.createUnreachableMember("member-2",
1005                 "akka://cluster-test@127.0.0.1:2558"), kit256.getRef());
1006         shardManager256.underlyingActor().waitForUnreachableMember();
1007
1008         // Make sure leader shard on member-256 is still leader and still in the cache.
1009         shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
1010         found = kit256.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
1011         path = found.getPrimaryPath();
1012         assertTrue("Unexpected primary path " + path + " which must still not on member-256",
1013             path.contains("member-256-shard-default-config"));
1014         Future<PrimaryShardInfo> futurePrimaryShard = primaryShardInfoCache.getIfPresent("default");
1015         futurePrimaryShard.onComplete(new OnComplete<PrimaryShardInfo>() {
1016             @Override
1017             public void onComplete(final Throwable failure, final PrimaryShardInfo futurePrimaryShardInfo) {
1018                 if (failure != null) {
1019                     assertTrue("Primary shard info is unexpectedly removed from primaryShardInfoCache", false);
1020                 } else {
1021                     assertEquals("Expected primaryShardInfoCache entry",
1022                         primaryShardInfo, futurePrimaryShardInfo);
1023                 }
1024             }
1025         }, system256.dispatchers().defaultGlobalDispatcher());
1026
1027         LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable ending");
1028     }
1029
1030     @Test
1031     public void testOnReceiveFindLocalShardForNonExistentShard() {
1032         final TestKit kit = new TestKit(getSystem());
1033         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1034
1035         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1036
1037         shardManager.tell(new FindLocalShard("non-existent", false), kit.getRef());
1038
1039         LocalShardNotFound notFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
1040
1041         assertEquals("getShardName", "non-existent", notFound.getShardName());
1042     }
1043
1044     @Test
1045     public void testOnReceiveFindLocalShardForExistentShard() {
1046         final TestKit kit = new TestKit(getSystem());
1047         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1048
1049         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1050         shardManager.tell(new ActorInitialized(), mockShardActor);
1051
1052         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1053
1054         LocalShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1055
1056         assertTrue("Found path contains " + found.getPath().path().toString(),
1057             found.getPath().path().toString().contains("member-1-shard-default-config"));
1058     }
1059
1060     @Test
1061     public void testOnReceiveFindLocalShardForNotInitializedShard() {
1062         final TestKit kit = new TestKit(getSystem());
1063         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1064
1065         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1066
1067         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1068     }
1069
1070     @Test
1071     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
1072         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
1073         final TestKit kit = new TestKit(getSystem());
1074         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1075
1076         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1077
1078         // We're passing waitUntilInitialized = true to FindLocalShard
1079         // so the response should be
1080         // delayed until we send ActorInitialized.
1081         Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
1082             new Timeout(5, TimeUnit.SECONDS));
1083
1084         shardManager.tell(new ActorInitialized(), mockShardActor);
1085
1086         Object resp = Await.result(future, kit.duration("5 seconds"));
1087         assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
1088
1089         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
1090     }
1091
1092     @Test
1093     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
1094         TestShardManager shardManager = newTestShardManager();
1095
1096         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1097         shardManager.handleCommand(new RoleChangeNotification(
1098                 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
1099         assertFalse(ready.isDone());
1100
1101         shardManager.handleCommand(new ShardLeaderStateChanged(memberId, memberId,
1102                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1103         assertTrue(ready.isDone());
1104     }
1105
1106     @Test
1107     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
1108         final TestKit kit = new TestKit(getSystem());
1109         TestShardManager shardManager = newTestShardManager();
1110
1111         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1112         shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1113         assertFalse(ready.isDone());
1114
1115         shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
1116
1117         shardManager.handleCommand(
1118             new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1119                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1120         assertTrue(ready.isDone());
1121     }
1122
1123     @Test
1124     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
1125         final TestKit kit = new TestKit(getSystem());
1126         TestShardManager shardManager = newTestShardManager();
1127
1128         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1129         shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1130         assertFalse(ready.isDone());
1131
1132         shardManager.handleCommand(
1133             new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1134                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1135
1136         shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
1137         assertTrue(ready.isDone());
1138     }
1139
1140     @Test
1141     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
1142         TestShardManager shardManager = newTestShardManager();
1143
1144         shardManager.handleCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(),
1145             RaftState.Leader.name()));
1146         assertFalse(ready.isDone());
1147     }
1148
1149     @Test
1150     public void testByDefaultSyncStatusIsFalse() {
1151         TestShardManager shardManager = newTestShardManager();
1152
1153         assertFalse(shardManager.getMBean().getSyncStatus());
1154     }
1155
1156     @Test
1157     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception {
1158         TestShardManager shardManager = newTestShardManager();
1159
1160         shardManager.handleCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
1161                 RaftState.Follower.name(), RaftState.Leader.name()));
1162
1163         assertTrue(shardManager.getMBean().getSyncStatus());
1164     }
1165
1166     @Test
1167     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception {
1168         TestShardManager shardManager = newTestShardManager();
1169
1170         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1171         shardManager.handleCommand(new RoleChangeNotification(shardId,
1172                 RaftState.Follower.name(), RaftState.Candidate.name()));
1173
1174         assertFalse(shardManager.getMBean().getSyncStatus());
1175
1176         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
1177         shardManager.handleCommand(new FollowerInitialSyncUpStatus(
1178                 true, shardId));
1179
1180         assertFalse(shardManager.getMBean().getSyncStatus());
1181     }
1182
1183     @Test
1184     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception {
1185         TestShardManager shardManager = newTestShardManager();
1186
1187         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1188         shardManager.handleCommand(new RoleChangeNotification(shardId,
1189                 RaftState.Candidate.name(), RaftState.Follower.name()));
1190
1191         // Initially will be false
1192         assertFalse(shardManager.getMBean().getSyncStatus());
1193
1194         // Send status true will make sync status true
1195         shardManager.handleCommand(new FollowerInitialSyncUpStatus(true, shardId));
1196
1197         assertTrue(shardManager.getMBean().getSyncStatus());
1198
1199         // Send status false will make sync status false
1200         shardManager.handleCommand(new FollowerInitialSyncUpStatus(false, shardId));
1201
1202         assertFalse(shardManager.getMBean().getSyncStatus());
1203     }
1204
1205     @Test
1206     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception {
1207         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
1208         TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
1209             @Override
1210             public List<String> getMemberShardNames(final MemberName memberName) {
1211                 return Arrays.asList("default", "astronauts");
1212             }
1213         }));
1214
1215         // Initially will be false
1216         assertFalse(shardManager.getMBean().getSyncStatus());
1217
1218         // Make default shard leader
1219         String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
1220         shardManager.handleCommand(new RoleChangeNotification(defaultShardId,
1221                 RaftState.Follower.name(), RaftState.Leader.name()));
1222
1223         // default = Leader, astronauts is unknown so sync status remains false
1224         assertFalse(shardManager.getMBean().getSyncStatus());
1225
1226         // Make astronauts shard leader as well
1227         String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1228         shardManager.handleCommand(new RoleChangeNotification(astronautsShardId,
1229                 RaftState.Follower.name(), RaftState.Leader.name()));
1230
1231         // Now sync status should be true
1232         assertTrue(shardManager.getMBean().getSyncStatus());
1233
1234         // Make astronauts a Follower
1235         shardManager.handleCommand(new RoleChangeNotification(astronautsShardId,
1236                 RaftState.Leader.name(), RaftState.Follower.name()));
1237
1238         // Sync status is not true
1239         assertFalse(shardManager.getMBean().getSyncStatus());
1240
1241         // Make the astronauts follower sync status true
1242         shardManager.handleCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
1243
1244         // Sync status is now true
1245         assertTrue(shardManager.getMBean().getSyncStatus());
1246
1247         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending");
1248     }
1249
1250     @Test
1251     public void testOnReceiveSwitchShardBehavior() {
1252         final TestKit kit = new TestKit(getSystem());
1253         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1254
1255         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1256         shardManager.tell(new ActorInitialized(), mockShardActor);
1257
1258         shardManager.tell(new SwitchShardBehavior(mockShardName, RaftState.Leader, 1000), kit.getRef());
1259
1260         SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor,
1261             SwitchBehavior.class);
1262
1263         assertEquals(RaftState.Leader, switchBehavior.getNewState());
1264         assertEquals(1000, switchBehavior.getNewTerm());
1265     }
1266
1267     private static List<MemberName> members(final String... names) {
1268         return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList());
1269     }
1270
1271     @Test
1272     public void testOnCreateShard() {
1273         LOG.info("testOnCreateShard starting");
1274         final TestKit kit = new TestKit(getSystem());
1275         datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1276
1277         ActorRef shardManager = actorFactory
1278                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1279                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1280
1281         EffectiveModelContext schemaContext = TEST_SCHEMA_CONTEXT;
1282         shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1283
1284         DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100)
1285                 .persistent(false).build();
1286         Shard.Builder shardBuilder = Shard.builder();
1287
1288         ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module",
1289             "foo", null, members("member-1", "member-5", "member-6"));
1290         shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), kit.getRef());
1291
1292         kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1293
1294         shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1295
1296         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1297
1298         assertFalse("isRecoveryApplicable", shardBuilder.getDatastoreContext().isPersistent());
1299         assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig()
1300             .getPeerAddressResolver() instanceof ShardPeerAddressResolver);
1301         assertEquals("peerMembers", Sets.newHashSet(
1302             ShardIdentifier.create("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(),
1303             ShardIdentifier.create("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()),
1304             shardBuilder.getPeerAddresses().keySet());
1305         assertEquals("ShardIdentifier", ShardIdentifier.create("foo", MEMBER_1, shardMrgIDSuffix),
1306             shardBuilder.getId());
1307         assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1308
1309         // Send CreateShard with same name - should return Success with
1310         // a message.
1311
1312         shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1313
1314         Success success = kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1315         assertNotNull("Success status is null", success.status());
1316
1317         LOG.info("testOnCreateShard ending");
1318     }
1319
1320     @Test
1321     public void testOnCreateShardWithLocalMemberNotInShardConfig() {
1322         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting");
1323         final TestKit kit = new TestKit(getSystem());
1324         datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1325
1326         ActorRef shardManager = actorFactory
1327                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1328                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1329
1330         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1331
1332         Shard.Builder shardBuilder = Shard.builder();
1333         ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module",
1334             "foo", null, members("member-5", "member-6"));
1335
1336         shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1337         kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1338
1339         shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1340         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1341
1342         assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
1343         assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(), shardBuilder
1344             .getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
1345
1346         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending");
1347     }
1348
1349     @Test
1350     public void testOnCreateShardWithNoInitialSchemaContext() {
1351         LOG.info("testOnCreateShardWithNoInitialSchemaContext starting");
1352         final TestKit kit = new TestKit(getSystem());
1353         ActorRef shardManager = actorFactory
1354                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1355                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1356
1357         Shard.Builder shardBuilder = Shard.builder();
1358
1359         ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module",
1360             "foo", null, members("member-1"));
1361         shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1362
1363         kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1364
1365         EffectiveModelContext schemaContext = TEST_SCHEMA_CONTEXT;
1366         shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1367
1368         shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1369
1370         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1371
1372         assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1373         assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
1374
1375         LOG.info("testOnCreateShardWithNoInitialSchemaContext ending");
1376     }
1377
1378     @Test
1379     public void testGetSnapshot() {
1380         LOG.info("testGetSnapshot starting");
1381         TestKit kit = new TestKit(getSystem());
1382
1383         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1384                 .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1"))
1385                 .put("astronauts", Collections.<String>emptyList()).build());
1386
1387         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig)
1388                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1389
1390         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1391         Failure failure = kit.expectMsgClass(Failure.class);
1392         assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
1393
1394         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1395
1396         waitForShardInitialized(shardManager, "shard1", kit);
1397         waitForShardInitialized(shardManager, "shard2", kit);
1398
1399         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1400
1401         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1402
1403         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1404         assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1405
1406         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
1407             datastoreSnapshot.getShardSnapshots().stream().map(ShardSnapshot::getName).collect(Collectors.toSet())));
1408
1409         // Add a new replica
1410
1411         TestKit mockShardLeaderKit = new TestKit(getSystem());
1412
1413         TestShardManager shardManagerInstance = shardManager.underlyingActor();
1414         shardManagerInstance.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1415
1416         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1417         mockShardLeaderKit.expectMsgClass(AddServer.class);
1418         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.OK, ""));
1419         kit.expectMsgClass(Status.Success.class);
1420         waitForShardInitialized(shardManager, "astronauts", kit);
1421
1422         // Send another GetSnapshot and verify
1423
1424         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1425         datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1426
1427         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
1428                 Lists.transform(datastoreSnapshot.getShardSnapshots(), ShardSnapshot::getName)));
1429
1430         ShardManagerSnapshot snapshot = datastoreSnapshot.getShardManagerSnapshot();
1431         assertNotNull("Expected ShardManagerSnapshot", snapshot);
1432         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1433                 Sets.newHashSet(snapshot.getShardList()));
1434
1435         LOG.info("testGetSnapshot ending");
1436     }
1437
1438     @Test
1439     public void testRestoreFromSnapshot() {
1440         LOG.info("testRestoreFromSnapshot starting");
1441
1442         datastoreContextBuilder.shardInitializationTimeout(3, TimeUnit.SECONDS);
1443
1444         TestKit kit = new TestKit(getSystem());
1445
1446         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1447                 .put("shard1", Collections.<String>emptyList()).put("shard2", Collections.<String>emptyList())
1448                 .put("astronauts", Collections.<String>emptyList()).build());
1449
1450         ShardManagerSnapshot snapshot =
1451                 new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"));
1452         DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix, snapshot,
1453                 Collections.<ShardSnapshot>emptyList());
1454         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig)
1455                 .restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
1456
1457         shardManager.underlyingActor().waitForRecoveryComplete();
1458
1459         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1460
1461         waitForShardInitialized(shardManager, "shard1", kit);
1462         waitForShardInitialized(shardManager, "shard2", kit);
1463         waitForShardInitialized(shardManager, "astronauts", kit);
1464
1465         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1466
1467         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1468
1469         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1470
1471         assertNotNull("Expected ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1472         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1473                 Sets.newHashSet(datastoreSnapshot.getShardManagerSnapshot().getShardList()));
1474
1475         LOG.info("testRestoreFromSnapshot ending");
1476     }
1477
1478     @Test
1479     public void testAddShardReplicaForNonExistentShardConfig() {
1480         final TestKit kit = new TestKit(getSystem());
1481         ActorRef shardManager = actorFactory
1482                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1483                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1484
1485         shardManager.tell(new AddShardReplica("model-inventory"), kit.getRef());
1486         Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(2), Status.Failure.class);
1487
1488         assertTrue("Failure obtained", resp.cause() instanceof IllegalArgumentException);
1489     }
1490
1491     @Test
1492     public void testAddShardReplica() {
1493         LOG.info("testAddShardReplica starting");
1494         MockConfiguration mockConfig = new MockConfiguration(
1495                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1496                         .put("astronauts", Arrays.asList("member-2")).build());
1497
1498         final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1499         datastoreContextBuilder.shardManagerPersistenceId(shardManagerID);
1500
1501         // Create an ActorSystem ShardManager actor for member-1.
1502         final ActorSystem system1 = newActorSystem("Member1");
1503         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1504         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1505         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1506                 newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor)
1507                         .cluster(new ClusterWrapperImpl(system1)).props()
1508                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1509                 shardManagerID);
1510
1511         // Create an ActorSystem ShardManager actor for member-2.
1512         final ActorSystem system2 = newActorSystem("Member2");
1513         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1514
1515         String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
1516         String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString();
1517         final TestActorRef<MockRespondActor> mockShardLeaderActor = TestActorRef.create(system2,
1518                 Props.create(MockRespondActor.class, AddServer.class,
1519                         new AddServerReply(ServerChangeStatus.OK, memberId2))
1520                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1521                 name);
1522         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1523                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor)
1524                         .cluster(new ClusterWrapperImpl(system2)).props()
1525                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1526                 shardManagerID);
1527
1528         final TestKit kit = new TestKit(getSystem());
1529         newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1530         leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1531
1532         leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1533
1534         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1535         leaderShardManager.tell(
1536             new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1537             mockShardLeaderActor);
1538         leaderShardManager.tell(
1539             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1540             mockShardLeaderActor);
1541
1542         newReplicaShardManager.underlyingActor().waitForMemberUp();
1543         leaderShardManager.underlyingActor().waitForMemberUp();
1544
1545         // Have a dummy snapshot to be overwritten by the new data
1546         // persisted.
1547         String[] restoredShards = { "default", "people" };
1548         ShardManagerSnapshot snapshot =
1549                 new ShardManagerSnapshot(Arrays.asList(restoredShards));
1550         InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
1551         Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
1552
1553         InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID);
1554         InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
1555
1556         // construct a mock response message
1557         newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1558         AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1559             AddServer.class);
1560         String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1561         assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1562         kit.expectMsgClass(Duration.ofSeconds(5), Status.Success.class);
1563
1564         InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
1565         InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
1566         List<ShardManagerSnapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(shardManagerID,
1567             ShardManagerSnapshot.class);
1568         assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
1569         ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
1570         assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
1571             Sets.newHashSet(shardManagerSnapshot.getShardList()));
1572         LOG.info("testAddShardReplica ending");
1573     }
1574
1575     @Test
1576     public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() {
1577         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting");
1578         final TestKit kit = new TestKit(getSystem());
1579         TestActorRef<TestShardManager> shardManager = actorFactory
1580                 .createTestActor(newPropsShardMgrWithMockShardActor(), shardMgrID);
1581
1582         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1583         shardManager.tell(new ActorInitialized(), mockShardActor);
1584
1585         String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
1586         AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
1587         ActorRef leaderShardActor = shardManager.underlyingActor().getContext()
1588                 .actorOf(Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId);
1589
1590         MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
1591
1592         String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
1593         shardManager.tell(
1594             new RoleChangeNotification(newReplicaId, RaftState.Candidate.name(), RaftState.Follower.name()),
1595             mockShardActor);
1596         shardManager.tell(
1597             new ShardLeaderStateChanged(newReplicaId, leaderId, DataStoreVersions.CURRENT_VERSION),
1598             mockShardActor);
1599
1600         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1601
1602         MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
1603
1604         Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1605         assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1606
1607         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1608         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1609
1610         // Send message again to verify previous in progress state is
1611         // cleared
1612
1613         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1614         resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1615         assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1616
1617         // Send message again with an AddServer timeout to verify the
1618         // pre-existing shard actor isn't terminated.
1619
1620         shardManager.tell(
1621             newDatastoreContextFactory(
1622                 datastoreContextBuilder.shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), kit.getRef());
1623         leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
1624         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1625         kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1626
1627         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1628         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1629
1630         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
1631     }
1632
1633     @Test
1634     public void testAddShardReplicaWithPreExistingLocalReplicaLeader() {
1635         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting");
1636         final TestKit kit = new TestKit(getSystem());
1637         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1638         ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1639
1640         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1641         shardManager.tell(new ActorInitialized(), mockShardActor);
1642         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1643             DataStoreVersions.CURRENT_VERSION), kit.getRef());
1644         shardManager.tell(
1645             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1646             mockShardActor);
1647
1648         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1649         Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1650         assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1651
1652         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1653         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1654
1655         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
1656     }
1657
1658     @Test
1659     public void testAddShardReplicaWithAddServerReplyFailure() {
1660         LOG.info("testAddShardReplicaWithAddServerReplyFailure starting");
1661         final TestKit kit = new TestKit(getSystem());
1662         final TestKit mockShardLeaderKit = new TestKit(getSystem());
1663
1664         MockConfiguration mockConfig = new MockConfiguration(
1665             ImmutableMap.of("astronauts", Arrays.asList("member-2")));
1666
1667         ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
1668         final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1669             newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props()
1670             .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1671         shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1672
1673         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1674
1675         TestKit terminateWatcher = new TestKit(getSystem());
1676         terminateWatcher.watch(mockNewReplicaShardActor);
1677
1678         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1679
1680         AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
1681         assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
1682             addServerMsg.getNewServerId());
1683         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
1684
1685         Failure failure = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1686         assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
1687
1688         shardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
1689         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
1690
1691         terminateWatcher.expectTerminated(mockNewReplicaShardActor);
1692
1693         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1694         mockShardLeaderKit.expectMsgClass(AddServer.class);
1695         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
1696         failure = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1697         assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
1698
1699         LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
1700     }
1701
1702     @Test
1703     public void testAddShardReplicaWithAlreadyInProgress() {
1704         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1705                 AddServer.class, new AddShardReplica("astronauts"));
1706     }
1707
1708     @Test
1709     public void testAddShardReplicaWithFindPrimaryTimeout() {
1710         LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting");
1711         datastoreContextBuilder.shardInitializationTimeout(100, TimeUnit.MILLISECONDS);
1712         final TestKit kit = new TestKit(getSystem());
1713         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.of("astronauts", Arrays.asList("member-2")));
1714
1715         final ActorRef newReplicaShardManager = actorFactory
1716                 .createActor(newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props()
1717                     .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1718
1719         newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1720         MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2",
1721             AddressFromURIString.parse("akka://non-existent@127.0.0.1:5").toString());
1722
1723         newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1724         Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Status.Failure.class);
1725         assertTrue("Failure obtained", resp.cause() instanceof RuntimeException);
1726
1727         LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending");
1728     }
1729
1730     @Test
1731     public void testRemoveShardReplicaForNonExistentShard() {
1732         final TestKit kit = new TestKit(getSystem());
1733         ActorRef shardManager = actorFactory
1734                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1735                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1736
1737         shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), kit.getRef());
1738         Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(10), Status.Failure.class);
1739         assertTrue("Failure obtained", resp.cause() instanceof PrimaryNotFoundException);
1740     }
1741
1742     @Test
1743     /**
1744      * Primary is Local.
1745      */
1746     public void testRemoveShardReplicaLocal() {
1747         final TestKit kit = new TestKit(getSystem());
1748         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1749
1750         final ActorRef respondActor = actorFactory.createActor(Props.create(MockRespondActor.class,
1751             RemoveServer.class, new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
1752
1753         ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
1754
1755         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1756         shardManager.tell(new ActorInitialized(), respondActor);
1757         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1758             DataStoreVersions.CURRENT_VERSION), kit.getRef());
1759         shardManager.tell(
1760             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1761             respondActor);
1762
1763         shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), kit.getRef());
1764         final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor,
1765             RemoveServer.class);
1766         assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(),
1767             removeServer.getServerId());
1768         kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1769     }
1770
1771     @Test
1772     public void testRemoveShardReplicaRemote() {
1773         MockConfiguration mockConfig = new MockConfiguration(
1774                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1775                         .put("astronauts", Arrays.asList("member-1")).build());
1776
1777         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1778
1779         // Create an ActorSystem ShardManager actor for member-1.
1780         final ActorSystem system1 = newActorSystem("Member1");
1781         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1782         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1783
1784         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1785                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster(
1786                         new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1787                 shardManagerID);
1788
1789         // Create an ActorSystem ShardManager actor for member-2.
1790         final ActorSystem system2 = newActorSystem("Member2");
1791         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1792
1793         String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString();
1794         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
1795         final TestActorRef<MockRespondActor> mockShardLeaderActor =
1796                 TestActorRef.create(system2, Props.create(MockRespondActor.class, RemoveServer.class,
1797                         new RemoveServerReply(ServerChangeStatus.OK, memberId2)), name);
1798
1799         LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor);
1800
1801         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1802                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster(
1803                         new ClusterWrapperImpl(system2)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1804                 shardManagerID);
1805
1806         // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so,
1807         //    akka://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1
1808         // However when a shard manager has a local shard which is a follower and a leader that is remote it will
1809         // try to compute an address for the remote shard leader using the ShardPeerAddressResolver. This address will
1810         // look like so,
1811         //    akka://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1
1812         // In this specific case if we did a FindPrimary for shard default from member-1 we would come up
1813         // with the address of an actor which does not exist, therefore any message sent to that actor would go to
1814         // dead letters.
1815         // To work around this problem we create a ForwardingActor with the right address and pass to it the
1816         // mockShardLeaderActor. The ForwardingActor simply forwards all messages to the mockShardLeaderActor and every
1817         // thing works as expected
1818         final ActorRef actorRef = leaderShardManager.underlyingActor().context()
1819                 .actorOf(Props.create(ForwardingActor.class, mockShardLeaderActor),
1820                         "member-2-shard-default-" + shardMrgIDSuffix);
1821
1822         LOG.error("Forwarding actor : {}", actorRef);
1823
1824         final TestKit kit = new TestKit(getSystem());
1825         newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1826         leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1827
1828         leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1829         newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1830
1831         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1832         leaderShardManager.tell(
1833             new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1834             mockShardLeaderActor);
1835         leaderShardManager.tell(
1836             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1837             mockShardLeaderActor);
1838
1839         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
1840         newReplicaShardManager.tell(
1841             new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class), leaderVersion),
1842             mockShardActor);
1843         newReplicaShardManager.tell(
1844             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
1845             mockShardActor);
1846
1847         newReplicaShardManager.underlyingActor().waitForMemberUp();
1848         leaderShardManager.underlyingActor().waitForMemberUp();
1849
1850         // construct a mock response message
1851         newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), kit.getRef());
1852         RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1853             RemoveServer.class);
1854         String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1855         assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
1856         kit.expectMsgClass(Duration.ofSeconds(5), Status.Success.class);
1857     }
1858
1859     @Test
1860     public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() {
1861         testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", MEMBER_2),
1862                 RemoveServer.class, new RemoveShardReplica("astronauts", MEMBER_3));
1863     }
1864
1865     @Test
1866     public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() {
1867         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1868                 AddServer.class, new RemoveShardReplica("astronauts", MEMBER_2));
1869     }
1870
1871
1872     public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange,
1873                                                       final Class<?> firstForwardedServerChangeClass,
1874                                                       final Object secondServerChange) {
1875         final TestKit kit = new TestKit(getSystem());
1876         final TestKit mockShardLeaderKit = new TestKit(getSystem());
1877         final TestKit secondRequestKit = new TestKit(getSystem());
1878
1879         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1880             .put(shardName, Arrays.asList("member-2")).build());
1881
1882         final TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
1883             newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor)
1884             .cluster(new MockClusterWrapper()).props()
1885             .withDispatcher(Dispatchers.DefaultDispatcherId()),
1886             shardMgrID);
1887
1888         shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1889
1890         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1891
1892         shardManager.tell(firstServerChange, kit.getRef());
1893
1894         mockShardLeaderKit.expectMsgClass(firstForwardedServerChangeClass);
1895
1896         shardManager.tell(secondServerChange, secondRequestKit.getRef());
1897
1898         secondRequestKit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1899     }
1900
1901     @Test
1902     public void testServerRemovedShardActorNotRunning() {
1903         LOG.info("testServerRemovedShardActorNotRunning starting");
1904         final TestKit kit = new TestKit(getSystem());
1905         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1906             .put("default", Arrays.asList("member-1", "member-2"))
1907             .put("astronauts", Arrays.asList("member-2"))
1908             .put("people", Arrays.asList("member-1", "member-2")).build());
1909
1910         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1911             newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1912
1913         shardManager.underlyingActor().waitForRecoveryComplete();
1914         shardManager.tell(new FindLocalShard("people", false), kit.getRef());
1915         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1916
1917         shardManager.tell(new FindLocalShard("default", false), kit.getRef());
1918         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1919
1920         // Removed the default shard replica from member-1
1921         ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
1922         ShardIdentifier shardId = builder.shardName("default").memberName(MEMBER_1).type(shardMrgIDSuffix)
1923                 .build();
1924         shardManager.tell(new ServerRemoved(shardId.toString()), kit.getRef());
1925
1926         shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1927
1928         LOG.info("testServerRemovedShardActorNotRunning ending");
1929     }
1930
1931     @Test
1932     public void testServerRemovedShardActorRunning() {
1933         LOG.info("testServerRemovedShardActorRunning starting");
1934         final TestKit kit = new TestKit(getSystem());
1935         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1936             .put("default", Arrays.asList("member-1", "member-2"))
1937             .put("astronauts", Arrays.asList("member-2"))
1938             .put("people", Arrays.asList("member-1", "member-2")).build());
1939
1940         String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1941         ActorRef shard = actorFactory.createActor(MessageCollectorActor.props(), shardId);
1942
1943         TestActorRef<TestShardManager> shardManager = actorFactory
1944                 .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props()
1945                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1946
1947         shardManager.underlyingActor().waitForRecoveryComplete();
1948
1949         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1950         shardManager.tell(new ActorInitialized(), shard);
1951
1952         waitForShardInitialized(shardManager, "people", kit);
1953         waitForShardInitialized(shardManager, "default", kit);
1954
1955         // Removed the default shard replica from member-1
1956         shardManager.tell(new ServerRemoved(shardId), kit.getRef());
1957
1958         shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1959
1960         MessageCollectorActor.expectFirstMatching(shard, Shutdown.class);
1961
1962         LOG.info("testServerRemovedShardActorRunning ending");
1963     }
1964
1965     @Test
1966     public void testShardPersistenceWithRestoredData() {
1967         LOG.info("testShardPersistenceWithRestoredData starting");
1968         final TestKit kit = new TestKit(getSystem());
1969         MockConfiguration mockConfig =
1970                 new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1971                     .put("default", Arrays.asList("member-1", "member-2"))
1972                     .put("astronauts", Arrays.asList("member-2"))
1973                     .put("people", Arrays.asList("member-1", "member-2")).build());
1974         String[] restoredShards = {"default", "astronauts"};
1975         ShardManagerSnapshot snapshot =
1976                 new ShardManagerSnapshot(Arrays.asList(restoredShards));
1977         InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
1978
1979         // create shardManager to come up with restored data
1980         TestActorRef<TestShardManager> newRestoredShardManager = actorFactory.createTestActor(
1981             newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1982
1983         newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
1984
1985         newRestoredShardManager.tell(new FindLocalShard("people", false), kit.getRef());
1986         LocalShardNotFound notFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
1987         assertEquals("for uninitialized shard", "people", notFound.getShardName());
1988
1989         // Verify a local shard is created for the restored shards,
1990         // although we expect a NotInitializedException for the shards
1991         // as the actor initialization
1992         // message is not sent for them
1993         newRestoredShardManager.tell(new FindLocalShard("default", false), kit.getRef());
1994         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1995
1996         newRestoredShardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
1997         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1998
1999         LOG.info("testShardPersistenceWithRestoredData ending");
2000     }
2001
2002     @Test
2003     public void testShutDown() throws Exception {
2004         LOG.info("testShutDown starting");
2005         final TestKit kit = new TestKit(getSystem());
2006         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
2007             .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1")).build());
2008
2009         String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString();
2010         ActorRef shard1 = actorFactory.createActor(MessageCollectorActor.props(), shardId1);
2011
2012         String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString();
2013         ActorRef shard2 = actorFactory.createActor(MessageCollectorActor.props(), shardId2);
2014
2015         ActorRef shardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig)
2016             .addShardActor("shard1", shard1).addShardActor("shard2", shard2).props());
2017
2018         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2019         shardManager.tell(new ActorInitialized(), shard1);
2020         shardManager.tell(new ActorInitialized(), shard2);
2021
2022         FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
2023         Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE);
2024
2025         MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class);
2026         MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class);
2027
2028         try {
2029             Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS));
2030             fail("ShardManager actor stopped without waiting for the Shards to be stopped");
2031         } catch (TimeoutException e) {
2032             // expected
2033         }
2034
2035         actorFactory.killActor(shard1, kit);
2036         actorFactory.killActor(shard2, kit);
2037
2038         Boolean stopped = Await.result(stopFuture, duration);
2039         assertEquals("Stopped", Boolean.TRUE, stopped);
2040
2041         LOG.info("testShutDown ending");
2042     }
2043
2044     @Test
2045     public void testChangeServersVotingStatus() {
2046         final TestKit kit = new TestKit(getSystem());
2047         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2048
2049         ActorRef respondActor = actorFactory
2050                 .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2051                     new ServerChangeReply(ServerChangeStatus.OK, null)), memberId);
2052
2053         ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2054
2055         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2056         shardManager.tell(new ActorInitialized(), respondActor);
2057         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
2058             DataStoreVersions.CURRENT_VERSION), kit.getRef());
2059         shardManager.tell(
2060             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
2061             respondActor);
2062
2063         shardManager.tell(
2064             new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), kit.getRef());
2065
2066         ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor
2067                 .expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2068         assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(),
2069             ImmutableMap.of(ShardIdentifier
2070                 .create("default", MemberName.forName("member-2"), shardMrgIDSuffix).toString(),
2071                 Boolean.TRUE));
2072
2073         kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
2074     }
2075
2076     @Test
2077     public void testChangeServersVotingStatusWithNoLeader() {
2078         final TestKit kit = new TestKit(getSystem());
2079         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2080
2081         ActorRef respondActor = actorFactory
2082                 .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2083                     new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);
2084
2085         ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2086
2087         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2088         shardManager.tell(new ActorInitialized(), respondActor);
2089         shardManager.tell(new RoleChangeNotification(memberId, null, RaftState.Follower.name()), respondActor);
2090
2091         shardManager.tell(
2092             new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), kit.getRef());
2093
2094         MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2095
2096         Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Status.Failure.class);
2097         assertTrue("Failure resposnse", resp.cause() instanceof NoShardLeaderException);
2098     }
2099
2100     @SuppressWarnings("unchecked")
2101     @Test
2102     public void testRegisterForShardLeaderChanges() {
2103         LOG.info("testRegisterForShardLeaderChanges starting");
2104
2105         final String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
2106         final String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
2107         final TestKit kit = new TestKit(getSystem());
2108         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
2109
2110         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2111         shardManager.tell(new ActorInitialized(), mockShardActor);
2112
2113         final Consumer<String> mockCallback = mock(Consumer.class);
2114         shardManager.tell(new RegisterForShardAvailabilityChanges(mockCallback), kit.getRef());
2115
2116         final Success reply = kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
2117         final Registration reg = (Registration) reply.status();
2118
2119         final DataTree mockDataTree = mock(DataTree.class);
2120         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2121             DataStoreVersions.CURRENT_VERSION), mockShardActor);
2122
2123         verify(mockCallback, timeout(5000)).accept("default");
2124
2125         reset(mockCallback);
2126         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2127                 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2128
2129         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
2130         verifyNoMoreInteractions(mockCallback);
2131
2132         shardManager.tell(new ShardLeaderStateChanged(memberId1, null, mockDataTree,
2133                 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2134
2135         verify(mockCallback, timeout(5000)).accept("default");
2136
2137         reset(mockCallback);
2138         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, mockDataTree,
2139                 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2140
2141         verify(mockCallback, timeout(5000)).accept("default");
2142
2143         reset(mockCallback);
2144         reg.close();
2145
2146         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2147                 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2148
2149         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
2150         verifyNoMoreInteractions(mockCallback);
2151
2152         LOG.info("testRegisterForShardLeaderChanges ending");
2153     }
2154
2155     public static class TestShardManager extends ShardManager {
2156         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
2157         private final CountDownLatch snapshotPersist = new CountDownLatch(1);
2158         private ShardManagerSnapshot snapshot;
2159         private final Map<String, ActorRef> shardActors;
2160         private final ActorRef shardActor;
2161         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
2162         private CountDownLatch memberUpReceived = new CountDownLatch(1);
2163         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
2164         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
2165         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
2166         private volatile MessageInterceptor messageInterceptor;
2167
2168         TestShardManager(final Builder builder) {
2169             super(builder);
2170             shardActor = builder.shardActor;
2171             shardActors = builder.shardActors;
2172         }
2173
2174         @Override
2175         protected void handleRecover(final Object message) throws Exception {
2176             try {
2177                 super.handleRecover(message);
2178             } finally {
2179                 if (message instanceof RecoveryCompleted) {
2180                     recoveryComplete.countDown();
2181                 }
2182             }
2183         }
2184
2185         private void countDownIfOther(final Member member, final CountDownLatch latch) {
2186             if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
2187                 latch.countDown();
2188             }
2189         }
2190
2191         @Override
2192         public void handleCommand(final Object message) throws Exception {
2193             try {
2194                 if (messageInterceptor != null && messageInterceptor.canIntercept(message)) {
2195                     getSender().tell(messageInterceptor.apply(message), getSelf());
2196                 } else {
2197                     super.handleCommand(message);
2198                 }
2199             } finally {
2200                 if (message instanceof FindPrimary) {
2201                     findPrimaryMessageReceived.countDown();
2202                 } else if (message instanceof ClusterEvent.MemberUp) {
2203                     countDownIfOther(((ClusterEvent.MemberUp) message).member(), memberUpReceived);
2204                 } else if (message instanceof ClusterEvent.MemberRemoved) {
2205                     countDownIfOther(((ClusterEvent.MemberRemoved) message).member(), memberRemovedReceived);
2206                 } else if (message instanceof ClusterEvent.UnreachableMember) {
2207                     countDownIfOther(((ClusterEvent.UnreachableMember) message).member(), memberUnreachableReceived);
2208                 } else if (message instanceof ClusterEvent.ReachableMember) {
2209                     countDownIfOther(((ClusterEvent.ReachableMember) message).member(), memberReachableReceived);
2210                 }
2211             }
2212         }
2213
2214         void setMessageInterceptor(final MessageInterceptor messageInterceptor) {
2215             this.messageInterceptor = messageInterceptor;
2216         }
2217
2218         void waitForRecoveryComplete() {
2219             assertTrue("Recovery complete",
2220                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
2221         }
2222
2223         public void waitForMemberUp() {
2224             assertTrue("MemberUp received",
2225                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
2226             memberUpReceived = new CountDownLatch(1);
2227         }
2228
2229         void waitForMemberRemoved() {
2230             assertTrue("MemberRemoved received",
2231                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
2232             memberRemovedReceived = new CountDownLatch(1);
2233         }
2234
2235         void waitForUnreachableMember() {
2236             assertTrue("UnreachableMember received",
2237                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS));
2238             memberUnreachableReceived = new CountDownLatch(1);
2239         }
2240
2241         void waitForReachableMember() {
2242             assertTrue("ReachableMember received",
2243                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
2244             memberReachableReceived = new CountDownLatch(1);
2245         }
2246
2247         void verifyFindPrimary() {
2248             assertTrue("FindPrimary received",
2249                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
2250             findPrimaryMessageReceived = new CountDownLatch(1);
2251         }
2252
2253         public static Builder builder(final DatastoreContext.Builder datastoreContextBuilder) {
2254             return new Builder(datastoreContextBuilder);
2255         }
2256
2257         public static class Builder extends AbstractGenericCreator<Builder, TestShardManager> {
2258             private ActorRef shardActor;
2259             private final Map<String, ActorRef> shardActors = new HashMap<>();
2260
2261             Builder(final DatastoreContext.Builder datastoreContextBuilder) {
2262                 super(TestShardManager.class);
2263                 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
2264             }
2265
2266             Builder shardActor(final ActorRef newShardActor) {
2267                 shardActor = newShardActor;
2268                 return this;
2269             }
2270
2271             Builder addShardActor(final String shardName, final ActorRef actorRef) {
2272                 shardActors.put(shardName, actorRef);
2273                 return this;
2274             }
2275         }
2276
2277         @Override
2278         public void saveSnapshot(final Object obj) {
2279             snapshot = (ShardManagerSnapshot) obj;
2280             snapshotPersist.countDown();
2281             super.saveSnapshot(obj);
2282         }
2283
2284         void verifySnapshotPersisted(final Set<String> shardList) {
2285             assertTrue("saveSnapshot invoked",
2286                     Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
2287             assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
2288         }
2289
2290         @Override
2291         protected ActorRef newShardActor(final ShardInformation info) {
2292             if (shardActors.get(info.getShardName()) != null) {
2293                 return shardActors.get(info.getShardName());
2294             }
2295
2296             if (shardActor != null) {
2297                 return shardActor;
2298             }
2299
2300             return super.newShardActor(info);
2301         }
2302     }
2303
2304     private abstract static class AbstractGenericCreator<T extends AbstractGenericCreator<T, ?>, C extends ShardManager>
2305                                                      extends AbstractShardManagerCreator<T> {
2306         private final Class<C> shardManagerClass;
2307
2308         AbstractGenericCreator(final Class<C> shardManagerClass) {
2309             this.shardManagerClass = shardManagerClass;
2310             cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).readinessFuture(ready)
2311                     .primaryShardInfoCache(new PrimaryShardInfoFutureCache());
2312         }
2313
2314         @Override
2315         public Props props() {
2316             verify();
2317             return Props.create(shardManagerClass, this);
2318         }
2319     }
2320
2321     private static class GenericCreator<C extends ShardManager> extends AbstractGenericCreator<GenericCreator<C>, C> {
2322         GenericCreator(final Class<C> shardManagerClass) {
2323             super(shardManagerClass);
2324         }
2325     }
2326
2327     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
2328         private static final long serialVersionUID = 1L;
2329         private final Creator<ShardManager> delegate;
2330
2331         DelegatingShardManagerCreator(final Creator<ShardManager> delegate) {
2332             this.delegate = delegate;
2333         }
2334
2335         @Override
2336         public ShardManager create() throws Exception {
2337             return delegate.create();
2338         }
2339     }
2340
2341     interface MessageInterceptor extends Function<Object, Object> {
2342         boolean canIntercept(Object message);
2343     }
2344
2345     private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
2346         return new MessageInterceptor() {
2347             @Override
2348             public Object apply(final Object message) {
2349                 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
2350             }
2351
2352             @Override
2353             public boolean canIntercept(final Object message) {
2354                 return message instanceof FindPrimary;
2355             }
2356         };
2357     }
2358
2359     private static class MockRespondActor extends MessageCollectorActor {
2360         static final String CLEAR_RESPONSE = "clear-response";
2361
2362         private Object responseMsg;
2363         private final Class<?> requestClass;
2364
2365         @SuppressWarnings("unused")
2366         MockRespondActor(final Class<?> requestClass, final Object responseMsg) {
2367             this.requestClass = requestClass;
2368             this.responseMsg = responseMsg;
2369         }
2370
2371         @Override
2372         public void onReceive(final Object message) throws Exception {
2373             if (message.equals(CLEAR_RESPONSE)) {
2374                 responseMsg = null;
2375             } else {
2376                 super.onReceive(message);
2377                 if (message.getClass().equals(requestClass) && responseMsg != null) {
2378                     getSender().tell(responseMsg, getSelf());
2379                 }
2380             }
2381         }
2382     }
2383 }