Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManagerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.shardmanager;
9
10 import static org.awaitility.Awaitility.await;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.junit.Assert.fail;
18 import static org.mockito.ArgumentMatchers.anyString;
19 import static org.mockito.Mockito.doReturn;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.reset;
22 import static org.mockito.Mockito.timeout;
23 import static org.mockito.Mockito.verify;
24 import static org.mockito.Mockito.verifyNoMoreInteractions;
25
26 import akka.actor.ActorRef;
27 import akka.actor.ActorSystem;
28 import akka.actor.AddressFromURIString;
29 import akka.actor.PoisonPill;
30 import akka.actor.Props;
31 import akka.actor.Status;
32 import akka.actor.Status.Failure;
33 import akka.actor.Status.Success;
34 import akka.cluster.Cluster;
35 import akka.cluster.ClusterEvent;
36 import akka.cluster.Member;
37 import akka.dispatch.Dispatchers;
38 import akka.dispatch.OnComplete;
39 import akka.japi.Creator;
40 import akka.pattern.Patterns;
41 import akka.persistence.RecoveryCompleted;
42 import akka.serialization.Serialization;
43 import akka.testkit.TestActorRef;
44 import akka.testkit.javadsl.TestKit;
45 import akka.util.Timeout;
46 import com.google.common.base.Stopwatch;
47 import com.google.common.collect.ImmutableMap;
48 import com.google.common.collect.Lists;
49 import com.google.common.collect.Sets;
50 import com.google.common.util.concurrent.SettableFuture;
51 import com.google.common.util.concurrent.Uninterruptibles;
52 import java.time.Duration;
53 import java.util.AbstractMap;
54 import java.util.Arrays;
55 import java.util.Collection;
56 import java.util.Collections;
57 import java.util.HashMap;
58 import java.util.List;
59 import java.util.Map;
60 import java.util.Map.Entry;
61 import java.util.Set;
62 import java.util.concurrent.CountDownLatch;
63 import java.util.concurrent.TimeUnit;
64 import java.util.concurrent.TimeoutException;
65 import java.util.function.Consumer;
66 import java.util.function.Function;
67 import java.util.stream.Collectors;
68 import org.junit.After;
69 import org.junit.AfterClass;
70 import org.junit.Before;
71 import org.junit.BeforeClass;
72 import org.junit.Test;
73 import org.junit.runner.RunWith;
74 import org.mockito.junit.MockitoJUnitRunner;
75 import org.opendaylight.controller.cluster.access.concepts.MemberName;
76 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
77 import org.opendaylight.controller.cluster.datastore.AbstractClusterRefActorTest;
78 import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
79 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
80 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
81 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
82 import org.opendaylight.controller.cluster.datastore.Shard;
83 import org.opendaylight.controller.cluster.datastore.config.Configuration;
84 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
85 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
86 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
87 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
88 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
89 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
90 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
91 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
92 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
93 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
94 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
95 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
96 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
97 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
98 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
99 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
100 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
101 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
102 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
103 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
104 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
105 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
106 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
107 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
108 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
109 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
110 import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor;
111 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
112 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
113 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
114 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
115 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
116 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
117 import org.opendaylight.controller.cluster.raft.RaftState;
118 import org.opendaylight.controller.cluster.raft.TestActorFactory;
119 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
120 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
121 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
122 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
123 import org.opendaylight.controller.cluster.raft.messages.AddServer;
124 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
125 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
126 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
127 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
128 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
129 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
130 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
131 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
132 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
133 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
134 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
135 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
136 import org.opendaylight.yangtools.concepts.Registration;
137 import org.opendaylight.yangtools.yang.common.Empty;
138 import org.opendaylight.yangtools.yang.common.XMLNamespace;
139 import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
140 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
141 import org.slf4j.Logger;
142 import org.slf4j.LoggerFactory;
143 import scala.concurrent.Await;
144 import scala.concurrent.Future;
145 import scala.concurrent.duration.FiniteDuration;
146
147 @RunWith(MockitoJUnitRunner.StrictStubs.class)
148 public class ShardManagerTest extends AbstractClusterRefActorTest {
149     private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
150     private static final MemberName MEMBER_1 = MemberName.forName("member-1");
151     private static final MemberName MEMBER_2 = MemberName.forName("member-2");
152     private static final MemberName MEMBER_3 = MemberName.forName("member-3");
153
154     private static int ID_COUNTER = 1;
155     private static ActorRef mockShardActor;
156     private static ShardIdentifier mockShardName;
157     private static SettableFuture<Empty> ready;
158     private static EffectiveModelContext TEST_SCHEMA_CONTEXT;
159
160     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
161     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
162     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
163             .dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
164             .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
165
166     private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
167
168     @BeforeClass
169     public static void beforeClass() {
170         TEST_SCHEMA_CONTEXT = TestModel.createTestContext();
171     }
172
173     @AfterClass
174     public static void afterClass() {
175         TEST_SCHEMA_CONTEXT = null;
176     }
177
178     @Before
179     public void setUp() {
180         ready = SettableFuture.create();
181
182         InMemoryJournal.clear();
183         InMemorySnapshotStore.clear();
184
185         if (mockShardActor == null) {
186             mockShardName = ShardIdentifier.create(Shard.DEFAULT_NAME, MEMBER_1, "config");
187             mockShardActor = getSystem().actorOf(MessageCollectorActor.props(), mockShardName.toString());
188         }
189
190         MessageCollectorActor.clearMessages(mockShardActor);
191     }
192
193     @After
194     public void tearDown() {
195         InMemoryJournal.clear();
196         InMemorySnapshotStore.clear();
197
198         mockShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
199         await().atMost(Duration.ofSeconds(10)).until(mockShardActor::isTerminated);
200         mockShardActor = null;
201
202         actorFactory.close();
203     }
204
205     private TestShardManager.Builder newTestShardMgrBuilder() {
206         return TestShardManager.builder(datastoreContextBuilder)
207             .distributedDataStore(mock(ClientBackedDataStore.class));
208     }
209
210     private TestShardManager.Builder newTestShardMgrBuilder(final Configuration config) {
211         return newTestShardMgrBuilder().configuration(config);
212     }
213
214     private Props newShardMgrProps() {
215         return newShardMgrProps(new MockConfiguration());
216     }
217
218     private Props newShardMgrProps(final Configuration config) {
219         return newTestShardMgrBuilder(config).readinessFuture(ready).props();
220     }
221
222     private ActorSystem newActorSystem(final String config) {
223         return newActorSystem("cluster-test", config);
224     }
225
226     private ActorRef newMockShardActor(final ActorSystem system, final String shardName, final String memberName) {
227         String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
228         if (system == getSystem()) {
229             return actorFactory.createActor(MessageCollectorActor.props(), name);
230         }
231
232         return system.actorOf(MessageCollectorActor.props(), name);
233     }
234
235     private static DatastoreContextFactory newDatastoreContextFactory(final DatastoreContext datastoreContext) {
236         DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
237         doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
238         doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(anyString());
239         return mockFactory;
240     }
241
242     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() {
243         return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
244     }
245
246     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(final ActorRef shardActor) {
247         return TestShardManager.builder(datastoreContextBuilder)
248             .shardActor(shardActor)
249             .distributedDataStore(mock(ClientBackedDataStore.class));
250     }
251
252
253     private Props newPropsShardMgrWithMockShardActor() {
254         return newTestShardMgrBuilderWithMockShardActor().props().withDispatcher(
255                 Dispatchers.DefaultDispatcherId());
256     }
257
258     private Props newPropsShardMgrWithMockShardActor(final ActorRef shardActor) {
259         return newTestShardMgrBuilderWithMockShardActor(shardActor).props()
260                 .withDispatcher(Dispatchers.DefaultDispatcherId());
261     }
262
263
264     private TestShardManager newTestShardManager() {
265         return newTestShardManager(newShardMgrProps());
266     }
267
268     private TestShardManager newTestShardManager(final Props props) {
269         TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
270         TestShardManager shardManager = shardManagerActor.underlyingActor();
271         shardManager.waitForRecoveryComplete();
272         return shardManager;
273     }
274
275     private static void waitForShardInitialized(final ActorRef shardManager, final String shardName,
276             final TestKit kit) {
277         AssertionError last = null;
278         Stopwatch sw = Stopwatch.createStarted();
279         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
280             try {
281                 shardManager.tell(new FindLocalShard(shardName, true), kit.getRef());
282                 kit.expectMsgClass(LocalShardFound.class);
283                 return;
284             } catch (AssertionError e) {
285                 last = e;
286             }
287
288             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
289         }
290
291         throw last;
292     }
293
294     @SuppressWarnings("unchecked")
295     private static <T> T expectMsgClassOrFailure(final Class<T> msgClass, final TestKit kit, final String msg) {
296         Object reply = kit.expectMsgAnyClassOf(kit.duration("5 sec"), msgClass, Failure.class);
297         if (reply instanceof Failure) {
298             throw new AssertionError(msg + " failed", ((Failure)reply).cause());
299         }
300
301         return (T)reply;
302     }
303
304     @Test
305     public void testPerShardDatastoreContext() throws Exception {
306         LOG.info("testPerShardDatastoreContext starting");
307         final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
308                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
309
310         doReturn(
311                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(6).build())
312                 .when(mockFactory).getShardDatastoreContext("default");
313
314         doReturn(
315                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(7).build())
316                 .when(mockFactory).getShardDatastoreContext("topology");
317
318         final MockConfiguration mockConfig = new MockConfiguration() {
319             @Override
320             public Collection<String> getMemberShardNames(final MemberName memberName) {
321                 return Arrays.asList("default", "topology");
322             }
323
324             @Override
325             public Collection<MemberName> getMembersFromShardName(final String shardName) {
326                 return members("member-1");
327             }
328         };
329
330         final ActorRef defaultShardActor = actorFactory.createActor(
331                 MessageCollectorActor.props(), actorFactory.generateActorId("default"));
332         final ActorRef topologyShardActor = actorFactory.createActor(
333                 MessageCollectorActor.props(), actorFactory.generateActorId("topology"));
334
335         final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
336                 new HashMap<String, Entry<ActorRef, DatastoreContext>>());
337         shardInfoMap.put("default", new AbstractMap.SimpleEntry<>(defaultShardActor, null));
338         shardInfoMap.put("topology", new AbstractMap.SimpleEntry<>(topologyShardActor, null));
339
340         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
341         final CountDownLatch newShardActorLatch = new CountDownLatch(2);
342         class LocalShardManager extends ShardManager {
343             LocalShardManager(final AbstractShardManagerCreator<?> creator) {
344                 super(creator);
345             }
346
347             @Override
348             protected ActorRef newShardActor(final ShardInformation info) {
349                 Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
350                 ActorRef ref = null;
351                 if (entry != null) {
352                     ref = entry.getKey();
353                     entry.setValue(info.getDatastoreContext());
354                 }
355
356                 newShardActorLatch.countDown();
357                 return ref;
358             }
359         }
360
361         final Creator<ShardManager> creator = new Creator<>() {
362             private static final long serialVersionUID = 1L;
363             @Override
364             public ShardManager create() {
365                 return new LocalShardManager(
366                         new GenericCreator<>(LocalShardManager.class).datastoreContextFactory(mockFactory)
367                                 .primaryShardInfoCache(primaryShardInfoCache).configuration(mockConfig));
368             }
369         };
370
371         final TestKit kit = new TestKit(getSystem());
372
373         final ActorRef shardManager = actorFactory.createActor(Props.create(ShardManager.class,
374                 new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()));
375
376         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
377
378         assertTrue("Shard actors created", newShardActorLatch.await(5, TimeUnit.SECONDS));
379         assertEquals("getShardElectionTimeoutFactor", 6,
380                 shardInfoMap.get("default").getValue().getShardElectionTimeoutFactor());
381         assertEquals("getShardElectionTimeoutFactor", 7,
382                 shardInfoMap.get("topology").getValue().getShardElectionTimeoutFactor());
383
384         DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
385                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
386         doReturn(
387                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(66).build())
388                 .when(newMockFactory).getShardDatastoreContext("default");
389
390         doReturn(
391                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(77).build())
392                 .when(newMockFactory).getShardDatastoreContext("topology");
393
394         shardManager.tell(newMockFactory, kit.getRef());
395
396         DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor,
397                 DatastoreContext.class);
398         assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
399
400         newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
401         assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
402
403         LOG.info("testPerShardDatastoreContext ending");
404     }
405
406     @Test
407     public void testOnReceiveFindPrimaryForNonExistentShard() {
408         final TestKit kit = new TestKit(getSystem());
409         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
410
411         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
412
413         shardManager.tell(new FindPrimary("non-existent", false), kit.getRef());
414
415         kit.expectMsgClass(Duration.ofSeconds(5), PrimaryNotFoundException.class);
416     }
417
418     @Test
419     public void testOnReceiveFindPrimaryForLocalLeaderShard() {
420         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting");
421         final TestKit kit = new TestKit(getSystem());
422         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
423
424         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
425
426         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
427         shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
428
429         DataTree mockDataTree = mock(DataTree.class);
430         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
431             DataStoreVersions.CURRENT_VERSION), kit.getRef());
432
433         MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
434         shardManager.tell(
435             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
436             mockShardActor);
437
438         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
439
440         LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5),
441             LocalPrimaryShardFound.class);
442         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
443             primaryFound.getPrimaryPath().contains("member-1-shard-default"));
444         assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
445
446         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending");
447     }
448
449     @Test
450     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() {
451         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp starting");
452         final TestKit kit = new TestKit(getSystem());
453         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
454
455         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
456         shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
457
458         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
459         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
460         shardManager.tell(
461             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
462             mockShardActor);
463         shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION),
464             mockShardActor);
465
466         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
467
468         kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
469
470         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending");
471     }
472
473     @Test
474     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() {
475         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard starting");
476         final TestKit kit = new TestKit(getSystem());
477         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
478
479         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
480         shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
481
482         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
483         MockClusterWrapper.sendMemberUp(shardManager, "member-2", kit.getRef().path().toString());
484
485         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
486         shardManager.tell(
487             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
488             mockShardActor);
489         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
490         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, leaderVersion), mockShardActor);
491
492         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
493
494         RemotePrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
495         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
496             primaryFound.getPrimaryPath().contains("member-2-shard-default"));
497         assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
498
499         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard ending");
500     }
501
502     @Test
503     public void testOnReceiveFindPrimaryForUninitializedShard() {
504         final TestKit kit = new TestKit(getSystem());
505         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
506
507         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
508
509         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
510     }
511
512     @Test
513     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() {
514         final TestKit kit = new TestKit(getSystem());
515         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
516
517         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
518         shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
519
520         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
521
522         kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
523     }
524
525     @Test
526     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() {
527         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
528         final TestKit kit = new TestKit(getSystem());
529         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
530
531         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
532         shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
533
534         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
535         shardManager.tell(
536             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Follower.name()),
537             mockShardActor);
538
539         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
540
541         kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
542
543         DataTree mockDataTree = mock(DataTree.class);
544         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
545             DataStoreVersions.CURRENT_VERSION), mockShardActor);
546
547         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
548
549         LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5),
550             LocalPrimaryShardFound.class);
551         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
552             primaryFound.getPrimaryPath().contains("member-1-shard-default"));
553         assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
554
555         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
556     }
557
558     @Test
559     public void testOnReceiveFindPrimaryWaitForShardLeader() {
560         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting");
561         datastoreContextBuilder.shardInitializationTimeout(10, TimeUnit.SECONDS);
562         final TestKit kit = new TestKit(getSystem());
563         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
564
565         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
566
567         // We're passing waitUntilInitialized = true to FindPrimary so
568         // the response should be
569         // delayed until we send ActorInitialized and
570         // RoleChangeNotification.
571         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
572
573         kit.expectNoMessage(Duration.ofMillis(150));
574
575         shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
576
577         kit.expectNoMessage(Duration.ofMillis(150));
578
579         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
580         shardManager.tell(
581             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
582             mockShardActor);
583
584         kit.expectNoMessage(Duration.ofMillis(150));
585
586         DataTree mockDataTree = mock(DataTree.class);
587         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
588             DataStoreVersions.CURRENT_VERSION), mockShardActor);
589
590         LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
591         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
592             primaryFound.getPrimaryPath().contains("member-1-shard-default"));
593         assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
594
595         kit.expectNoMessage(Duration.ofMillis(200));
596
597         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending");
598     }
599
600     @Test
601     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() {
602         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard starting");
603         final TestKit kit = new TestKit(getSystem());
604         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
605
606         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
607
608         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
609
610         kit.expectMsgClass(Duration.ofSeconds(2), NotInitializedException.class);
611
612         shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
613
614         kit.expectNoMessage(Duration.ofMillis(200));
615
616         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending");
617     }
618
619     @Test
620     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() {
621         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard starting");
622         final TestKit kit = new TestKit(getSystem());
623         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
624
625         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
626         shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
627         shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
628             RaftState.Candidate.name()), mockShardActor);
629
630         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
631
632         kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
633
634         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending");
635     }
636
637     @Test
638     public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() {
639         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard starting");
640         final TestKit kit = new TestKit(getSystem());
641         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
642
643         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
644         shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
645         shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
646             RaftState.IsolatedLeader.name()), mockShardActor);
647
648         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true),kit. getRef());
649
650         kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
651
652         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending");
653     }
654
655     @Test
656     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() {
657         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard starting");
658         final TestKit kit = new TestKit(getSystem());
659         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
660
661         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
662         shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
663
664         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
665
666         kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
667
668         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending");
669     }
670
671     @Test
672     public void testOnReceiveFindPrimaryForRemoteShard() {
673         LOG.info("testOnReceiveFindPrimaryForRemoteShard starting");
674         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
675
676         // Create an ActorSystem ShardManager actor for member-1.
677
678         final ActorSystem system1 = newActorSystem("Member1");
679         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
680
681         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
682                 newTestShardMgrBuilderWithMockShardActor().cluster(
683                         new ClusterWrapperImpl(system1)).props().withDispatcher(
684                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
685
686         // Create an ActorSystem ShardManager actor for member-2.
687
688         final ActorSystem system2 = newActorSystem("Member2");
689
690         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
691
692         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
693
694         MockConfiguration mockConfig2 = new MockConfiguration(
695                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
696                         .put("astronauts", Arrays.asList("member-2")).build());
697
698         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
699                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
700                         new ClusterWrapperImpl(system2)).props().withDispatcher(
701                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
702
703         final TestKit kit = new TestKit(system1);
704         shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
705         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
706
707         shardManager2.tell(new ActorInitialized(mockShardActor2), ActorRef.noSender());
708
709         String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
710         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
711         shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
712             mockShardActor2);
713         shardManager2.tell(new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
714             mockShardActor2);
715
716         shardManager1.underlyingActor().waitForMemberUp();
717         shardManager1.tell(new FindPrimary("astronauts", false), kit.getRef());
718
719         RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
720         String path = found.getPrimaryPath();
721         assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
722         assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
723
724         shardManager2.underlyingActor().verifyFindPrimary();
725
726         // This part times out quite a bit on jenkins for some reason
727
728 //                Cluster.get(system2).down(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
729 //
730 //                shardManager1.underlyingActor().waitForMemberRemoved();
731 //
732 //                shardManager1.tell(new FindPrimary("astronauts", false), getRef());
733 //
734 //                expectMsgClass(Duration.ofSeconds(5), PrimaryNotFoundException.class);
735
736         LOG.info("testOnReceiveFindPrimaryForRemoteShard ending");
737     }
738
739     @Test
740     public void testShardAvailabilityOnChangeOfMemberReachability() {
741         LOG.info("testShardAvailabilityOnChangeOfMemberReachability starting");
742         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
743
744         // Create an ActorSystem ShardManager actor for member-1.
745
746         final ActorSystem system1 = newActorSystem("Member1");
747         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
748
749         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
750
751         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
752                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
753                         new ClusterWrapperImpl(system1)).props().withDispatcher(
754                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
755
756         // Create an ActorSystem ShardManager actor for member-2.
757
758         final ActorSystem system2 = newActorSystem("Member2");
759
760         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
761
762         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
763
764         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
765                 .put("default", Arrays.asList("member-1", "member-2")).build());
766
767         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
768                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
769                         new ClusterWrapperImpl(system2)).props().withDispatcher(
770                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
771
772         final TestKit kit = new TestKit(system1);
773         shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
774         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
775         shardManager1.tell(new ActorInitialized(mockShardActor1), ActorRef.noSender());
776         shardManager2.tell(new ActorInitialized(mockShardActor1), ActorRef.noSender());
777
778         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
779         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
780         shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
781             DataStoreVersions.CURRENT_VERSION), mockShardActor1);
782         shardManager1.tell(
783             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
784             mockShardActor1);
785         shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
786             DataStoreVersions.CURRENT_VERSION), mockShardActor2);
787         shardManager2.tell(
788             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
789             mockShardActor2);
790         shardManager1.underlyingActor().waitForMemberUp();
791
792         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
793
794         RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
795         String path = found.getPrimaryPath();
796         assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
797
798         shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
799             kit.getRef());
800
801         shardManager1.underlyingActor().waitForUnreachableMember();
802         MessageCollectorActor.clearMessages(mockShardActor1);
803
804         shardManager1.tell(MockClusterWrapper.createMemberRemoved("member-2", "akka://cluster-test@127.0.0.1:2558"),
805             kit.getRef());
806
807         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
808
809         kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
810
811         shardManager1.tell(MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
812             kit.getRef());
813
814         shardManager1.underlyingActor().waitForReachableMember();
815
816         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
817
818         RemotePrimaryShardFound found1 = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
819         String path1 = found1.getPrimaryPath();
820         assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
821
822         shardManager1.tell(MockClusterWrapper.createMemberUp("member-2", "akka://cluster-test@127.0.0.1:2558"),
823             kit.getRef());
824
825         // Test FindPrimary wait succeeds after reachable member event.
826
827         shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
828                 "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
829         shardManager1.underlyingActor().waitForUnreachableMember();
830
831         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
832
833         shardManager1.tell(
834             MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
835
836         RemotePrimaryShardFound found2 = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
837         String path2 = found2.getPrimaryPath();
838         assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config"));
839
840         LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending");
841     }
842
843     @Test
844     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() {
845         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange starting");
846         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
847
848         // Create an ActorSystem ShardManager actor for member-1.
849
850         final ActorSystem system1 = newActorSystem("Member1");
851         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
852
853         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
854
855         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
856         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
857                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(new ClusterWrapperImpl(system1))
858                         .primaryShardInfoCache(primaryShardInfoCache).props()
859                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
860                 shardManagerID);
861
862         // Create an ActorSystem ShardManager actor for member-2.
863
864         final ActorSystem system2 = newActorSystem("Member2");
865
866         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
867
868         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
869
870         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
871                 .put("default", Arrays.asList("member-1", "member-2")).build());
872
873         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
874                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
875                         new ClusterWrapperImpl(system2)).props().withDispatcher(
876                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
877
878         final TestKit kit = new TestKit(system1);
879         shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
880         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
881         shardManager1.tell(new ActorInitialized(mockShardActor1), ActorRef.noSender());
882         shardManager2.tell(new ActorInitialized(mockShardActor2), ActorRef.noSender());
883
884         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
885         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
886         shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
887             DataStoreVersions.CURRENT_VERSION), mockShardActor1);
888         shardManager1.tell(
889             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
890             mockShardActor1);
891         shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
892             DataStoreVersions.CURRENT_VERSION), mockShardActor2);
893         shardManager2.tell(
894             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
895             mockShardActor2);
896         shardManager1.underlyingActor().waitForMemberUp();
897
898         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
899
900         RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
901         String path = found.getPrimaryPath();
902         assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
903
904         primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(
905             system1.actorSelection(mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION));
906
907         shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
908                 "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
909
910         shardManager1.underlyingActor().waitForUnreachableMember();
911
912         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
913
914         kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
915
916         assertNull("Expected primaryShardInfoCache entry removed",
917             primaryShardInfoCache.getIfPresent("default"));
918
919         shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, mock(DataTree.class),
920             DataStoreVersions.CURRENT_VERSION), mockShardActor1);
921         shardManager1.tell(
922             new RoleChangeNotification(memberId1, RaftState.Follower.name(), RaftState.Leader.name()),
923             mockShardActor1);
924
925         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
926
927         LocalPrimaryShardFound found1 = kit.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
928         String path1 = found1.getPrimaryPath();
929         assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
930
931         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange ending");
932     }
933
934     @Test
935     public void testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable() {
936         LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable starting");
937         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
938
939         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
940                 .put("default", Arrays.asList("member-256", "member-2")).build());
941
942         // Create an ActorSystem, ShardManager and actor for member-256.
943
944         final ActorSystem system256 = newActorSystem("Member256");
945         // 2562 is the tcp port of Member256 in src/test/resources/application.conf.
946         Cluster.get(system256).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2562"));
947
948         final ActorRef mockShardActor256 = newMockShardActor(system256, Shard.DEFAULT_NAME, "member-256");
949
950         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
951
952         // ShardManager must be created with shard configuration to let its localShards has shards.
953         final TestActorRef<TestShardManager> shardManager256 = TestActorRef.create(system256,
954                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor256)
955                         .cluster(new ClusterWrapperImpl(system256))
956                         .primaryShardInfoCache(primaryShardInfoCache).props()
957                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
958                 shardManagerID);
959
960         // Create an ActorSystem, ShardManager and actor for member-2 whose name is contained in member-256.
961
962         final ActorSystem system2 = newActorSystem("Member2");
963
964         // Join member-2 into the cluster of member-256.
965         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2562"));
966
967         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
968
969         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
970                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor2).cluster(
971                         new ClusterWrapperImpl(system2)).props().withDispatcher(
972                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
973
974         final TestKit kit256 = new TestKit(system256);
975         shardManager256.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef());
976         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef());
977         shardManager256.tell(new ActorInitialized(mockShardActor256), ActorRef.noSender());
978         shardManager2.tell(new ActorInitialized(mockShardActor2), ActorRef.noSender());
979
980         String memberId256 = "member-256-shard-default-" + shardMrgIDSuffix;
981         String memberId2   = "member-2-shard-default-"   + shardMrgIDSuffix;
982         shardManager256.tell(new ShardLeaderStateChanged(memberId256, memberId256, mock(DataTree.class),
983             DataStoreVersions.CURRENT_VERSION), mockShardActor256);
984         shardManager256.tell(
985             new RoleChangeNotification(memberId256, RaftState.Candidate.name(), RaftState.Leader.name()),
986             mockShardActor256);
987         shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId256, mock(DataTree.class),
988             DataStoreVersions.CURRENT_VERSION), mockShardActor2);
989         shardManager2.tell(
990             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Follower.name()),
991             mockShardActor2);
992         shardManager256.underlyingActor().waitForMemberUp();
993
994         shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
995
996         LocalPrimaryShardFound found = kit256.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
997         String path = found.getPrimaryPath();
998         assertTrue("Unexpected primary path " + path + " which must on member-256",
999             path.contains("member-256-shard-default-config"));
1000
1001         PrimaryShardInfo primaryShardInfo = new PrimaryShardInfo(
1002             system256.actorSelection(mockShardActor256.path()), DataStoreVersions.CURRENT_VERSION);
1003         primaryShardInfoCache.putSuccessful("default", primaryShardInfo);
1004
1005         // Simulate member-2 become unreachable.
1006         shardManager256.tell(MockClusterWrapper.createUnreachableMember("member-2",
1007                 "akka://cluster-test@127.0.0.1:2558"), kit256.getRef());
1008         shardManager256.underlyingActor().waitForUnreachableMember();
1009
1010         // Make sure leader shard on member-256 is still leader and still in the cache.
1011         shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
1012         found = kit256.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
1013         path = found.getPrimaryPath();
1014         assertTrue("Unexpected primary path " + path + " which must still not on member-256",
1015             path.contains("member-256-shard-default-config"));
1016         Future<PrimaryShardInfo> futurePrimaryShard = primaryShardInfoCache.getIfPresent("default");
1017         futurePrimaryShard.onComplete(new OnComplete<PrimaryShardInfo>() {
1018             @Override
1019             public void onComplete(final Throwable failure, final PrimaryShardInfo futurePrimaryShardInfo) {
1020                 if (failure != null) {
1021                     assertTrue("Primary shard info is unexpectedly removed from primaryShardInfoCache", false);
1022                 } else {
1023                     assertEquals("Expected primaryShardInfoCache entry",
1024                         primaryShardInfo, futurePrimaryShardInfo);
1025                 }
1026             }
1027         }, system256.dispatchers().defaultGlobalDispatcher());
1028
1029         LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable ending");
1030     }
1031
1032     @Test
1033     public void testOnReceiveFindLocalShardForNonExistentShard() {
1034         final TestKit kit = new TestKit(getSystem());
1035         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1036
1037         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1038
1039         shardManager.tell(new FindLocalShard("non-existent", false), kit.getRef());
1040
1041         LocalShardNotFound notFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
1042
1043         assertEquals("getShardName", "non-existent", notFound.getShardName());
1044     }
1045
1046     @Test
1047     public void testOnReceiveFindLocalShardForExistentShard() {
1048         final TestKit kit = new TestKit(getSystem());
1049         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1050
1051         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1052         shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
1053
1054         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1055
1056         LocalShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1057
1058         assertTrue("Found path contains " + found.getPath().path().toString(),
1059             found.getPath().path().toString().contains("member-1-shard-default-config"));
1060     }
1061
1062     @Test
1063     public void testOnReceiveFindLocalShardForNotInitializedShard() {
1064         final TestKit kit = new TestKit(getSystem());
1065         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1066
1067         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1068
1069         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1070     }
1071
1072     @Test
1073     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
1074         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
1075         final TestKit kit = new TestKit(getSystem());
1076         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1077
1078         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1079
1080         // We're passing waitUntilInitialized = true to FindLocalShard
1081         // so the response should be
1082         // delayed until we send ActorInitialized.
1083         Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
1084             new Timeout(5, TimeUnit.SECONDS));
1085
1086         shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
1087
1088         Object resp = Await.result(future, kit.duration("5 seconds"));
1089         assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
1090
1091         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
1092     }
1093
1094     @Test
1095     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
1096         TestShardManager shardManager = newTestShardManager();
1097
1098         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1099         shardManager.handleCommand(new RoleChangeNotification(
1100                 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
1101         assertFalse(ready.isDone());
1102
1103         shardManager.handleCommand(new ShardLeaderStateChanged(memberId, memberId,
1104                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1105         assertTrue(ready.isDone());
1106     }
1107
1108     @Test
1109     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
1110         final TestKit kit = new TestKit(getSystem());
1111         TestShardManager shardManager = newTestShardManager();
1112
1113         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1114         shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1115         assertFalse(ready.isDone());
1116
1117         shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
1118
1119         shardManager.handleCommand(
1120             new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1121                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1122         assertTrue(ready.isDone());
1123     }
1124
1125     @Test
1126     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
1127         final TestKit kit = new TestKit(getSystem());
1128         TestShardManager shardManager = newTestShardManager();
1129
1130         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1131         shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1132         assertFalse(ready.isDone());
1133
1134         shardManager.handleCommand(
1135             new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1136                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1137
1138         shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
1139         assertTrue(ready.isDone());
1140     }
1141
1142     @Test
1143     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
1144         TestShardManager shardManager = newTestShardManager();
1145
1146         shardManager.handleCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(),
1147             RaftState.Leader.name()));
1148         assertFalse(ready.isDone());
1149     }
1150
1151     @Test
1152     public void testByDefaultSyncStatusIsFalse() {
1153         TestShardManager shardManager = newTestShardManager();
1154
1155         assertFalse(shardManager.getMBean().getSyncStatus());
1156     }
1157
1158     @Test
1159     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception {
1160         TestShardManager shardManager = newTestShardManager();
1161
1162         shardManager.handleCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
1163                 RaftState.Follower.name(), RaftState.Leader.name()));
1164
1165         assertTrue(shardManager.getMBean().getSyncStatus());
1166     }
1167
1168     @Test
1169     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception {
1170         TestShardManager shardManager = newTestShardManager();
1171
1172         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1173         shardManager.handleCommand(new RoleChangeNotification(shardId,
1174                 RaftState.Follower.name(), RaftState.Candidate.name()));
1175
1176         assertFalse(shardManager.getMBean().getSyncStatus());
1177
1178         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
1179         shardManager.handleCommand(new FollowerInitialSyncUpStatus(
1180                 true, shardId));
1181
1182         assertFalse(shardManager.getMBean().getSyncStatus());
1183     }
1184
1185     @Test
1186     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception {
1187         TestShardManager shardManager = newTestShardManager();
1188
1189         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1190         shardManager.handleCommand(new RoleChangeNotification(shardId,
1191                 RaftState.Candidate.name(), RaftState.Follower.name()));
1192
1193         // Initially will be false
1194         assertFalse(shardManager.getMBean().getSyncStatus());
1195
1196         // Send status true will make sync status true
1197         shardManager.handleCommand(new FollowerInitialSyncUpStatus(true, shardId));
1198
1199         assertTrue(shardManager.getMBean().getSyncStatus());
1200
1201         // Send status false will make sync status false
1202         shardManager.handleCommand(new FollowerInitialSyncUpStatus(false, shardId));
1203
1204         assertFalse(shardManager.getMBean().getSyncStatus());
1205     }
1206
1207     @Test
1208     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception {
1209         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
1210         TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
1211             @Override
1212             public List<String> getMemberShardNames(final MemberName memberName) {
1213                 return Arrays.asList("default", "astronauts");
1214             }
1215         }));
1216
1217         // Initially will be false
1218         assertFalse(shardManager.getMBean().getSyncStatus());
1219
1220         // Make default shard leader
1221         String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
1222         shardManager.handleCommand(new RoleChangeNotification(defaultShardId,
1223                 RaftState.Follower.name(), RaftState.Leader.name()));
1224
1225         // default = Leader, astronauts is unknown so sync status remains false
1226         assertFalse(shardManager.getMBean().getSyncStatus());
1227
1228         // Make astronauts shard leader as well
1229         String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1230         shardManager.handleCommand(new RoleChangeNotification(astronautsShardId,
1231                 RaftState.Follower.name(), RaftState.Leader.name()));
1232
1233         // Now sync status should be true
1234         assertTrue(shardManager.getMBean().getSyncStatus());
1235
1236         // Make astronauts a Follower
1237         shardManager.handleCommand(new RoleChangeNotification(astronautsShardId,
1238                 RaftState.Leader.name(), RaftState.Follower.name()));
1239
1240         // Sync status is not true
1241         assertFalse(shardManager.getMBean().getSyncStatus());
1242
1243         // Make the astronauts follower sync status true
1244         shardManager.handleCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
1245
1246         // Sync status is now true
1247         assertTrue(shardManager.getMBean().getSyncStatus());
1248
1249         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending");
1250     }
1251
1252     @Test
1253     public void testOnReceiveSwitchShardBehavior() {
1254         final TestKit kit = new TestKit(getSystem());
1255         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1256
1257         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1258         shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
1259
1260         shardManager.tell(new SwitchShardBehavior(mockShardName, RaftState.Leader, 1000), kit.getRef());
1261
1262         SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor,
1263             SwitchBehavior.class);
1264
1265         assertEquals(RaftState.Leader, switchBehavior.getNewState());
1266         assertEquals(1000, switchBehavior.getNewTerm());
1267     }
1268
1269     private static List<MemberName> members(final String... names) {
1270         return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList());
1271     }
1272
1273     @Test
1274     public void testOnCreateShard() {
1275         LOG.info("testOnCreateShard starting");
1276         final TestKit kit = new TestKit(getSystem());
1277         datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1278
1279         ActorRef shardManager = actorFactory
1280                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1281                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1282
1283         EffectiveModelContext schemaContext = TEST_SCHEMA_CONTEXT;
1284         shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1285
1286         DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100)
1287                 .persistent(false).build();
1288         Shard.Builder shardBuilder = Shard.builder();
1289
1290         ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module",
1291             "foo", null, members("member-1", "member-5", "member-6"));
1292         shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), kit.getRef());
1293
1294         kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1295
1296         shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1297
1298         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1299
1300         assertFalse("isRecoveryApplicable", shardBuilder.getDatastoreContext().isPersistent());
1301         assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig()
1302             .getPeerAddressResolver() instanceof ShardPeerAddressResolver);
1303         assertEquals("peerMembers", Sets.newHashSet(
1304             ShardIdentifier.create("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(),
1305             ShardIdentifier.create("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()),
1306             shardBuilder.getPeerAddresses().keySet());
1307         assertEquals("ShardIdentifier", ShardIdentifier.create("foo", MEMBER_1, shardMrgIDSuffix),
1308             shardBuilder.getId());
1309         assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1310
1311         // Send CreateShard with same name - should return Success with
1312         // a message.
1313
1314         shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1315
1316         Success success = kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1317         assertNotNull("Success status is null", success.status());
1318
1319         LOG.info("testOnCreateShard ending");
1320     }
1321
1322     @Test
1323     public void testOnCreateShardWithLocalMemberNotInShardConfig() {
1324         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting");
1325         final TestKit kit = new TestKit(getSystem());
1326         datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1327
1328         ActorRef shardManager = actorFactory
1329                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1330                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1331
1332         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1333
1334         Shard.Builder shardBuilder = Shard.builder();
1335         ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module",
1336             "foo", null, members("member-5", "member-6"));
1337
1338         shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1339         kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1340
1341         shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1342         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1343
1344         assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
1345         assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(), shardBuilder
1346             .getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
1347
1348         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending");
1349     }
1350
1351     @Test
1352     public void testOnCreateShardWithNoInitialSchemaContext() {
1353         LOG.info("testOnCreateShardWithNoInitialSchemaContext starting");
1354         final TestKit kit = new TestKit(getSystem());
1355         ActorRef shardManager = actorFactory
1356                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1357                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1358
1359         Shard.Builder shardBuilder = Shard.builder();
1360
1361         ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module",
1362             "foo", null, members("member-1"));
1363         shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1364
1365         kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1366
1367         EffectiveModelContext schemaContext = TEST_SCHEMA_CONTEXT;
1368         shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1369
1370         shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1371
1372         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1373
1374         assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1375         assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
1376
1377         LOG.info("testOnCreateShardWithNoInitialSchemaContext ending");
1378     }
1379
1380     @Test
1381     public void testGetSnapshot() {
1382         LOG.info("testGetSnapshot starting");
1383         TestKit kit = new TestKit(getSystem());
1384
1385         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1386                 .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1"))
1387                 .put("astronauts", Collections.<String>emptyList()).build());
1388
1389         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig)
1390                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1391
1392         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1393         Failure failure = kit.expectMsgClass(Failure.class);
1394         assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
1395
1396         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1397
1398         waitForShardInitialized(shardManager, "shard1", kit);
1399         waitForShardInitialized(shardManager, "shard2", kit);
1400
1401         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1402
1403         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1404
1405         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1406         assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1407
1408         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
1409             datastoreSnapshot.getShardSnapshots().stream().map(ShardSnapshot::getName).collect(Collectors.toSet())));
1410
1411         // Add a new replica
1412
1413         TestKit mockShardLeaderKit = new TestKit(getSystem());
1414
1415         TestShardManager shardManagerInstance = shardManager.underlyingActor();
1416         shardManagerInstance.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1417
1418         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1419         mockShardLeaderKit.expectMsgClass(AddServer.class);
1420         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.OK, ""));
1421         kit.expectMsgClass(Status.Success.class);
1422         waitForShardInitialized(shardManager, "astronauts", kit);
1423
1424         // Send another GetSnapshot and verify
1425
1426         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1427         datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1428
1429         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
1430                 Lists.transform(datastoreSnapshot.getShardSnapshots(), ShardSnapshot::getName)));
1431
1432         ShardManagerSnapshot snapshot = datastoreSnapshot.getShardManagerSnapshot();
1433         assertNotNull("Expected ShardManagerSnapshot", snapshot);
1434         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1435                 Sets.newHashSet(snapshot.getShardList()));
1436
1437         LOG.info("testGetSnapshot ending");
1438     }
1439
1440     @Test
1441     public void testRestoreFromSnapshot() {
1442         LOG.info("testRestoreFromSnapshot starting");
1443
1444         datastoreContextBuilder.shardInitializationTimeout(3, TimeUnit.SECONDS);
1445
1446         TestKit kit = new TestKit(getSystem());
1447
1448         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1449                 .put("shard1", Collections.<String>emptyList()).put("shard2", Collections.<String>emptyList())
1450                 .put("astronauts", Collections.<String>emptyList()).build());
1451
1452         ShardManagerSnapshot snapshot =
1453                 new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"));
1454         DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix, snapshot,
1455                 Collections.<ShardSnapshot>emptyList());
1456         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig)
1457                 .restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
1458
1459         shardManager.underlyingActor().waitForRecoveryComplete();
1460
1461         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1462
1463         waitForShardInitialized(shardManager, "shard1", kit);
1464         waitForShardInitialized(shardManager, "shard2", kit);
1465         waitForShardInitialized(shardManager, "astronauts", kit);
1466
1467         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1468
1469         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1470
1471         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1472
1473         assertNotNull("Expected ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1474         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1475                 Sets.newHashSet(datastoreSnapshot.getShardManagerSnapshot().getShardList()));
1476
1477         LOG.info("testRestoreFromSnapshot ending");
1478     }
1479
1480     @Test
1481     public void testAddShardReplicaForNonExistentShardConfig() {
1482         final TestKit kit = new TestKit(getSystem());
1483         ActorRef shardManager = actorFactory
1484                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1485                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1486
1487         shardManager.tell(new AddShardReplica("model-inventory"), kit.getRef());
1488         Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(2), Status.Failure.class);
1489
1490         assertTrue("Failure obtained", resp.cause() instanceof IllegalArgumentException);
1491     }
1492
1493     @Test
1494     public void testAddShardReplica() {
1495         LOG.info("testAddShardReplica starting");
1496         MockConfiguration mockConfig = new MockConfiguration(
1497                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1498                         .put("astronauts", Arrays.asList("member-2")).build());
1499
1500         final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1501         datastoreContextBuilder.shardManagerPersistenceId(shardManagerID);
1502
1503         // Create an ActorSystem ShardManager actor for member-1.
1504         final ActorSystem system1 = newActorSystem("Member1");
1505         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1506         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1507         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1508                 newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor)
1509                         .cluster(new ClusterWrapperImpl(system1)).props()
1510                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1511                 shardManagerID);
1512
1513         // Create an ActorSystem ShardManager actor for member-2.
1514         final ActorSystem system2 = newActorSystem("Member2");
1515         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1516
1517         String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
1518         String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString();
1519         final TestActorRef<MockRespondActor> mockShardLeaderActor = TestActorRef.create(system2,
1520                 Props.create(MockRespondActor.class, AddServer.class,
1521                         new AddServerReply(ServerChangeStatus.OK, memberId2))
1522                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1523                 name);
1524         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1525                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor)
1526                         .cluster(new ClusterWrapperImpl(system2)).props()
1527                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1528                 shardManagerID);
1529
1530         final TestKit kit = new TestKit(getSystem());
1531         newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1532         leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1533
1534         leaderShardManager.tell(new ActorInitialized(mockShardLeaderActor), ActorRef.noSender());
1535
1536         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1537         leaderShardManager.tell(
1538             new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1539             mockShardLeaderActor);
1540         leaderShardManager.tell(
1541             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1542             mockShardLeaderActor);
1543
1544         newReplicaShardManager.underlyingActor().waitForMemberUp();
1545         leaderShardManager.underlyingActor().waitForMemberUp();
1546
1547         // Have a dummy snapshot to be overwritten by the new data
1548         // persisted.
1549         String[] restoredShards = { "default", "people" };
1550         ShardManagerSnapshot snapshot =
1551                 new ShardManagerSnapshot(Arrays.asList(restoredShards));
1552         InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
1553         Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
1554
1555         InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID);
1556         InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
1557
1558         // construct a mock response message
1559         newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1560         AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1561             AddServer.class);
1562         String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1563         assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1564         kit.expectMsgClass(Duration.ofSeconds(5), Status.Success.class);
1565
1566         InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
1567         InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
1568         List<ShardManagerSnapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(shardManagerID,
1569             ShardManagerSnapshot.class);
1570         assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
1571         ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
1572         assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
1573             Sets.newHashSet(shardManagerSnapshot.getShardList()));
1574         LOG.info("testAddShardReplica ending");
1575     }
1576
1577     @Test
1578     public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() {
1579         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting");
1580         final TestKit kit = new TestKit(getSystem());
1581         TestActorRef<TestShardManager> shardManager = actorFactory
1582                 .createTestActor(newPropsShardMgrWithMockShardActor(), shardMgrID);
1583
1584         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1585         shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
1586
1587         String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
1588         AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
1589         ActorRef leaderShardActor = shardManager.underlyingActor().getContext()
1590                 .actorOf(Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId);
1591
1592         MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
1593
1594         String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
1595         shardManager.tell(
1596             new RoleChangeNotification(newReplicaId, RaftState.Candidate.name(), RaftState.Follower.name()),
1597             mockShardActor);
1598         shardManager.tell(
1599             new ShardLeaderStateChanged(newReplicaId, leaderId, DataStoreVersions.CURRENT_VERSION),
1600             mockShardActor);
1601
1602         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1603
1604         MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
1605
1606         Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1607         assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1608
1609         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1610         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1611
1612         // Send message again to verify previous in progress state is
1613         // cleared
1614
1615         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1616         resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1617         assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1618
1619         // Send message again with an AddServer timeout to verify the
1620         // pre-existing shard actor isn't terminated.
1621
1622         shardManager.tell(
1623             newDatastoreContextFactory(
1624                 datastoreContextBuilder.shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), kit.getRef());
1625         leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
1626         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1627         kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1628
1629         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1630         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1631
1632         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
1633     }
1634
1635     @Test
1636     public void testAddShardReplicaWithPreExistingLocalReplicaLeader() {
1637         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting");
1638         final TestKit kit = new TestKit(getSystem());
1639         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1640         ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1641
1642         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1643         shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
1644         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1645             DataStoreVersions.CURRENT_VERSION), kit.getRef());
1646         shardManager.tell(
1647             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1648             mockShardActor);
1649
1650         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1651         Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1652         assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1653
1654         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1655         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1656
1657         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
1658     }
1659
1660     @Test
1661     public void testAddShardReplicaWithAddServerReplyFailure() {
1662         LOG.info("testAddShardReplicaWithAddServerReplyFailure starting");
1663         final TestKit kit = new TestKit(getSystem());
1664         final TestKit mockShardLeaderKit = new TestKit(getSystem());
1665
1666         MockConfiguration mockConfig = new MockConfiguration(
1667             ImmutableMap.of("astronauts", Arrays.asList("member-2")));
1668
1669         ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
1670         final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1671             newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props()
1672             .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1673         shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1674
1675         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1676
1677         TestKit terminateWatcher = new TestKit(getSystem());
1678         terminateWatcher.watch(mockNewReplicaShardActor);
1679
1680         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1681
1682         AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
1683         assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
1684             addServerMsg.getNewServerId());
1685         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
1686
1687         Failure failure = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1688         assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
1689
1690         shardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
1691         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
1692
1693         terminateWatcher.expectTerminated(mockNewReplicaShardActor);
1694
1695         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1696         mockShardLeaderKit.expectMsgClass(AddServer.class);
1697         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
1698         failure = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1699         assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
1700
1701         LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
1702     }
1703
1704     @Test
1705     public void testAddShardReplicaWithAlreadyInProgress() {
1706         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1707                 AddServer.class, new AddShardReplica("astronauts"));
1708     }
1709
1710     @Test
1711     public void testAddShardReplicaWithFindPrimaryTimeout() {
1712         LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting");
1713         datastoreContextBuilder.shardInitializationTimeout(100, TimeUnit.MILLISECONDS);
1714         final TestKit kit = new TestKit(getSystem());
1715         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.of("astronauts", Arrays.asList("member-2")));
1716
1717         final ActorRef newReplicaShardManager = actorFactory
1718                 .createActor(newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props()
1719                     .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1720
1721         newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1722         MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2",
1723             AddressFromURIString.parse("akka://non-existent@127.0.0.1:5").toString());
1724
1725         newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1726         Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Status.Failure.class);
1727         assertTrue("Failure obtained", resp.cause() instanceof RuntimeException);
1728
1729         LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending");
1730     }
1731
1732     @Test
1733     public void testRemoveShardReplicaForNonExistentShard() {
1734         final TestKit kit = new TestKit(getSystem());
1735         ActorRef shardManager = actorFactory
1736                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1737                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1738
1739         shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), kit.getRef());
1740         Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(10), Status.Failure.class);
1741         assertTrue("Failure obtained", resp.cause() instanceof PrimaryNotFoundException);
1742     }
1743
1744     @Test
1745     /**
1746      * Primary is Local.
1747      */
1748     public void testRemoveShardReplicaLocal() {
1749         final TestKit kit = new TestKit(getSystem());
1750         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1751
1752         final ActorRef respondActor = actorFactory.createActor(Props.create(MockRespondActor.class,
1753             RemoveServer.class, new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
1754
1755         ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
1756
1757         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1758         shardManager.tell(new ActorInitialized(respondActor), ActorRef.noSender());
1759         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1760             DataStoreVersions.CURRENT_VERSION), kit.getRef());
1761         shardManager.tell(
1762             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1763             respondActor);
1764
1765         shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), kit.getRef());
1766         final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor,
1767             RemoveServer.class);
1768         assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(),
1769             removeServer.getServerId());
1770         kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1771     }
1772
1773     @Test
1774     public void testRemoveShardReplicaRemote() {
1775         MockConfiguration mockConfig = new MockConfiguration(
1776                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1777                         .put("astronauts", Arrays.asList("member-1")).build());
1778
1779         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1780
1781         // Create an ActorSystem ShardManager actor for member-1.
1782         final ActorSystem system1 = newActorSystem("Member1");
1783         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1784         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1785
1786         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1787                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster(
1788                         new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1789                 shardManagerID);
1790
1791         // Create an ActorSystem ShardManager actor for member-2.
1792         final ActorSystem system2 = newActorSystem("Member2");
1793         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1794
1795         String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString();
1796         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
1797         final TestActorRef<MockRespondActor> mockShardLeaderActor =
1798                 TestActorRef.create(system2, Props.create(MockRespondActor.class, RemoveServer.class,
1799                         new RemoveServerReply(ServerChangeStatus.OK, memberId2)), name);
1800
1801         LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor);
1802
1803         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1804                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster(
1805                         new ClusterWrapperImpl(system2)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1806                 shardManagerID);
1807
1808         // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so,
1809         //    akka://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1
1810         // However when a shard manager has a local shard which is a follower and a leader that is remote it will
1811         // try to compute an address for the remote shard leader using the ShardPeerAddressResolver. This address will
1812         // look like so,
1813         //    akka://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1
1814         // In this specific case if we did a FindPrimary for shard default from member-1 we would come up
1815         // with the address of an actor which does not exist, therefore any message sent to that actor would go to
1816         // dead letters.
1817         // To work around this problem we create a ForwardingActor with the right address and pass to it the
1818         // mockShardLeaderActor. The ForwardingActor simply forwards all messages to the mockShardLeaderActor and every
1819         // thing works as expected
1820         final ActorRef actorRef = leaderShardManager.underlyingActor().context()
1821                 .actorOf(Props.create(ForwardingActor.class, mockShardLeaderActor),
1822                         "member-2-shard-default-" + shardMrgIDSuffix);
1823
1824         LOG.error("Forwarding actor : {}", actorRef);
1825
1826         final TestKit kit = new TestKit(getSystem());
1827         newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1828         leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1829
1830         leaderShardManager.tell(new ActorInitialized(mockShardLeaderActor), ActorRef.noSender());
1831         newReplicaShardManager.tell(new ActorInitialized(mockShardLeaderActor), ActorRef.noSender());
1832
1833         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1834         leaderShardManager.tell(
1835             new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1836             mockShardLeaderActor);
1837         leaderShardManager.tell(
1838             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1839             mockShardLeaderActor);
1840
1841         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
1842         newReplicaShardManager.tell(
1843             new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class), leaderVersion),
1844             mockShardActor);
1845         newReplicaShardManager.tell(
1846             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
1847             mockShardActor);
1848
1849         newReplicaShardManager.underlyingActor().waitForMemberUp();
1850         leaderShardManager.underlyingActor().waitForMemberUp();
1851
1852         // construct a mock response message
1853         newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), kit.getRef());
1854         RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1855             RemoveServer.class);
1856         String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1857         assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
1858         kit.expectMsgClass(Duration.ofSeconds(5), Status.Success.class);
1859     }
1860
1861     @Test
1862     public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() {
1863         testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", MEMBER_2),
1864                 RemoveServer.class, new RemoveShardReplica("astronauts", MEMBER_3));
1865     }
1866
1867     @Test
1868     public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() {
1869         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1870                 AddServer.class, new RemoveShardReplica("astronauts", MEMBER_2));
1871     }
1872
1873
1874     public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange,
1875                                                       final Class<?> firstForwardedServerChangeClass,
1876                                                       final Object secondServerChange) {
1877         final TestKit kit = new TestKit(getSystem());
1878         final TestKit mockShardLeaderKit = new TestKit(getSystem());
1879         final TestKit secondRequestKit = new TestKit(getSystem());
1880
1881         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1882             .put(shardName, Arrays.asList("member-2")).build());
1883
1884         final TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
1885             newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor)
1886             .cluster(new MockClusterWrapper()).props()
1887             .withDispatcher(Dispatchers.DefaultDispatcherId()),
1888             shardMgrID);
1889
1890         shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1891
1892         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1893
1894         shardManager.tell(firstServerChange, kit.getRef());
1895
1896         mockShardLeaderKit.expectMsgClass(firstForwardedServerChangeClass);
1897
1898         shardManager.tell(secondServerChange, secondRequestKit.getRef());
1899
1900         secondRequestKit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1901     }
1902
1903     @Test
1904     public void testServerRemovedShardActorNotRunning() {
1905         LOG.info("testServerRemovedShardActorNotRunning starting");
1906         final TestKit kit = new TestKit(getSystem());
1907         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1908             .put("default", Arrays.asList("member-1", "member-2"))
1909             .put("astronauts", Arrays.asList("member-2"))
1910             .put("people", Arrays.asList("member-1", "member-2")).build());
1911
1912         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1913             newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1914
1915         shardManager.underlyingActor().waitForRecoveryComplete();
1916         shardManager.tell(new FindLocalShard("people", false), kit.getRef());
1917         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1918
1919         shardManager.tell(new FindLocalShard("default", false), kit.getRef());
1920         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1921
1922         // Removed the default shard replica from member-1
1923         ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
1924         ShardIdentifier shardId = builder.shardName("default").memberName(MEMBER_1).type(shardMrgIDSuffix)
1925                 .build();
1926         shardManager.tell(new ServerRemoved(shardId.toString()), kit.getRef());
1927
1928         shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1929
1930         LOG.info("testServerRemovedShardActorNotRunning ending");
1931     }
1932
1933     @Test
1934     public void testServerRemovedShardActorRunning() {
1935         LOG.info("testServerRemovedShardActorRunning starting");
1936         final TestKit kit = new TestKit(getSystem());
1937         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1938             .put("default", Arrays.asList("member-1", "member-2"))
1939             .put("astronauts", Arrays.asList("member-2"))
1940             .put("people", Arrays.asList("member-1", "member-2")).build());
1941
1942         String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1943         ActorRef shard = actorFactory.createActor(MessageCollectorActor.props(), shardId);
1944
1945         TestActorRef<TestShardManager> shardManager = actorFactory
1946                 .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props()
1947                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1948
1949         shardManager.underlyingActor().waitForRecoveryComplete();
1950
1951         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1952         shardManager.tell(new ActorInitialized(shard), ActorRef.noSender());
1953
1954         waitForShardInitialized(shardManager, "people", kit);
1955         waitForShardInitialized(shardManager, "default", kit);
1956
1957         // Removed the default shard replica from member-1
1958         shardManager.tell(new ServerRemoved(shardId), kit.getRef());
1959
1960         shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1961
1962         MessageCollectorActor.expectFirstMatching(shard, Shutdown.class);
1963
1964         LOG.info("testServerRemovedShardActorRunning ending");
1965     }
1966
1967     @Test
1968     public void testShardPersistenceWithRestoredData() {
1969         LOG.info("testShardPersistenceWithRestoredData starting");
1970         final TestKit kit = new TestKit(getSystem());
1971         MockConfiguration mockConfig =
1972                 new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1973                     .put("default", Arrays.asList("member-1", "member-2"))
1974                     .put("astronauts", Arrays.asList("member-2"))
1975                     .put("people", Arrays.asList("member-1", "member-2")).build());
1976         String[] restoredShards = {"default", "astronauts"};
1977         ShardManagerSnapshot snapshot =
1978                 new ShardManagerSnapshot(Arrays.asList(restoredShards));
1979         InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
1980
1981         // create shardManager to come up with restored data
1982         TestActorRef<TestShardManager> newRestoredShardManager = actorFactory.createTestActor(
1983             newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1984
1985         newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
1986
1987         newRestoredShardManager.tell(new FindLocalShard("people", false), kit.getRef());
1988         LocalShardNotFound notFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
1989         assertEquals("for uninitialized shard", "people", notFound.getShardName());
1990
1991         // Verify a local shard is created for the restored shards,
1992         // although we expect a NotInitializedException for the shards
1993         // as the actor initialization
1994         // message is not sent for them
1995         newRestoredShardManager.tell(new FindLocalShard("default", false), kit.getRef());
1996         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1997
1998         newRestoredShardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
1999         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
2000
2001         LOG.info("testShardPersistenceWithRestoredData ending");
2002     }
2003
2004     @Test
2005     public void testShutDown() throws Exception {
2006         LOG.info("testShutDown starting");
2007         final TestKit kit = new TestKit(getSystem());
2008         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
2009             .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1")).build());
2010
2011         String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString();
2012         ActorRef shard1 = actorFactory.createActor(MessageCollectorActor.props(), shardId1);
2013
2014         String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString();
2015         ActorRef shard2 = actorFactory.createActor(MessageCollectorActor.props(), shardId2);
2016
2017         ActorRef shardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig)
2018             .addShardActor("shard1", shard1).addShardActor("shard2", shard2).props());
2019
2020         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2021         shardManager.tell(new ActorInitialized(shard1), ActorRef.noSender());
2022         shardManager.tell(new ActorInitialized(shard2), ActorRef.noSender());
2023
2024         FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
2025         Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE);
2026
2027         MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class);
2028         MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class);
2029
2030         try {
2031             Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS));
2032             fail("ShardManager actor stopped without waiting for the Shards to be stopped");
2033         } catch (TimeoutException e) {
2034             // expected
2035         }
2036
2037         actorFactory.killActor(shard1, kit);
2038         actorFactory.killActor(shard2, kit);
2039
2040         Boolean stopped = Await.result(stopFuture, duration);
2041         assertEquals("Stopped", Boolean.TRUE, stopped);
2042
2043         LOG.info("testShutDown ending");
2044     }
2045
2046     @Test
2047     public void testChangeServersVotingStatus() {
2048         final TestKit kit = new TestKit(getSystem());
2049         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2050
2051         ActorRef respondActor = actorFactory
2052                 .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2053                     new ServerChangeReply(ServerChangeStatus.OK, null)), memberId);
2054
2055         ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2056
2057         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2058         shardManager.tell(new ActorInitialized(respondActor), ActorRef.noSender());
2059         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
2060             DataStoreVersions.CURRENT_VERSION), kit.getRef());
2061         shardManager.tell(
2062             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
2063             respondActor);
2064
2065         shardManager.tell(
2066             new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), kit.getRef());
2067
2068         ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor
2069                 .expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2070         assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(),
2071             ImmutableMap.of(ShardIdentifier
2072                 .create("default", MemberName.forName("member-2"), shardMrgIDSuffix).toString(),
2073                 Boolean.TRUE));
2074
2075         kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
2076     }
2077
2078     @Test
2079     public void testChangeServersVotingStatusWithNoLeader() {
2080         final TestKit kit = new TestKit(getSystem());
2081         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2082
2083         ActorRef respondActor = actorFactory
2084                 .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2085                     new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);
2086
2087         ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2088
2089         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2090         shardManager.tell(new ActorInitialized(respondActor), ActorRef.noSender());
2091         shardManager.tell(new RoleChangeNotification(memberId, null, RaftState.Follower.name()), respondActor);
2092
2093         shardManager.tell(
2094             new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), kit.getRef());
2095
2096         MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2097
2098         Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Status.Failure.class);
2099         assertTrue("Failure resposnse", resp.cause() instanceof NoShardLeaderException);
2100     }
2101
2102     @SuppressWarnings("unchecked")
2103     @Test
2104     public void testRegisterForShardLeaderChanges() {
2105         LOG.info("testRegisterForShardLeaderChanges starting");
2106
2107         final String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
2108         final String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
2109         final TestKit kit = new TestKit(getSystem());
2110         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
2111
2112         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2113         shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
2114
2115         final Consumer<String> mockCallback = mock(Consumer.class);
2116         shardManager.tell(new RegisterForShardAvailabilityChanges(mockCallback), kit.getRef());
2117
2118         final Success reply = kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
2119         final Registration reg = (Registration) reply.status();
2120
2121         final DataTree mockDataTree = mock(DataTree.class);
2122         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2123             DataStoreVersions.CURRENT_VERSION), mockShardActor);
2124
2125         verify(mockCallback, timeout(5000)).accept("default");
2126
2127         reset(mockCallback);
2128         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2129                 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2130
2131         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
2132         verifyNoMoreInteractions(mockCallback);
2133
2134         shardManager.tell(new ShardLeaderStateChanged(memberId1, null, mockDataTree,
2135                 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2136
2137         verify(mockCallback, timeout(5000)).accept("default");
2138
2139         reset(mockCallback);
2140         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, mockDataTree,
2141                 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2142
2143         verify(mockCallback, timeout(5000)).accept("default");
2144
2145         reset(mockCallback);
2146         reg.close();
2147
2148         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2149                 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2150
2151         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
2152         verifyNoMoreInteractions(mockCallback);
2153
2154         LOG.info("testRegisterForShardLeaderChanges ending");
2155     }
2156
2157     public static class TestShardManager extends ShardManager {
2158         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
2159         private final CountDownLatch snapshotPersist = new CountDownLatch(1);
2160         private ShardManagerSnapshot snapshot;
2161         private final Map<String, ActorRef> shardActors;
2162         private final ActorRef shardActor;
2163         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
2164         private CountDownLatch memberUpReceived = new CountDownLatch(1);
2165         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
2166         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
2167         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
2168         private volatile MessageInterceptor messageInterceptor;
2169
2170         TestShardManager(final Builder builder) {
2171             super(builder);
2172             shardActor = builder.shardActor;
2173             shardActors = builder.shardActors;
2174         }
2175
2176         @Override
2177         protected void handleRecover(final Object message) throws Exception {
2178             try {
2179                 super.handleRecover(message);
2180             } finally {
2181                 if (message instanceof RecoveryCompleted) {
2182                     recoveryComplete.countDown();
2183                 }
2184             }
2185         }
2186
2187         private void countDownIfOther(final Member member, final CountDownLatch latch) {
2188             if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
2189                 latch.countDown();
2190             }
2191         }
2192
2193         @Override
2194         public void handleCommand(final Object message) throws Exception {
2195             try {
2196                 if (messageInterceptor != null && messageInterceptor.canIntercept(message)) {
2197                     getSender().tell(messageInterceptor.apply(message), getSelf());
2198                 } else {
2199                     super.handleCommand(message);
2200                 }
2201             } finally {
2202                 if (message instanceof FindPrimary) {
2203                     findPrimaryMessageReceived.countDown();
2204                 } else if (message instanceof ClusterEvent.MemberUp) {
2205                     countDownIfOther(((ClusterEvent.MemberUp) message).member(), memberUpReceived);
2206                 } else if (message instanceof ClusterEvent.MemberRemoved) {
2207                     countDownIfOther(((ClusterEvent.MemberRemoved) message).member(), memberRemovedReceived);
2208                 } else if (message instanceof ClusterEvent.UnreachableMember) {
2209                     countDownIfOther(((ClusterEvent.UnreachableMember) message).member(), memberUnreachableReceived);
2210                 } else if (message instanceof ClusterEvent.ReachableMember) {
2211                     countDownIfOther(((ClusterEvent.ReachableMember) message).member(), memberReachableReceived);
2212                 }
2213             }
2214         }
2215
2216         void setMessageInterceptor(final MessageInterceptor messageInterceptor) {
2217             this.messageInterceptor = messageInterceptor;
2218         }
2219
2220         void waitForRecoveryComplete() {
2221             assertTrue("Recovery complete",
2222                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
2223         }
2224
2225         public void waitForMemberUp() {
2226             assertTrue("MemberUp received",
2227                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
2228             memberUpReceived = new CountDownLatch(1);
2229         }
2230
2231         void waitForMemberRemoved() {
2232             assertTrue("MemberRemoved received",
2233                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
2234             memberRemovedReceived = new CountDownLatch(1);
2235         }
2236
2237         void waitForUnreachableMember() {
2238             assertTrue("UnreachableMember received",
2239                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS));
2240             memberUnreachableReceived = new CountDownLatch(1);
2241         }
2242
2243         void waitForReachableMember() {
2244             assertTrue("ReachableMember received",
2245                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
2246             memberReachableReceived = new CountDownLatch(1);
2247         }
2248
2249         void verifyFindPrimary() {
2250             assertTrue("FindPrimary received",
2251                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
2252             findPrimaryMessageReceived = new CountDownLatch(1);
2253         }
2254
2255         public static Builder builder(final DatastoreContext.Builder datastoreContextBuilder) {
2256             return new Builder(datastoreContextBuilder);
2257         }
2258
2259         public static class Builder extends AbstractGenericCreator<Builder, TestShardManager> {
2260             private ActorRef shardActor;
2261             private final Map<String, ActorRef> shardActors = new HashMap<>();
2262
2263             Builder(final DatastoreContext.Builder datastoreContextBuilder) {
2264                 super(TestShardManager.class);
2265                 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
2266             }
2267
2268             Builder shardActor(final ActorRef newShardActor) {
2269                 shardActor = newShardActor;
2270                 return this;
2271             }
2272
2273             Builder addShardActor(final String shardName, final ActorRef actorRef) {
2274                 shardActors.put(shardName, actorRef);
2275                 return this;
2276             }
2277         }
2278
2279         @Override
2280         public void saveSnapshot(final Object obj) {
2281             snapshot = (ShardManagerSnapshot) obj;
2282             snapshotPersist.countDown();
2283             super.saveSnapshot(obj);
2284         }
2285
2286         void verifySnapshotPersisted(final Set<String> shardList) {
2287             assertTrue("saveSnapshot invoked",
2288                     Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
2289             assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
2290         }
2291
2292         @Override
2293         protected ActorRef newShardActor(final ShardInformation info) {
2294             if (shardActors.get(info.getShardName()) != null) {
2295                 return shardActors.get(info.getShardName());
2296             }
2297
2298             if (shardActor != null) {
2299                 return shardActor;
2300             }
2301
2302             return super.newShardActor(info);
2303         }
2304     }
2305
2306     private abstract static class AbstractGenericCreator<T extends AbstractGenericCreator<T, ?>, C extends ShardManager>
2307                                                      extends AbstractShardManagerCreator<T> {
2308         private final Class<C> shardManagerClass;
2309
2310         AbstractGenericCreator(final Class<C> shardManagerClass) {
2311             this.shardManagerClass = shardManagerClass;
2312             cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).readinessFuture(ready)
2313                     .primaryShardInfoCache(new PrimaryShardInfoFutureCache());
2314         }
2315
2316         @Override
2317         public Props props() {
2318             verify();
2319             return Props.create(shardManagerClass, this);
2320         }
2321     }
2322
2323     private static class GenericCreator<C extends ShardManager> extends AbstractGenericCreator<GenericCreator<C>, C> {
2324         GenericCreator(final Class<C> shardManagerClass) {
2325             super(shardManagerClass);
2326         }
2327     }
2328
2329     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
2330         private static final long serialVersionUID = 1L;
2331         private final Creator<ShardManager> delegate;
2332
2333         DelegatingShardManagerCreator(final Creator<ShardManager> delegate) {
2334             this.delegate = delegate;
2335         }
2336
2337         @Override
2338         public ShardManager create() throws Exception {
2339             return delegate.create();
2340         }
2341     }
2342
2343     interface MessageInterceptor extends Function<Object, Object> {
2344         boolean canIntercept(Object message);
2345     }
2346
2347     private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
2348         return new MessageInterceptor() {
2349             @Override
2350             public Object apply(final Object message) {
2351                 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
2352             }
2353
2354             @Override
2355             public boolean canIntercept(final Object message) {
2356                 return message instanceof FindPrimary;
2357             }
2358         };
2359     }
2360
2361     private static class MockRespondActor extends MessageCollectorActor {
2362         static final String CLEAR_RESPONSE = "clear-response";
2363
2364         private Object responseMsg;
2365         private final Class<?> requestClass;
2366
2367         @SuppressWarnings("unused")
2368         MockRespondActor(final Class<?> requestClass, final Object responseMsg) {
2369             this.requestClass = requestClass;
2370             this.responseMsg = responseMsg;
2371         }
2372
2373         @Override
2374         public void onReceive(final Object message) throws Exception {
2375             if (message.equals(CLEAR_RESPONSE)) {
2376                 responseMsg = null;
2377             } else {
2378                 super.onReceive(message);
2379                 if (message.getClass().equals(requestClass) && responseMsg != null) {
2380                     getSender().tell(responseMsg, getSelf());
2381                 }
2382             }
2383         }
2384     }
2385 }