Use SettableFuture<Empty> for readinessFuture
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManagerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.shardmanager;
9
10 import static org.awaitility.Awaitility.await;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.junit.Assert.fail;
18 import static org.mockito.ArgumentMatchers.anyString;
19 import static org.mockito.Mockito.doReturn;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.reset;
22 import static org.mockito.Mockito.timeout;
23 import static org.mockito.Mockito.verify;
24 import static org.mockito.Mockito.verifyNoMoreInteractions;
25
26 import akka.actor.ActorRef;
27 import akka.actor.ActorSystem;
28 import akka.actor.AddressFromURIString;
29 import akka.actor.PoisonPill;
30 import akka.actor.Props;
31 import akka.actor.Status;
32 import akka.actor.Status.Failure;
33 import akka.actor.Status.Success;
34 import akka.cluster.Cluster;
35 import akka.cluster.ClusterEvent;
36 import akka.cluster.Member;
37 import akka.dispatch.Dispatchers;
38 import akka.dispatch.OnComplete;
39 import akka.japi.Creator;
40 import akka.pattern.Patterns;
41 import akka.persistence.RecoveryCompleted;
42 import akka.serialization.Serialization;
43 import akka.testkit.TestActorRef;
44 import akka.testkit.javadsl.TestKit;
45 import akka.util.Timeout;
46 import com.google.common.base.Stopwatch;
47 import com.google.common.collect.ImmutableMap;
48 import com.google.common.collect.Lists;
49 import com.google.common.collect.Sets;
50 import com.google.common.util.concurrent.SettableFuture;
51 import com.google.common.util.concurrent.Uninterruptibles;
52 import java.time.Duration;
53 import java.util.AbstractMap;
54 import java.util.Arrays;
55 import java.util.Collection;
56 import java.util.Collections;
57 import java.util.HashMap;
58 import java.util.List;
59 import java.util.Map;
60 import java.util.Map.Entry;
61 import java.util.Set;
62 import java.util.concurrent.CountDownLatch;
63 import java.util.concurrent.TimeUnit;
64 import java.util.concurrent.TimeoutException;
65 import java.util.function.Consumer;
66 import java.util.function.Function;
67 import java.util.stream.Collectors;
68 import org.junit.After;
69 import org.junit.AfterClass;
70 import org.junit.Before;
71 import org.junit.BeforeClass;
72 import org.junit.Test;
73 import org.junit.runner.RunWith;
74 import org.mockito.junit.MockitoJUnitRunner;
75 import org.opendaylight.controller.cluster.access.concepts.MemberName;
76 import org.opendaylight.controller.cluster.datastore.AbstractClusterRefActorTest;
77 import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
78 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
79 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
80 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
81 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
82 import org.opendaylight.controller.cluster.datastore.Shard;
83 import org.opendaylight.controller.cluster.datastore.config.Configuration;
84 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
85 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
86 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
87 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
88 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
89 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
90 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
91 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
92 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
93 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
94 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
95 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
96 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
97 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
98 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
99 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
100 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
101 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
102 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
103 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
104 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
105 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
106 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
107 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
108 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
109 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
110 import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor;
111 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
112 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
113 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
114 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
115 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
116 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
117 import org.opendaylight.controller.cluster.raft.RaftState;
118 import org.opendaylight.controller.cluster.raft.TestActorFactory;
119 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
120 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
121 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
122 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
123 import org.opendaylight.controller.cluster.raft.messages.AddServer;
124 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
125 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
126 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
127 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
128 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
129 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
130 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
131 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
132 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
133 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
134 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
135 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
136 import org.opendaylight.yangtools.concepts.Registration;
137 import org.opendaylight.yangtools.yang.common.Empty;
138 import org.opendaylight.yangtools.yang.common.XMLNamespace;
139 import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
140 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
141 import org.slf4j.Logger;
142 import org.slf4j.LoggerFactory;
143 import scala.concurrent.Await;
144 import scala.concurrent.Future;
145 import scala.concurrent.duration.FiniteDuration;
146
147 @RunWith(MockitoJUnitRunner.StrictStubs.class)
148 public class ShardManagerTest extends AbstractClusterRefActorTest {
149     private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
150     private static final MemberName MEMBER_1 = MemberName.forName("member-1");
151     private static final MemberName MEMBER_2 = MemberName.forName("member-2");
152     private static final MemberName MEMBER_3 = MemberName.forName("member-3");
153
154     private static int ID_COUNTER = 1;
155     private static ActorRef mockShardActor;
156     private static ShardIdentifier mockShardName;
157     private static SettableFuture<Empty> ready;
158     private static EffectiveModelContext TEST_SCHEMA_CONTEXT;
159
160     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
161     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
162     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
163             .dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
164             .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
165
166     private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
167
168     @BeforeClass
169     public static void beforeClass() {
170         TEST_SCHEMA_CONTEXT = TestModel.createTestContext();
171     }
172
173     @AfterClass
174     public static void afterClass() {
175         TEST_SCHEMA_CONTEXT = null;
176     }
177
178     @Before
179     public void setUp() {
180         ready = SettableFuture.create();
181
182         InMemoryJournal.clear();
183         InMemorySnapshotStore.clear();
184
185         if (mockShardActor == null) {
186             mockShardName = ShardIdentifier.create(Shard.DEFAULT_NAME, MEMBER_1, "config");
187             mockShardActor = getSystem().actorOf(MessageCollectorActor.props(), mockShardName.toString());
188         }
189
190         MessageCollectorActor.clearMessages(mockShardActor);
191     }
192
193     @After
194     public void tearDown() {
195         InMemoryJournal.clear();
196         InMemorySnapshotStore.clear();
197
198         mockShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
199         await().atMost(Duration.ofSeconds(10)).until(mockShardActor::isTerminated);
200         mockShardActor = null;
201
202         actorFactory.close();
203     }
204
205     private TestShardManager.Builder newTestShardMgrBuilder() {
206         return TestShardManager.builder(datastoreContextBuilder).distributedDataStore(mock(DistributedDataStore.class));
207     }
208
209     private TestShardManager.Builder newTestShardMgrBuilder(final Configuration config) {
210         return TestShardManager.builder(datastoreContextBuilder).configuration(config)
211                 .distributedDataStore(mock(DistributedDataStore.class));
212     }
213
214     private Props newShardMgrProps() {
215         return newShardMgrProps(new MockConfiguration());
216     }
217
218     private Props newShardMgrProps(final Configuration config) {
219         return newTestShardMgrBuilder(config).readinessFuture(ready).props();
220     }
221
222     private ActorSystem newActorSystem(final String config) {
223         return newActorSystem("cluster-test", config);
224     }
225
226     private ActorRef newMockShardActor(final ActorSystem system, final String shardName, final String memberName) {
227         String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
228         if (system == getSystem()) {
229             return actorFactory.createActor(MessageCollectorActor.props(), name);
230         }
231
232         return system.actorOf(MessageCollectorActor.props(), name);
233     }
234
235     private static DatastoreContextFactory newDatastoreContextFactory(final DatastoreContext datastoreContext) {
236         DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
237         doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
238         doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(anyString());
239         return mockFactory;
240     }
241
242     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() {
243         return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
244     }
245
246     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(final ActorRef shardActor) {
247         return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor)
248                 .distributedDataStore(mock(DistributedDataStore.class));
249     }
250
251
252     private Props newPropsShardMgrWithMockShardActor() {
253         return newTestShardMgrBuilderWithMockShardActor().props().withDispatcher(
254                 Dispatchers.DefaultDispatcherId());
255     }
256
257     private Props newPropsShardMgrWithMockShardActor(final ActorRef shardActor) {
258         return newTestShardMgrBuilderWithMockShardActor(shardActor).props()
259                 .withDispatcher(Dispatchers.DefaultDispatcherId());
260     }
261
262
263     private TestShardManager newTestShardManager() {
264         return newTestShardManager(newShardMgrProps());
265     }
266
267     private TestShardManager newTestShardManager(final Props props) {
268         TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
269         TestShardManager shardManager = shardManagerActor.underlyingActor();
270         shardManager.waitForRecoveryComplete();
271         return shardManager;
272     }
273
274     private static void waitForShardInitialized(final ActorRef shardManager, final String shardName,
275             final TestKit kit) {
276         AssertionError last = null;
277         Stopwatch sw = Stopwatch.createStarted();
278         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
279             try {
280                 shardManager.tell(new FindLocalShard(shardName, true), kit.getRef());
281                 kit.expectMsgClass(LocalShardFound.class);
282                 return;
283             } catch (AssertionError e) {
284                 last = e;
285             }
286
287             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
288         }
289
290         throw last;
291     }
292
293     @SuppressWarnings("unchecked")
294     private static <T> T expectMsgClassOrFailure(final Class<T> msgClass, final TestKit kit, final String msg) {
295         Object reply = kit.expectMsgAnyClassOf(kit.duration("5 sec"), msgClass, Failure.class);
296         if (reply instanceof Failure) {
297             throw new AssertionError(msg + " failed", ((Failure)reply).cause());
298         }
299
300         return (T)reply;
301     }
302
303     @Test
304     public void testPerShardDatastoreContext() throws Exception {
305         LOG.info("testPerShardDatastoreContext starting");
306         final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
307                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
308
309         doReturn(
310                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(6).build())
311                 .when(mockFactory).getShardDatastoreContext("default");
312
313         doReturn(
314                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(7).build())
315                 .when(mockFactory).getShardDatastoreContext("topology");
316
317         final MockConfiguration mockConfig = new MockConfiguration() {
318             @Override
319             public Collection<String> getMemberShardNames(final MemberName memberName) {
320                 return Arrays.asList("default", "topology");
321             }
322
323             @Override
324             public Collection<MemberName> getMembersFromShardName(final String shardName) {
325                 return members("member-1");
326             }
327         };
328
329         final ActorRef defaultShardActor = actorFactory.createActor(
330                 MessageCollectorActor.props(), actorFactory.generateActorId("default"));
331         final ActorRef topologyShardActor = actorFactory.createActor(
332                 MessageCollectorActor.props(), actorFactory.generateActorId("topology"));
333
334         final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
335                 new HashMap<String, Entry<ActorRef, DatastoreContext>>());
336         shardInfoMap.put("default", new AbstractMap.SimpleEntry<>(defaultShardActor, null));
337         shardInfoMap.put("topology", new AbstractMap.SimpleEntry<>(topologyShardActor, null));
338
339         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
340         final CountDownLatch newShardActorLatch = new CountDownLatch(2);
341         class LocalShardManager extends ShardManager {
342             LocalShardManager(final AbstractShardManagerCreator<?> creator) {
343                 super(creator);
344             }
345
346             @Override
347             protected ActorRef newShardActor(final ShardInformation info) {
348                 Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
349                 ActorRef ref = null;
350                 if (entry != null) {
351                     ref = entry.getKey();
352                     entry.setValue(info.getDatastoreContext());
353                 }
354
355                 newShardActorLatch.countDown();
356                 return ref;
357             }
358         }
359
360         final Creator<ShardManager> creator = new Creator<>() {
361             private static final long serialVersionUID = 1L;
362             @Override
363             public ShardManager create() {
364                 return new LocalShardManager(
365                         new GenericCreator<>(LocalShardManager.class).datastoreContextFactory(mockFactory)
366                                 .primaryShardInfoCache(primaryShardInfoCache).configuration(mockConfig));
367             }
368         };
369
370         final TestKit kit = new TestKit(getSystem());
371
372         final ActorRef shardManager = actorFactory.createActor(Props.create(ShardManager.class,
373                 new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()));
374
375         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
376
377         assertTrue("Shard actors created", newShardActorLatch.await(5, TimeUnit.SECONDS));
378         assertEquals("getShardElectionTimeoutFactor", 6,
379                 shardInfoMap.get("default").getValue().getShardElectionTimeoutFactor());
380         assertEquals("getShardElectionTimeoutFactor", 7,
381                 shardInfoMap.get("topology").getValue().getShardElectionTimeoutFactor());
382
383         DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
384                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
385         doReturn(
386                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(66).build())
387                 .when(newMockFactory).getShardDatastoreContext("default");
388
389         doReturn(
390                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(77).build())
391                 .when(newMockFactory).getShardDatastoreContext("topology");
392
393         shardManager.tell(newMockFactory, kit.getRef());
394
395         DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor,
396                 DatastoreContext.class);
397         assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
398
399         newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
400         assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
401
402         LOG.info("testPerShardDatastoreContext ending");
403     }
404
405     @Test
406     public void testOnReceiveFindPrimaryForNonExistentShard() {
407         final TestKit kit = new TestKit(getSystem());
408         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
409
410         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
411
412         shardManager.tell(new FindPrimary("non-existent", false), kit.getRef());
413
414         kit.expectMsgClass(Duration.ofSeconds(5), PrimaryNotFoundException.class);
415     }
416
417     @Test
418     public void testOnReceiveFindPrimaryForLocalLeaderShard() {
419         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting");
420         final TestKit kit = new TestKit(getSystem());
421         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
422
423         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
424
425         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
426         shardManager.tell(new ActorInitialized(), mockShardActor);
427
428         DataTree mockDataTree = mock(DataTree.class);
429         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
430             DataStoreVersions.CURRENT_VERSION), kit.getRef());
431
432         MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
433         shardManager.tell(
434             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
435             mockShardActor);
436
437         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
438
439         LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5),
440             LocalPrimaryShardFound.class);
441         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
442             primaryFound.getPrimaryPath().contains("member-1-shard-default"));
443         assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
444
445         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending");
446     }
447
448     @Test
449     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() {
450         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp starting");
451         final TestKit kit = new TestKit(getSystem());
452         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
453
454         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
455         shardManager.tell(new ActorInitialized(), mockShardActor);
456
457         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
458         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
459         shardManager.tell(
460             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
461             mockShardActor);
462         shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION),
463             mockShardActor);
464
465         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
466
467         kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
468
469         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending");
470     }
471
472     @Test
473     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() {
474         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard starting");
475         final TestKit kit = new TestKit(getSystem());
476         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
477
478         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
479         shardManager.tell(new ActorInitialized(), mockShardActor);
480
481         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
482         MockClusterWrapper.sendMemberUp(shardManager, "member-2", kit.getRef().path().toString());
483
484         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
485         shardManager.tell(
486             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
487             mockShardActor);
488         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
489         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, leaderVersion), mockShardActor);
490
491         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
492
493         RemotePrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
494         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
495             primaryFound.getPrimaryPath().contains("member-2-shard-default"));
496         assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
497
498         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard ending");
499     }
500
501     @Test
502     public void testOnReceiveFindPrimaryForUninitializedShard() {
503         final TestKit kit = new TestKit(getSystem());
504         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
505
506         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
507
508         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
509     }
510
511     @Test
512     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() {
513         final TestKit kit = new TestKit(getSystem());
514         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
515
516         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
517         shardManager.tell(new ActorInitialized(), mockShardActor);
518
519         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
520
521         kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
522     }
523
524     @Test
525     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() {
526         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
527         final TestKit kit = new TestKit(getSystem());
528         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
529
530         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
531         shardManager.tell(new ActorInitialized(), mockShardActor);
532
533         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
534         shardManager.tell(
535             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Follower.name()),
536             mockShardActor);
537
538         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
539
540         kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
541
542         DataTree mockDataTree = mock(DataTree.class);
543         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
544             DataStoreVersions.CURRENT_VERSION), mockShardActor);
545
546         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
547
548         LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5),
549             LocalPrimaryShardFound.class);
550         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
551             primaryFound.getPrimaryPath().contains("member-1-shard-default"));
552         assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
553
554         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
555     }
556
557     @Test
558     public void testOnReceiveFindPrimaryWaitForShardLeader() {
559         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting");
560         datastoreContextBuilder.shardInitializationTimeout(10, TimeUnit.SECONDS);
561         final TestKit kit = new TestKit(getSystem());
562         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
563
564         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
565
566         // We're passing waitUntilInitialized = true to FindPrimary so
567         // the response should be
568         // delayed until we send ActorInitialized and
569         // RoleChangeNotification.
570         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
571
572         kit.expectNoMessage(Duration.ofMillis(150));
573
574         shardManager.tell(new ActorInitialized(), mockShardActor);
575
576         kit.expectNoMessage(Duration.ofMillis(150));
577
578         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
579         shardManager.tell(
580             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
581             mockShardActor);
582
583         kit.expectNoMessage(Duration.ofMillis(150));
584
585         DataTree mockDataTree = mock(DataTree.class);
586         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
587             DataStoreVersions.CURRENT_VERSION), mockShardActor);
588
589         LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
590         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
591             primaryFound.getPrimaryPath().contains("member-1-shard-default"));
592         assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
593
594         kit.expectNoMessage(Duration.ofMillis(200));
595
596         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending");
597     }
598
599     @Test
600     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() {
601         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard starting");
602         final TestKit kit = new TestKit(getSystem());
603         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
604
605         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
606
607         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
608
609         kit.expectMsgClass(Duration.ofSeconds(2), NotInitializedException.class);
610
611         shardManager.tell(new ActorInitialized(), mockShardActor);
612
613         kit.expectNoMessage(Duration.ofMillis(200));
614
615         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending");
616     }
617
618     @Test
619     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() {
620         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard starting");
621         final TestKit kit = new TestKit(getSystem());
622         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
623
624         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
625         shardManager.tell(new ActorInitialized(), mockShardActor);
626         shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
627             RaftState.Candidate.name()), mockShardActor);
628
629         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
630
631         kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
632
633         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending");
634     }
635
636     @Test
637     public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() {
638         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard starting");
639         final TestKit kit = new TestKit(getSystem());
640         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
641
642         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
643         shardManager.tell(new ActorInitialized(), mockShardActor);
644         shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
645             RaftState.IsolatedLeader.name()), mockShardActor);
646
647         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true),kit. getRef());
648
649         kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
650
651         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending");
652     }
653
654     @Test
655     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() {
656         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard starting");
657         final TestKit kit = new TestKit(getSystem());
658         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
659
660         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
661         shardManager.tell(new ActorInitialized(), mockShardActor);
662
663         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
664
665         kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
666
667         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending");
668     }
669
670     @Test
671     public void testOnReceiveFindPrimaryForRemoteShard() {
672         LOG.info("testOnReceiveFindPrimaryForRemoteShard starting");
673         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
674
675         // Create an ActorSystem ShardManager actor for member-1.
676
677         final ActorSystem system1 = newActorSystem("Member1");
678         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
679
680         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
681                 newTestShardMgrBuilderWithMockShardActor().cluster(
682                         new ClusterWrapperImpl(system1)).props().withDispatcher(
683                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
684
685         // Create an ActorSystem ShardManager actor for member-2.
686
687         final ActorSystem system2 = newActorSystem("Member2");
688
689         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
690
691         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
692
693         MockConfiguration mockConfig2 = new MockConfiguration(
694                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
695                         .put("astronauts", Arrays.asList("member-2")).build());
696
697         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
698                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
699                         new ClusterWrapperImpl(system2)).props().withDispatcher(
700                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
701
702         final TestKit kit = new TestKit(system1);
703         shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
704         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
705
706         shardManager2.tell(new ActorInitialized(), mockShardActor2);
707
708         String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
709         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
710         shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
711             mockShardActor2);
712         shardManager2.tell(new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
713             mockShardActor2);
714
715         shardManager1.underlyingActor().waitForMemberUp();
716         shardManager1.tell(new FindPrimary("astronauts", false), kit.getRef());
717
718         RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
719         String path = found.getPrimaryPath();
720         assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
721         assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
722
723         shardManager2.underlyingActor().verifyFindPrimary();
724
725         // This part times out quite a bit on jenkins for some reason
726
727 //                Cluster.get(system2).down(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
728 //
729 //                shardManager1.underlyingActor().waitForMemberRemoved();
730 //
731 //                shardManager1.tell(new FindPrimary("astronauts", false), getRef());
732 //
733 //                expectMsgClass(Duration.ofSeconds(5), PrimaryNotFoundException.class);
734
735         LOG.info("testOnReceiveFindPrimaryForRemoteShard ending");
736     }
737
738     @Test
739     public void testShardAvailabilityOnChangeOfMemberReachability() {
740         LOG.info("testShardAvailabilityOnChangeOfMemberReachability starting");
741         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
742
743         // Create an ActorSystem ShardManager actor for member-1.
744
745         final ActorSystem system1 = newActorSystem("Member1");
746         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
747
748         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
749
750         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
751                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
752                         new ClusterWrapperImpl(system1)).props().withDispatcher(
753                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
754
755         // Create an ActorSystem ShardManager actor for member-2.
756
757         final ActorSystem system2 = newActorSystem("Member2");
758
759         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
760
761         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
762
763         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
764                 .put("default", Arrays.asList("member-1", "member-2")).build());
765
766         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
767                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
768                         new ClusterWrapperImpl(system2)).props().withDispatcher(
769                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
770
771         final TestKit kit = new TestKit(system1);
772         shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
773         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
774         shardManager1.tell(new ActorInitialized(), mockShardActor1);
775         shardManager2.tell(new ActorInitialized(), mockShardActor2);
776
777         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
778         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
779         shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
780             DataStoreVersions.CURRENT_VERSION), mockShardActor1);
781         shardManager1.tell(
782             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
783             mockShardActor1);
784         shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
785             DataStoreVersions.CURRENT_VERSION), mockShardActor2);
786         shardManager2.tell(
787             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
788             mockShardActor2);
789         shardManager1.underlyingActor().waitForMemberUp();
790
791         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
792
793         RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
794         String path = found.getPrimaryPath();
795         assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
796
797         shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
798             kit.getRef());
799
800         shardManager1.underlyingActor().waitForUnreachableMember();
801         MessageCollectorActor.clearMessages(mockShardActor1);
802
803         shardManager1.tell(MockClusterWrapper.createMemberRemoved("member-2", "akka://cluster-test@127.0.0.1:2558"),
804             kit.getRef());
805
806         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
807
808         kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
809
810         shardManager1.tell(MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
811             kit.getRef());
812
813         shardManager1.underlyingActor().waitForReachableMember();
814
815         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
816
817         RemotePrimaryShardFound found1 = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
818         String path1 = found1.getPrimaryPath();
819         assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
820
821         shardManager1.tell(MockClusterWrapper.createMemberUp("member-2", "akka://cluster-test@127.0.0.1:2558"),
822             kit.getRef());
823
824         // Test FindPrimary wait succeeds after reachable member event.
825
826         shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
827                 "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
828         shardManager1.underlyingActor().waitForUnreachableMember();
829
830         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
831
832         shardManager1.tell(
833             MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
834
835         RemotePrimaryShardFound found2 = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
836         String path2 = found2.getPrimaryPath();
837         assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config"));
838
839         LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending");
840     }
841
842     @Test
843     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() {
844         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange starting");
845         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
846
847         // Create an ActorSystem ShardManager actor for member-1.
848
849         final ActorSystem system1 = newActorSystem("Member1");
850         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
851
852         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
853
854         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
855         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
856                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(new ClusterWrapperImpl(system1))
857                         .primaryShardInfoCache(primaryShardInfoCache).props()
858                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
859                 shardManagerID);
860
861         // Create an ActorSystem ShardManager actor for member-2.
862
863         final ActorSystem system2 = newActorSystem("Member2");
864
865         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
866
867         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
868
869         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
870                 .put("default", Arrays.asList("member-1", "member-2")).build());
871
872         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
873                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
874                         new ClusterWrapperImpl(system2)).props().withDispatcher(
875                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
876
877         final TestKit kit = new TestKit(system1);
878         shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
879         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
880         shardManager1.tell(new ActorInitialized(), mockShardActor1);
881         shardManager2.tell(new ActorInitialized(), mockShardActor2);
882
883         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
884         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
885         shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
886             DataStoreVersions.CURRENT_VERSION), mockShardActor1);
887         shardManager1.tell(
888             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
889             mockShardActor1);
890         shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
891             DataStoreVersions.CURRENT_VERSION), mockShardActor2);
892         shardManager2.tell(
893             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
894             mockShardActor2);
895         shardManager1.underlyingActor().waitForMemberUp();
896
897         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
898
899         RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
900         String path = found.getPrimaryPath();
901         assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
902
903         primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(
904             system1.actorSelection(mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION));
905
906         shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
907                 "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
908
909         shardManager1.underlyingActor().waitForUnreachableMember();
910
911         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
912
913         kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
914
915         assertNull("Expected primaryShardInfoCache entry removed",
916             primaryShardInfoCache.getIfPresent("default"));
917
918         shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, mock(DataTree.class),
919             DataStoreVersions.CURRENT_VERSION), mockShardActor1);
920         shardManager1.tell(
921             new RoleChangeNotification(memberId1, RaftState.Follower.name(), RaftState.Leader.name()),
922             mockShardActor1);
923
924         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
925
926         LocalPrimaryShardFound found1 = kit.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
927         String path1 = found1.getPrimaryPath();
928         assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
929
930         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange ending");
931     }
932
933     @Test
934     public void testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable() {
935         LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable starting");
936         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
937
938         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
939                 .put("default", Arrays.asList("member-256", "member-2")).build());
940
941         // Create an ActorSystem, ShardManager and actor for member-256.
942
943         final ActorSystem system256 = newActorSystem("Member256");
944         // 2562 is the tcp port of Member256 in src/test/resources/application.conf.
945         Cluster.get(system256).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2562"));
946
947         final ActorRef mockShardActor256 = newMockShardActor(system256, Shard.DEFAULT_NAME, "member-256");
948
949         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
950
951         // ShardManager must be created with shard configuration to let its localShards has shards.
952         final TestActorRef<TestShardManager> shardManager256 = TestActorRef.create(system256,
953                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor256)
954                         .cluster(new ClusterWrapperImpl(system256))
955                         .primaryShardInfoCache(primaryShardInfoCache).props()
956                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
957                 shardManagerID);
958
959         // Create an ActorSystem, ShardManager and actor for member-2 whose name is contained in member-256.
960
961         final ActorSystem system2 = newActorSystem("Member2");
962
963         // Join member-2 into the cluster of member-256.
964         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2562"));
965
966         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
967
968         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
969                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor2).cluster(
970                         new ClusterWrapperImpl(system2)).props().withDispatcher(
971                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
972
973         final TestKit kit256 = new TestKit(system256);
974         shardManager256.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef());
975         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef());
976         shardManager256.tell(new ActorInitialized(), mockShardActor256);
977         shardManager2.tell(new ActorInitialized(), mockShardActor2);
978
979         String memberId256 = "member-256-shard-default-" + shardMrgIDSuffix;
980         String memberId2   = "member-2-shard-default-"   + shardMrgIDSuffix;
981         shardManager256.tell(new ShardLeaderStateChanged(memberId256, memberId256, mock(DataTree.class),
982             DataStoreVersions.CURRENT_VERSION), mockShardActor256);
983         shardManager256.tell(
984             new RoleChangeNotification(memberId256, RaftState.Candidate.name(), RaftState.Leader.name()),
985             mockShardActor256);
986         shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId256, mock(DataTree.class),
987             DataStoreVersions.CURRENT_VERSION), mockShardActor2);
988         shardManager2.tell(
989             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Follower.name()),
990             mockShardActor2);
991         shardManager256.underlyingActor().waitForMemberUp();
992
993         shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
994
995         LocalPrimaryShardFound found = kit256.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
996         String path = found.getPrimaryPath();
997         assertTrue("Unexpected primary path " + path + " which must on member-256",
998             path.contains("member-256-shard-default-config"));
999
1000         PrimaryShardInfo primaryShardInfo = new PrimaryShardInfo(
1001             system256.actorSelection(mockShardActor256.path()), DataStoreVersions.CURRENT_VERSION);
1002         primaryShardInfoCache.putSuccessful("default", primaryShardInfo);
1003
1004         // Simulate member-2 become unreachable.
1005         shardManager256.tell(MockClusterWrapper.createUnreachableMember("member-2",
1006                 "akka://cluster-test@127.0.0.1:2558"), kit256.getRef());
1007         shardManager256.underlyingActor().waitForUnreachableMember();
1008
1009         // Make sure leader shard on member-256 is still leader and still in the cache.
1010         shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
1011         found = kit256.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
1012         path = found.getPrimaryPath();
1013         assertTrue("Unexpected primary path " + path + " which must still not on member-256",
1014             path.contains("member-256-shard-default-config"));
1015         Future<PrimaryShardInfo> futurePrimaryShard = primaryShardInfoCache.getIfPresent("default");
1016         futurePrimaryShard.onComplete(new OnComplete<PrimaryShardInfo>() {
1017             @Override
1018             public void onComplete(final Throwable failure, final PrimaryShardInfo futurePrimaryShardInfo) {
1019                 if (failure != null) {
1020                     assertTrue("Primary shard info is unexpectedly removed from primaryShardInfoCache", false);
1021                 } else {
1022                     assertEquals("Expected primaryShardInfoCache entry",
1023                         primaryShardInfo, futurePrimaryShardInfo);
1024                 }
1025             }
1026         }, system256.dispatchers().defaultGlobalDispatcher());
1027
1028         LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable ending");
1029     }
1030
1031     @Test
1032     public void testOnReceiveFindLocalShardForNonExistentShard() {
1033         final TestKit kit = new TestKit(getSystem());
1034         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1035
1036         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1037
1038         shardManager.tell(new FindLocalShard("non-existent", false), kit.getRef());
1039
1040         LocalShardNotFound notFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
1041
1042         assertEquals("getShardName", "non-existent", notFound.getShardName());
1043     }
1044
1045     @Test
1046     public void testOnReceiveFindLocalShardForExistentShard() {
1047         final TestKit kit = new TestKit(getSystem());
1048         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1049
1050         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1051         shardManager.tell(new ActorInitialized(), mockShardActor);
1052
1053         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1054
1055         LocalShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1056
1057         assertTrue("Found path contains " + found.getPath().path().toString(),
1058             found.getPath().path().toString().contains("member-1-shard-default-config"));
1059     }
1060
1061     @Test
1062     public void testOnReceiveFindLocalShardForNotInitializedShard() {
1063         final TestKit kit = new TestKit(getSystem());
1064         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1065
1066         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1067
1068         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1069     }
1070
1071     @Test
1072     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
1073         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
1074         final TestKit kit = new TestKit(getSystem());
1075         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1076
1077         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1078
1079         // We're passing waitUntilInitialized = true to FindLocalShard
1080         // so the response should be
1081         // delayed until we send ActorInitialized.
1082         Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
1083             new Timeout(5, TimeUnit.SECONDS));
1084
1085         shardManager.tell(new ActorInitialized(), mockShardActor);
1086
1087         Object resp = Await.result(future, kit.duration("5 seconds"));
1088         assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
1089
1090         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
1091     }
1092
1093     @Test
1094     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
1095         TestShardManager shardManager = newTestShardManager();
1096
1097         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1098         shardManager.handleCommand(new RoleChangeNotification(
1099                 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
1100         assertFalse(ready.isDone());
1101
1102         shardManager.handleCommand(new ShardLeaderStateChanged(memberId, memberId,
1103                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1104         assertTrue(ready.isDone());
1105     }
1106
1107     @Test
1108     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
1109         final TestKit kit = new TestKit(getSystem());
1110         TestShardManager shardManager = newTestShardManager();
1111
1112         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1113         shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1114         assertFalse(ready.isDone());
1115
1116         shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
1117
1118         shardManager.handleCommand(
1119             new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1120                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1121         assertTrue(ready.isDone());
1122     }
1123
1124     @Test
1125     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
1126         final TestKit kit = new TestKit(getSystem());
1127         TestShardManager shardManager = newTestShardManager();
1128
1129         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1130         shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1131         assertFalse(ready.isDone());
1132
1133         shardManager.handleCommand(
1134             new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1135                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1136
1137         shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
1138         assertTrue(ready.isDone());
1139     }
1140
1141     @Test
1142     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
1143         TestShardManager shardManager = newTestShardManager();
1144
1145         shardManager.handleCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(),
1146             RaftState.Leader.name()));
1147         assertFalse(ready.isDone());
1148     }
1149
1150     @Test
1151     public void testByDefaultSyncStatusIsFalse() {
1152         TestShardManager shardManager = newTestShardManager();
1153
1154         assertFalse(shardManager.getMBean().getSyncStatus());
1155     }
1156
1157     @Test
1158     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception {
1159         TestShardManager shardManager = newTestShardManager();
1160
1161         shardManager.handleCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
1162                 RaftState.Follower.name(), RaftState.Leader.name()));
1163
1164         assertTrue(shardManager.getMBean().getSyncStatus());
1165     }
1166
1167     @Test
1168     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception {
1169         TestShardManager shardManager = newTestShardManager();
1170
1171         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1172         shardManager.handleCommand(new RoleChangeNotification(shardId,
1173                 RaftState.Follower.name(), RaftState.Candidate.name()));
1174
1175         assertFalse(shardManager.getMBean().getSyncStatus());
1176
1177         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
1178         shardManager.handleCommand(new FollowerInitialSyncUpStatus(
1179                 true, shardId));
1180
1181         assertFalse(shardManager.getMBean().getSyncStatus());
1182     }
1183
1184     @Test
1185     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception {
1186         TestShardManager shardManager = newTestShardManager();
1187
1188         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1189         shardManager.handleCommand(new RoleChangeNotification(shardId,
1190                 RaftState.Candidate.name(), RaftState.Follower.name()));
1191
1192         // Initially will be false
1193         assertFalse(shardManager.getMBean().getSyncStatus());
1194
1195         // Send status true will make sync status true
1196         shardManager.handleCommand(new FollowerInitialSyncUpStatus(true, shardId));
1197
1198         assertTrue(shardManager.getMBean().getSyncStatus());
1199
1200         // Send status false will make sync status false
1201         shardManager.handleCommand(new FollowerInitialSyncUpStatus(false, shardId));
1202
1203         assertFalse(shardManager.getMBean().getSyncStatus());
1204     }
1205
1206     @Test
1207     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception {
1208         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
1209         TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
1210             @Override
1211             public List<String> getMemberShardNames(final MemberName memberName) {
1212                 return Arrays.asList("default", "astronauts");
1213             }
1214         }));
1215
1216         // Initially will be false
1217         assertFalse(shardManager.getMBean().getSyncStatus());
1218
1219         // Make default shard leader
1220         String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
1221         shardManager.handleCommand(new RoleChangeNotification(defaultShardId,
1222                 RaftState.Follower.name(), RaftState.Leader.name()));
1223
1224         // default = Leader, astronauts is unknown so sync status remains false
1225         assertFalse(shardManager.getMBean().getSyncStatus());
1226
1227         // Make astronauts shard leader as well
1228         String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1229         shardManager.handleCommand(new RoleChangeNotification(astronautsShardId,
1230                 RaftState.Follower.name(), RaftState.Leader.name()));
1231
1232         // Now sync status should be true
1233         assertTrue(shardManager.getMBean().getSyncStatus());
1234
1235         // Make astronauts a Follower
1236         shardManager.handleCommand(new RoleChangeNotification(astronautsShardId,
1237                 RaftState.Leader.name(), RaftState.Follower.name()));
1238
1239         // Sync status is not true
1240         assertFalse(shardManager.getMBean().getSyncStatus());
1241
1242         // Make the astronauts follower sync status true
1243         shardManager.handleCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
1244
1245         // Sync status is now true
1246         assertTrue(shardManager.getMBean().getSyncStatus());
1247
1248         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending");
1249     }
1250
1251     @Test
1252     public void testOnReceiveSwitchShardBehavior() {
1253         final TestKit kit = new TestKit(getSystem());
1254         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1255
1256         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1257         shardManager.tell(new ActorInitialized(), mockShardActor);
1258
1259         shardManager.tell(new SwitchShardBehavior(mockShardName, RaftState.Leader, 1000), kit.getRef());
1260
1261         SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor,
1262             SwitchBehavior.class);
1263
1264         assertEquals(RaftState.Leader, switchBehavior.getNewState());
1265         assertEquals(1000, switchBehavior.getNewTerm());
1266     }
1267
1268     private static List<MemberName> members(final String... names) {
1269         return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList());
1270     }
1271
1272     @Test
1273     public void testOnCreateShard() {
1274         LOG.info("testOnCreateShard starting");
1275         final TestKit kit = new TestKit(getSystem());
1276         datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1277
1278         ActorRef shardManager = actorFactory
1279                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1280                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1281
1282         EffectiveModelContext schemaContext = TEST_SCHEMA_CONTEXT;
1283         shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1284
1285         DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100)
1286                 .persistent(false).build();
1287         Shard.Builder shardBuilder = Shard.builder();
1288
1289         ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module",
1290             "foo", null, members("member-1", "member-5", "member-6"));
1291         shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), kit.getRef());
1292
1293         kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1294
1295         shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1296
1297         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1298
1299         assertFalse("isRecoveryApplicable", shardBuilder.getDatastoreContext().isPersistent());
1300         assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig()
1301             .getPeerAddressResolver() instanceof ShardPeerAddressResolver);
1302         assertEquals("peerMembers", Sets.newHashSet(
1303             ShardIdentifier.create("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(),
1304             ShardIdentifier.create("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()),
1305             shardBuilder.getPeerAddresses().keySet());
1306         assertEquals("ShardIdentifier", ShardIdentifier.create("foo", MEMBER_1, shardMrgIDSuffix),
1307             shardBuilder.getId());
1308         assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1309
1310         // Send CreateShard with same name - should return Success with
1311         // a message.
1312
1313         shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1314
1315         Success success = kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1316         assertNotNull("Success status is null", success.status());
1317
1318         LOG.info("testOnCreateShard ending");
1319     }
1320
1321     @Test
1322     public void testOnCreateShardWithLocalMemberNotInShardConfig() {
1323         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting");
1324         final TestKit kit = new TestKit(getSystem());
1325         datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1326
1327         ActorRef shardManager = actorFactory
1328                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1329                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1330
1331         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1332
1333         Shard.Builder shardBuilder = Shard.builder();
1334         ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module",
1335             "foo", null, members("member-5", "member-6"));
1336
1337         shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1338         kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1339
1340         shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1341         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1342
1343         assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
1344         assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(), shardBuilder
1345             .getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
1346
1347         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending");
1348     }
1349
1350     @Test
1351     public void testOnCreateShardWithNoInitialSchemaContext() {
1352         LOG.info("testOnCreateShardWithNoInitialSchemaContext starting");
1353         final TestKit kit = new TestKit(getSystem());
1354         ActorRef shardManager = actorFactory
1355                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1356                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1357
1358         Shard.Builder shardBuilder = Shard.builder();
1359
1360         ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module",
1361             "foo", null, members("member-1"));
1362         shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1363
1364         kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1365
1366         EffectiveModelContext schemaContext = TEST_SCHEMA_CONTEXT;
1367         shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1368
1369         shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1370
1371         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1372
1373         assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1374         assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
1375
1376         LOG.info("testOnCreateShardWithNoInitialSchemaContext ending");
1377     }
1378
1379     @Test
1380     public void testGetSnapshot() {
1381         LOG.info("testGetSnapshot starting");
1382         TestKit kit = new TestKit(getSystem());
1383
1384         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1385                 .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1"))
1386                 .put("astronauts", Collections.<String>emptyList()).build());
1387
1388         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig)
1389                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1390
1391         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1392         Failure failure = kit.expectMsgClass(Failure.class);
1393         assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
1394
1395         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1396
1397         waitForShardInitialized(shardManager, "shard1", kit);
1398         waitForShardInitialized(shardManager, "shard2", kit);
1399
1400         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1401
1402         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1403
1404         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1405         assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1406
1407         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
1408             datastoreSnapshot.getShardSnapshots().stream().map(ShardSnapshot::getName).collect(Collectors.toSet())));
1409
1410         // Add a new replica
1411
1412         TestKit mockShardLeaderKit = new TestKit(getSystem());
1413
1414         TestShardManager shardManagerInstance = shardManager.underlyingActor();
1415         shardManagerInstance.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1416
1417         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1418         mockShardLeaderKit.expectMsgClass(AddServer.class);
1419         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.OK, ""));
1420         kit.expectMsgClass(Status.Success.class);
1421         waitForShardInitialized(shardManager, "astronauts", kit);
1422
1423         // Send another GetSnapshot and verify
1424
1425         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1426         datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1427
1428         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
1429                 Lists.transform(datastoreSnapshot.getShardSnapshots(), ShardSnapshot::getName)));
1430
1431         ShardManagerSnapshot snapshot = datastoreSnapshot.getShardManagerSnapshot();
1432         assertNotNull("Expected ShardManagerSnapshot", snapshot);
1433         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1434                 Sets.newHashSet(snapshot.getShardList()));
1435
1436         LOG.info("testGetSnapshot ending");
1437     }
1438
1439     @Test
1440     public void testRestoreFromSnapshot() {
1441         LOG.info("testRestoreFromSnapshot starting");
1442
1443         datastoreContextBuilder.shardInitializationTimeout(3, TimeUnit.SECONDS);
1444
1445         TestKit kit = new TestKit(getSystem());
1446
1447         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1448                 .put("shard1", Collections.<String>emptyList()).put("shard2", Collections.<String>emptyList())
1449                 .put("astronauts", Collections.<String>emptyList()).build());
1450
1451         ShardManagerSnapshot snapshot =
1452                 new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"));
1453         DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix, snapshot,
1454                 Collections.<ShardSnapshot>emptyList());
1455         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig)
1456                 .restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
1457
1458         shardManager.underlyingActor().waitForRecoveryComplete();
1459
1460         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1461
1462         waitForShardInitialized(shardManager, "shard1", kit);
1463         waitForShardInitialized(shardManager, "shard2", kit);
1464         waitForShardInitialized(shardManager, "astronauts", kit);
1465
1466         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1467
1468         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1469
1470         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1471
1472         assertNotNull("Expected ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1473         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1474                 Sets.newHashSet(datastoreSnapshot.getShardManagerSnapshot().getShardList()));
1475
1476         LOG.info("testRestoreFromSnapshot ending");
1477     }
1478
1479     @Test
1480     public void testAddShardReplicaForNonExistentShardConfig() {
1481         final TestKit kit = new TestKit(getSystem());
1482         ActorRef shardManager = actorFactory
1483                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1484                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1485
1486         shardManager.tell(new AddShardReplica("model-inventory"), kit.getRef());
1487         Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(2), Status.Failure.class);
1488
1489         assertTrue("Failure obtained", resp.cause() instanceof IllegalArgumentException);
1490     }
1491
1492     @Test
1493     public void testAddShardReplica() {
1494         LOG.info("testAddShardReplica starting");
1495         MockConfiguration mockConfig = new MockConfiguration(
1496                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1497                         .put("astronauts", Arrays.asList("member-2")).build());
1498
1499         final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1500         datastoreContextBuilder.shardManagerPersistenceId(shardManagerID);
1501
1502         // Create an ActorSystem ShardManager actor for member-1.
1503         final ActorSystem system1 = newActorSystem("Member1");
1504         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1505         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1506         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1507                 newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor)
1508                         .cluster(new ClusterWrapperImpl(system1)).props()
1509                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1510                 shardManagerID);
1511
1512         // Create an ActorSystem ShardManager actor for member-2.
1513         final ActorSystem system2 = newActorSystem("Member2");
1514         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1515
1516         String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
1517         String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString();
1518         final TestActorRef<MockRespondActor> mockShardLeaderActor = TestActorRef.create(system2,
1519                 Props.create(MockRespondActor.class, AddServer.class,
1520                         new AddServerReply(ServerChangeStatus.OK, memberId2))
1521                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1522                 name);
1523         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1524                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor)
1525                         .cluster(new ClusterWrapperImpl(system2)).props()
1526                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1527                 shardManagerID);
1528
1529         final TestKit kit = new TestKit(getSystem());
1530         newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1531         leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1532
1533         leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1534
1535         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1536         leaderShardManager.tell(
1537             new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1538             mockShardLeaderActor);
1539         leaderShardManager.tell(
1540             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1541             mockShardLeaderActor);
1542
1543         newReplicaShardManager.underlyingActor().waitForMemberUp();
1544         leaderShardManager.underlyingActor().waitForMemberUp();
1545
1546         // Have a dummy snapshot to be overwritten by the new data
1547         // persisted.
1548         String[] restoredShards = { "default", "people" };
1549         ShardManagerSnapshot snapshot =
1550                 new ShardManagerSnapshot(Arrays.asList(restoredShards));
1551         InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
1552         Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
1553
1554         InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID);
1555         InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
1556
1557         // construct a mock response message
1558         newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1559         AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1560             AddServer.class);
1561         String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1562         assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1563         kit.expectMsgClass(Duration.ofSeconds(5), Status.Success.class);
1564
1565         InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
1566         InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
1567         List<ShardManagerSnapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(shardManagerID,
1568             ShardManagerSnapshot.class);
1569         assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
1570         ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
1571         assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
1572             Sets.newHashSet(shardManagerSnapshot.getShardList()));
1573         LOG.info("testAddShardReplica ending");
1574     }
1575
1576     @Test
1577     public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() {
1578         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting");
1579         final TestKit kit = new TestKit(getSystem());
1580         TestActorRef<TestShardManager> shardManager = actorFactory
1581                 .createTestActor(newPropsShardMgrWithMockShardActor(), shardMgrID);
1582
1583         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1584         shardManager.tell(new ActorInitialized(), mockShardActor);
1585
1586         String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
1587         AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
1588         ActorRef leaderShardActor = shardManager.underlyingActor().getContext()
1589                 .actorOf(Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId);
1590
1591         MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
1592
1593         String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
1594         shardManager.tell(
1595             new RoleChangeNotification(newReplicaId, RaftState.Candidate.name(), RaftState.Follower.name()),
1596             mockShardActor);
1597         shardManager.tell(
1598             new ShardLeaderStateChanged(newReplicaId, leaderId, DataStoreVersions.CURRENT_VERSION),
1599             mockShardActor);
1600
1601         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1602
1603         MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
1604
1605         Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1606         assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1607
1608         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1609         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1610
1611         // Send message again to verify previous in progress state is
1612         // cleared
1613
1614         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1615         resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1616         assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1617
1618         // Send message again with an AddServer timeout to verify the
1619         // pre-existing shard actor isn't terminated.
1620
1621         shardManager.tell(
1622             newDatastoreContextFactory(
1623                 datastoreContextBuilder.shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), kit.getRef());
1624         leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
1625         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1626         kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1627
1628         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1629         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1630
1631         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
1632     }
1633
1634     @Test
1635     public void testAddShardReplicaWithPreExistingLocalReplicaLeader() {
1636         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting");
1637         final TestKit kit = new TestKit(getSystem());
1638         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1639         ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1640
1641         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1642         shardManager.tell(new ActorInitialized(), mockShardActor);
1643         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1644             DataStoreVersions.CURRENT_VERSION), kit.getRef());
1645         shardManager.tell(
1646             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1647             mockShardActor);
1648
1649         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1650         Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1651         assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1652
1653         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1654         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1655
1656         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
1657     }
1658
1659     @Test
1660     public void testAddShardReplicaWithAddServerReplyFailure() {
1661         LOG.info("testAddShardReplicaWithAddServerReplyFailure starting");
1662         final TestKit kit = new TestKit(getSystem());
1663         final TestKit mockShardLeaderKit = new TestKit(getSystem());
1664
1665         MockConfiguration mockConfig = new MockConfiguration(
1666             ImmutableMap.of("astronauts", Arrays.asList("member-2")));
1667
1668         ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
1669         final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1670             newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props()
1671             .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1672         shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1673
1674         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1675
1676         TestKit terminateWatcher = new TestKit(getSystem());
1677         terminateWatcher.watch(mockNewReplicaShardActor);
1678
1679         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1680
1681         AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
1682         assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
1683             addServerMsg.getNewServerId());
1684         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
1685
1686         Failure failure = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1687         assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
1688
1689         shardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
1690         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
1691
1692         terminateWatcher.expectTerminated(mockNewReplicaShardActor);
1693
1694         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1695         mockShardLeaderKit.expectMsgClass(AddServer.class);
1696         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
1697         failure = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1698         assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
1699
1700         LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
1701     }
1702
1703     @Test
1704     public void testAddShardReplicaWithAlreadyInProgress() {
1705         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1706                 AddServer.class, new AddShardReplica("astronauts"));
1707     }
1708
1709     @Test
1710     public void testAddShardReplicaWithFindPrimaryTimeout() {
1711         LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting");
1712         datastoreContextBuilder.shardInitializationTimeout(100, TimeUnit.MILLISECONDS);
1713         final TestKit kit = new TestKit(getSystem());
1714         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.of("astronauts", Arrays.asList("member-2")));
1715
1716         final ActorRef newReplicaShardManager = actorFactory
1717                 .createActor(newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props()
1718                     .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1719
1720         newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1721         MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2",
1722             AddressFromURIString.parse("akka://non-existent@127.0.0.1:5").toString());
1723
1724         newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1725         Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Status.Failure.class);
1726         assertTrue("Failure obtained", resp.cause() instanceof RuntimeException);
1727
1728         LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending");
1729     }
1730
1731     @Test
1732     public void testRemoveShardReplicaForNonExistentShard() {
1733         final TestKit kit = new TestKit(getSystem());
1734         ActorRef shardManager = actorFactory
1735                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1736                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1737
1738         shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), kit.getRef());
1739         Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(10), Status.Failure.class);
1740         assertTrue("Failure obtained", resp.cause() instanceof PrimaryNotFoundException);
1741     }
1742
1743     @Test
1744     /**
1745      * Primary is Local.
1746      */
1747     public void testRemoveShardReplicaLocal() {
1748         final TestKit kit = new TestKit(getSystem());
1749         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1750
1751         final ActorRef respondActor = actorFactory.createActor(Props.create(MockRespondActor.class,
1752             RemoveServer.class, new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
1753
1754         ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
1755
1756         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1757         shardManager.tell(new ActorInitialized(), respondActor);
1758         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1759             DataStoreVersions.CURRENT_VERSION), kit.getRef());
1760         shardManager.tell(
1761             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1762             respondActor);
1763
1764         shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), kit.getRef());
1765         final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor,
1766             RemoveServer.class);
1767         assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(),
1768             removeServer.getServerId());
1769         kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1770     }
1771
1772     @Test
1773     public void testRemoveShardReplicaRemote() {
1774         MockConfiguration mockConfig = new MockConfiguration(
1775                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1776                         .put("astronauts", Arrays.asList("member-1")).build());
1777
1778         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1779
1780         // Create an ActorSystem ShardManager actor for member-1.
1781         final ActorSystem system1 = newActorSystem("Member1");
1782         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1783         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1784
1785         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1786                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster(
1787                         new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1788                 shardManagerID);
1789
1790         // Create an ActorSystem ShardManager actor for member-2.
1791         final ActorSystem system2 = newActorSystem("Member2");
1792         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1793
1794         String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString();
1795         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
1796         final TestActorRef<MockRespondActor> mockShardLeaderActor =
1797                 TestActorRef.create(system2, Props.create(MockRespondActor.class, RemoveServer.class,
1798                         new RemoveServerReply(ServerChangeStatus.OK, memberId2)), name);
1799
1800         LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor);
1801
1802         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1803                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster(
1804                         new ClusterWrapperImpl(system2)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1805                 shardManagerID);
1806
1807         // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so,
1808         //    akka://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1
1809         // However when a shard manager has a local shard which is a follower and a leader that is remote it will
1810         // try to compute an address for the remote shard leader using the ShardPeerAddressResolver. This address will
1811         // look like so,
1812         //    akka://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1
1813         // In this specific case if we did a FindPrimary for shard default from member-1 we would come up
1814         // with the address of an actor which does not exist, therefore any message sent to that actor would go to
1815         // dead letters.
1816         // To work around this problem we create a ForwardingActor with the right address and pass to it the
1817         // mockShardLeaderActor. The ForwardingActor simply forwards all messages to the mockShardLeaderActor and every
1818         // thing works as expected
1819         final ActorRef actorRef = leaderShardManager.underlyingActor().context()
1820                 .actorOf(Props.create(ForwardingActor.class, mockShardLeaderActor),
1821                         "member-2-shard-default-" + shardMrgIDSuffix);
1822
1823         LOG.error("Forwarding actor : {}", actorRef);
1824
1825         final TestKit kit = new TestKit(getSystem());
1826         newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1827         leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1828
1829         leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1830         newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1831
1832         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1833         leaderShardManager.tell(
1834             new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1835             mockShardLeaderActor);
1836         leaderShardManager.tell(
1837             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1838             mockShardLeaderActor);
1839
1840         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
1841         newReplicaShardManager.tell(
1842             new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class), leaderVersion),
1843             mockShardActor);
1844         newReplicaShardManager.tell(
1845             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
1846             mockShardActor);
1847
1848         newReplicaShardManager.underlyingActor().waitForMemberUp();
1849         leaderShardManager.underlyingActor().waitForMemberUp();
1850
1851         // construct a mock response message
1852         newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), kit.getRef());
1853         RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1854             RemoveServer.class);
1855         String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1856         assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
1857         kit.expectMsgClass(Duration.ofSeconds(5), Status.Success.class);
1858     }
1859
1860     @Test
1861     public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() {
1862         testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", MEMBER_2),
1863                 RemoveServer.class, new RemoveShardReplica("astronauts", MEMBER_3));
1864     }
1865
1866     @Test
1867     public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() {
1868         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1869                 AddServer.class, new RemoveShardReplica("astronauts", MEMBER_2));
1870     }
1871
1872
1873     public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange,
1874                                                       final Class<?> firstForwardedServerChangeClass,
1875                                                       final Object secondServerChange) {
1876         final TestKit kit = new TestKit(getSystem());
1877         final TestKit mockShardLeaderKit = new TestKit(getSystem());
1878         final TestKit secondRequestKit = new TestKit(getSystem());
1879
1880         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1881             .put(shardName, Arrays.asList("member-2")).build());
1882
1883         final TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
1884             newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor)
1885             .cluster(new MockClusterWrapper()).props()
1886             .withDispatcher(Dispatchers.DefaultDispatcherId()),
1887             shardMgrID);
1888
1889         shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1890
1891         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1892
1893         shardManager.tell(firstServerChange, kit.getRef());
1894
1895         mockShardLeaderKit.expectMsgClass(firstForwardedServerChangeClass);
1896
1897         shardManager.tell(secondServerChange, secondRequestKit.getRef());
1898
1899         secondRequestKit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1900     }
1901
1902     @Test
1903     public void testServerRemovedShardActorNotRunning() {
1904         LOG.info("testServerRemovedShardActorNotRunning starting");
1905         final TestKit kit = new TestKit(getSystem());
1906         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1907             .put("default", Arrays.asList("member-1", "member-2"))
1908             .put("astronauts", Arrays.asList("member-2"))
1909             .put("people", Arrays.asList("member-1", "member-2")).build());
1910
1911         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1912             newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1913
1914         shardManager.underlyingActor().waitForRecoveryComplete();
1915         shardManager.tell(new FindLocalShard("people", false), kit.getRef());
1916         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1917
1918         shardManager.tell(new FindLocalShard("default", false), kit.getRef());
1919         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1920
1921         // Removed the default shard replica from member-1
1922         ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
1923         ShardIdentifier shardId = builder.shardName("default").memberName(MEMBER_1).type(shardMrgIDSuffix)
1924                 .build();
1925         shardManager.tell(new ServerRemoved(shardId.toString()), kit.getRef());
1926
1927         shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1928
1929         LOG.info("testServerRemovedShardActorNotRunning ending");
1930     }
1931
1932     @Test
1933     public void testServerRemovedShardActorRunning() {
1934         LOG.info("testServerRemovedShardActorRunning starting");
1935         final TestKit kit = new TestKit(getSystem());
1936         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1937             .put("default", Arrays.asList("member-1", "member-2"))
1938             .put("astronauts", Arrays.asList("member-2"))
1939             .put("people", Arrays.asList("member-1", "member-2")).build());
1940
1941         String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1942         ActorRef shard = actorFactory.createActor(MessageCollectorActor.props(), shardId);
1943
1944         TestActorRef<TestShardManager> shardManager = actorFactory
1945                 .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props()
1946                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1947
1948         shardManager.underlyingActor().waitForRecoveryComplete();
1949
1950         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1951         shardManager.tell(new ActorInitialized(), shard);
1952
1953         waitForShardInitialized(shardManager, "people", kit);
1954         waitForShardInitialized(shardManager, "default", kit);
1955
1956         // Removed the default shard replica from member-1
1957         shardManager.tell(new ServerRemoved(shardId), kit.getRef());
1958
1959         shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1960
1961         MessageCollectorActor.expectFirstMatching(shard, Shutdown.class);
1962
1963         LOG.info("testServerRemovedShardActorRunning ending");
1964     }
1965
1966     @Test
1967     public void testShardPersistenceWithRestoredData() {
1968         LOG.info("testShardPersistenceWithRestoredData starting");
1969         final TestKit kit = new TestKit(getSystem());
1970         MockConfiguration mockConfig =
1971                 new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1972                     .put("default", Arrays.asList("member-1", "member-2"))
1973                     .put("astronauts", Arrays.asList("member-2"))
1974                     .put("people", Arrays.asList("member-1", "member-2")).build());
1975         String[] restoredShards = {"default", "astronauts"};
1976         ShardManagerSnapshot snapshot =
1977                 new ShardManagerSnapshot(Arrays.asList(restoredShards));
1978         InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
1979
1980         // create shardManager to come up with restored data
1981         TestActorRef<TestShardManager> newRestoredShardManager = actorFactory.createTestActor(
1982             newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1983
1984         newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
1985
1986         newRestoredShardManager.tell(new FindLocalShard("people", false), kit.getRef());
1987         LocalShardNotFound notFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
1988         assertEquals("for uninitialized shard", "people", notFound.getShardName());
1989
1990         // Verify a local shard is created for the restored shards,
1991         // although we expect a NotInitializedException for the shards
1992         // as the actor initialization
1993         // message is not sent for them
1994         newRestoredShardManager.tell(new FindLocalShard("default", false), kit.getRef());
1995         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1996
1997         newRestoredShardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
1998         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1999
2000         LOG.info("testShardPersistenceWithRestoredData ending");
2001     }
2002
2003     @Test
2004     public void testShutDown() throws Exception {
2005         LOG.info("testShutDown starting");
2006         final TestKit kit = new TestKit(getSystem());
2007         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
2008             .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1")).build());
2009
2010         String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString();
2011         ActorRef shard1 = actorFactory.createActor(MessageCollectorActor.props(), shardId1);
2012
2013         String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString();
2014         ActorRef shard2 = actorFactory.createActor(MessageCollectorActor.props(), shardId2);
2015
2016         ActorRef shardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig)
2017             .addShardActor("shard1", shard1).addShardActor("shard2", shard2).props());
2018
2019         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2020         shardManager.tell(new ActorInitialized(), shard1);
2021         shardManager.tell(new ActorInitialized(), shard2);
2022
2023         FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
2024         Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE);
2025
2026         MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class);
2027         MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class);
2028
2029         try {
2030             Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS));
2031             fail("ShardManager actor stopped without waiting for the Shards to be stopped");
2032         } catch (TimeoutException e) {
2033             // expected
2034         }
2035
2036         actorFactory.killActor(shard1, kit);
2037         actorFactory.killActor(shard2, kit);
2038
2039         Boolean stopped = Await.result(stopFuture, duration);
2040         assertEquals("Stopped", Boolean.TRUE, stopped);
2041
2042         LOG.info("testShutDown ending");
2043     }
2044
2045     @Test
2046     public void testChangeServersVotingStatus() {
2047         final TestKit kit = new TestKit(getSystem());
2048         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2049
2050         ActorRef respondActor = actorFactory
2051                 .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2052                     new ServerChangeReply(ServerChangeStatus.OK, null)), memberId);
2053
2054         ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2055
2056         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2057         shardManager.tell(new ActorInitialized(), respondActor);
2058         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
2059             DataStoreVersions.CURRENT_VERSION), kit.getRef());
2060         shardManager.tell(
2061             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
2062             respondActor);
2063
2064         shardManager.tell(
2065             new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), kit.getRef());
2066
2067         ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor
2068                 .expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2069         assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(),
2070             ImmutableMap.of(ShardIdentifier
2071                 .create("default", MemberName.forName("member-2"), shardMrgIDSuffix).toString(),
2072                 Boolean.TRUE));
2073
2074         kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
2075     }
2076
2077     @Test
2078     public void testChangeServersVotingStatusWithNoLeader() {
2079         final TestKit kit = new TestKit(getSystem());
2080         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2081
2082         ActorRef respondActor = actorFactory
2083                 .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2084                     new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);
2085
2086         ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2087
2088         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2089         shardManager.tell(new ActorInitialized(), respondActor);
2090         shardManager.tell(new RoleChangeNotification(memberId, null, RaftState.Follower.name()), respondActor);
2091
2092         shardManager.tell(
2093             new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), kit.getRef());
2094
2095         MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2096
2097         Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Status.Failure.class);
2098         assertTrue("Failure resposnse", resp.cause() instanceof NoShardLeaderException);
2099     }
2100
2101     @SuppressWarnings("unchecked")
2102     @Test
2103     public void testRegisterForShardLeaderChanges() {
2104         LOG.info("testRegisterForShardLeaderChanges starting");
2105
2106         final String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
2107         final String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
2108         final TestKit kit = new TestKit(getSystem());
2109         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
2110
2111         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2112         shardManager.tell(new ActorInitialized(), mockShardActor);
2113
2114         final Consumer<String> mockCallback = mock(Consumer.class);
2115         shardManager.tell(new RegisterForShardAvailabilityChanges(mockCallback), kit.getRef());
2116
2117         final Success reply = kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
2118         final Registration reg = (Registration) reply.status();
2119
2120         final DataTree mockDataTree = mock(DataTree.class);
2121         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2122             DataStoreVersions.CURRENT_VERSION), mockShardActor);
2123
2124         verify(mockCallback, timeout(5000)).accept("default");
2125
2126         reset(mockCallback);
2127         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2128                 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2129
2130         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
2131         verifyNoMoreInteractions(mockCallback);
2132
2133         shardManager.tell(new ShardLeaderStateChanged(memberId1, null, mockDataTree,
2134                 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2135
2136         verify(mockCallback, timeout(5000)).accept("default");
2137
2138         reset(mockCallback);
2139         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, mockDataTree,
2140                 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2141
2142         verify(mockCallback, timeout(5000)).accept("default");
2143
2144         reset(mockCallback);
2145         reg.close();
2146
2147         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2148                 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2149
2150         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
2151         verifyNoMoreInteractions(mockCallback);
2152
2153         LOG.info("testRegisterForShardLeaderChanges ending");
2154     }
2155
2156     public static class TestShardManager extends ShardManager {
2157         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
2158         private final CountDownLatch snapshotPersist = new CountDownLatch(1);
2159         private ShardManagerSnapshot snapshot;
2160         private final Map<String, ActorRef> shardActors;
2161         private final ActorRef shardActor;
2162         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
2163         private CountDownLatch memberUpReceived = new CountDownLatch(1);
2164         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
2165         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
2166         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
2167         private volatile MessageInterceptor messageInterceptor;
2168
2169         TestShardManager(final Builder builder) {
2170             super(builder);
2171             shardActor = builder.shardActor;
2172             shardActors = builder.shardActors;
2173         }
2174
2175         @Override
2176         protected void handleRecover(final Object message) throws Exception {
2177             try {
2178                 super.handleRecover(message);
2179             } finally {
2180                 if (message instanceof RecoveryCompleted) {
2181                     recoveryComplete.countDown();
2182                 }
2183             }
2184         }
2185
2186         private void countDownIfOther(final Member member, final CountDownLatch latch) {
2187             if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
2188                 latch.countDown();
2189             }
2190         }
2191
2192         @Override
2193         public void handleCommand(final Object message) throws Exception {
2194             try {
2195                 if (messageInterceptor != null && messageInterceptor.canIntercept(message)) {
2196                     getSender().tell(messageInterceptor.apply(message), getSelf());
2197                 } else {
2198                     super.handleCommand(message);
2199                 }
2200             } finally {
2201                 if (message instanceof FindPrimary) {
2202                     findPrimaryMessageReceived.countDown();
2203                 } else if (message instanceof ClusterEvent.MemberUp) {
2204                     countDownIfOther(((ClusterEvent.MemberUp) message).member(), memberUpReceived);
2205                 } else if (message instanceof ClusterEvent.MemberRemoved) {
2206                     countDownIfOther(((ClusterEvent.MemberRemoved) message).member(), memberRemovedReceived);
2207                 } else if (message instanceof ClusterEvent.UnreachableMember) {
2208                     countDownIfOther(((ClusterEvent.UnreachableMember) message).member(), memberUnreachableReceived);
2209                 } else if (message instanceof ClusterEvent.ReachableMember) {
2210                     countDownIfOther(((ClusterEvent.ReachableMember) message).member(), memberReachableReceived);
2211                 }
2212             }
2213         }
2214
2215         void setMessageInterceptor(final MessageInterceptor messageInterceptor) {
2216             this.messageInterceptor = messageInterceptor;
2217         }
2218
2219         void waitForRecoveryComplete() {
2220             assertTrue("Recovery complete",
2221                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
2222         }
2223
2224         public void waitForMemberUp() {
2225             assertTrue("MemberUp received",
2226                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
2227             memberUpReceived = new CountDownLatch(1);
2228         }
2229
2230         void waitForMemberRemoved() {
2231             assertTrue("MemberRemoved received",
2232                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
2233             memberRemovedReceived = new CountDownLatch(1);
2234         }
2235
2236         void waitForUnreachableMember() {
2237             assertTrue("UnreachableMember received",
2238                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS));
2239             memberUnreachableReceived = new CountDownLatch(1);
2240         }
2241
2242         void waitForReachableMember() {
2243             assertTrue("ReachableMember received",
2244                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
2245             memberReachableReceived = new CountDownLatch(1);
2246         }
2247
2248         void verifyFindPrimary() {
2249             assertTrue("FindPrimary received",
2250                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
2251             findPrimaryMessageReceived = new CountDownLatch(1);
2252         }
2253
2254         public static Builder builder(final DatastoreContext.Builder datastoreContextBuilder) {
2255             return new Builder(datastoreContextBuilder);
2256         }
2257
2258         public static class Builder extends AbstractGenericCreator<Builder, TestShardManager> {
2259             private ActorRef shardActor;
2260             private final Map<String, ActorRef> shardActors = new HashMap<>();
2261
2262             Builder(final DatastoreContext.Builder datastoreContextBuilder) {
2263                 super(TestShardManager.class);
2264                 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
2265             }
2266
2267             Builder shardActor(final ActorRef newShardActor) {
2268                 shardActor = newShardActor;
2269                 return this;
2270             }
2271
2272             Builder addShardActor(final String shardName, final ActorRef actorRef) {
2273                 shardActors.put(shardName, actorRef);
2274                 return this;
2275             }
2276         }
2277
2278         @Override
2279         public void saveSnapshot(final Object obj) {
2280             snapshot = (ShardManagerSnapshot) obj;
2281             snapshotPersist.countDown();
2282             super.saveSnapshot(obj);
2283         }
2284
2285         void verifySnapshotPersisted(final Set<String> shardList) {
2286             assertTrue("saveSnapshot invoked",
2287                     Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
2288             assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
2289         }
2290
2291         @Override
2292         protected ActorRef newShardActor(final ShardInformation info) {
2293             if (shardActors.get(info.getShardName()) != null) {
2294                 return shardActors.get(info.getShardName());
2295             }
2296
2297             if (shardActor != null) {
2298                 return shardActor;
2299             }
2300
2301             return super.newShardActor(info);
2302         }
2303     }
2304
2305     private abstract static class AbstractGenericCreator<T extends AbstractGenericCreator<T, ?>, C extends ShardManager>
2306                                                      extends AbstractShardManagerCreator<T> {
2307         private final Class<C> shardManagerClass;
2308
2309         AbstractGenericCreator(final Class<C> shardManagerClass) {
2310             this.shardManagerClass = shardManagerClass;
2311             cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).readinessFuture(ready)
2312                     .primaryShardInfoCache(new PrimaryShardInfoFutureCache());
2313         }
2314
2315         @Override
2316         public Props props() {
2317             verify();
2318             return Props.create(shardManagerClass, this);
2319         }
2320     }
2321
2322     private static class GenericCreator<C extends ShardManager> extends AbstractGenericCreator<GenericCreator<C>, C> {
2323         GenericCreator(final Class<C> shardManagerClass) {
2324             super(shardManagerClass);
2325         }
2326     }
2327
2328     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
2329         private static final long serialVersionUID = 1L;
2330         private final Creator<ShardManager> delegate;
2331
2332         DelegatingShardManagerCreator(final Creator<ShardManager> delegate) {
2333             this.delegate = delegate;
2334         }
2335
2336         @Override
2337         public ShardManager create() throws Exception {
2338             return delegate.create();
2339         }
2340     }
2341
2342     interface MessageInterceptor extends Function<Object, Object> {
2343         boolean canIntercept(Object message);
2344     }
2345
2346     private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
2347         return new MessageInterceptor() {
2348             @Override
2349             public Object apply(final Object message) {
2350                 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
2351             }
2352
2353             @Override
2354             public boolean canIntercept(final Object message) {
2355                 return message instanceof FindPrimary;
2356             }
2357         };
2358     }
2359
2360     private static class MockRespondActor extends MessageCollectorActor {
2361         static final String CLEAR_RESPONSE = "clear-response";
2362
2363         private Object responseMsg;
2364         private final Class<?> requestClass;
2365
2366         @SuppressWarnings("unused")
2367         MockRespondActor(final Class<?> requestClass, final Object responseMsg) {
2368             this.requestClass = requestClass;
2369             this.responseMsg = responseMsg;
2370         }
2371
2372         @Override
2373         public void onReceive(final Object message) throws Exception {
2374             if (message.equals(CLEAR_RESPONSE)) {
2375                 responseMsg = null;
2376             } else {
2377                 super.onReceive(message);
2378                 if (message.getClass().equals(requestClass) && responseMsg != null) {
2379                     getSender().tell(responseMsg, getSelf());
2380                 }
2381             }
2382         }
2383     }
2384 }