Bump odlparent/yangtools/mdsal
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManagerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.shardmanager;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.mockito.ArgumentMatchers.anyString;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.reset;
21 import static org.mockito.Mockito.timeout;
22 import static org.mockito.Mockito.verify;
23 import static org.mockito.Mockito.verifyNoMoreInteractions;
24
25 import akka.actor.ActorRef;
26 import akka.actor.ActorSystem;
27 import akka.actor.AddressFromURIString;
28 import akka.actor.Props;
29 import akka.actor.Status;
30 import akka.actor.Status.Failure;
31 import akka.actor.Status.Success;
32 import akka.cluster.Cluster;
33 import akka.cluster.ClusterEvent;
34 import akka.cluster.Member;
35 import akka.dispatch.Dispatchers;
36 import akka.dispatch.OnComplete;
37 import akka.japi.Creator;
38 import akka.pattern.Patterns;
39 import akka.persistence.RecoveryCompleted;
40 import akka.serialization.Serialization;
41 import akka.testkit.TestActorRef;
42 import akka.testkit.javadsl.TestKit;
43 import akka.util.Timeout;
44 import com.google.common.base.Stopwatch;
45 import com.google.common.collect.ImmutableMap;
46 import com.google.common.collect.Lists;
47 import com.google.common.collect.Sets;
48 import com.google.common.util.concurrent.Uninterruptibles;
49 import java.time.Duration;
50 import java.util.AbstractMap;
51 import java.util.Arrays;
52 import java.util.Collection;
53 import java.util.Collections;
54 import java.util.HashMap;
55 import java.util.List;
56 import java.util.Map;
57 import java.util.Map.Entry;
58 import java.util.Set;
59 import java.util.concurrent.CountDownLatch;
60 import java.util.concurrent.TimeUnit;
61 import java.util.concurrent.TimeoutException;
62 import java.util.function.Consumer;
63 import java.util.function.Function;
64 import java.util.stream.Collectors;
65 import org.junit.AfterClass;
66 import org.junit.BeforeClass;
67 import org.junit.Test;
68 import org.opendaylight.controller.cluster.access.concepts.MemberName;
69 import org.opendaylight.controller.cluster.datastore.AbstractShardManagerTest;
70 import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
71 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
72 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
73 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
74 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
75 import org.opendaylight.controller.cluster.datastore.Shard;
76 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
77 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
78 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
79 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
80 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
81 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
82 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
83 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
84 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
85 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
86 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
87 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
88 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
89 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
90 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
91 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
92 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
93 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
94 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
95 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
96 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
97 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
98 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
99 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
100 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
101 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
102 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
103 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
104 import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor;
105 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
106 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
107 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
108 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
109 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
110 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
111 import org.opendaylight.controller.cluster.raft.RaftState;
112 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
113 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
114 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
115 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
116 import org.opendaylight.controller.cluster.raft.messages.AddServer;
117 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
118 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
119 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
120 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
121 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
122 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
123 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
124 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
125 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
126 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
127 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
128 import org.opendaylight.yangtools.concepts.Registration;
129 import org.opendaylight.yangtools.yang.common.XMLNamespace;
130 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
131 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
132 import org.slf4j.Logger;
133 import org.slf4j.LoggerFactory;
134 import scala.concurrent.Await;
135 import scala.concurrent.Future;
136 import scala.concurrent.duration.FiniteDuration;
137
138 public class ShardManagerTest extends AbstractShardManagerTest {
139     private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
140     private static final MemberName MEMBER_2 = MemberName.forName("member-2");
141     private static final MemberName MEMBER_3 = MemberName.forName("member-3");
142
143     private static EffectiveModelContext TEST_SCHEMA_CONTEXT;
144
145     private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
146
147     @BeforeClass
148     public static void beforeClass() {
149         TEST_SCHEMA_CONTEXT = TestModel.createTestContext();
150     }
151
152     @AfterClass
153     public static void afterClass() {
154         TEST_SCHEMA_CONTEXT = null;
155     }
156
157     private ActorSystem newActorSystem(final String config) {
158         return newActorSystem("cluster-test", config);
159     }
160
161     private ActorRef newMockShardActor(final ActorSystem system, final String shardName, final String memberName) {
162         String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
163         if (system == getSystem()) {
164             return actorFactory.createActor(MessageCollectorActor.props(), name);
165         }
166
167         return system.actorOf(MessageCollectorActor.props(), name);
168     }
169
170     private Props newShardMgrProps() {
171         return newShardMgrProps(new MockConfiguration());
172     }
173
174     private static DatastoreContextFactory newDatastoreContextFactory(final DatastoreContext datastoreContext) {
175         DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
176         doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
177         doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(anyString());
178         return mockFactory;
179     }
180
181     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() {
182         return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
183     }
184
185     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(final ActorRef shardActor) {
186         return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor)
187                 .distributedDataStore(mock(DistributedDataStore.class));
188     }
189
190
191     private Props newPropsShardMgrWithMockShardActor() {
192         return newTestShardMgrBuilderWithMockShardActor().props().withDispatcher(
193                 Dispatchers.DefaultDispatcherId());
194     }
195
196     private Props newPropsShardMgrWithMockShardActor(final ActorRef shardActor) {
197         return newTestShardMgrBuilderWithMockShardActor(shardActor).props()
198                 .withDispatcher(Dispatchers.DefaultDispatcherId());
199     }
200
201
202     private TestShardManager newTestShardManager() {
203         return newTestShardManager(newShardMgrProps());
204     }
205
206     private TestShardManager newTestShardManager(final Props props) {
207         TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
208         TestShardManager shardManager = shardManagerActor.underlyingActor();
209         shardManager.waitForRecoveryComplete();
210         return shardManager;
211     }
212
213     private static void waitForShardInitialized(final ActorRef shardManager, final String shardName,
214             final TestKit kit) {
215         AssertionError last = null;
216         Stopwatch sw = Stopwatch.createStarted();
217         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
218             try {
219                 shardManager.tell(new FindLocalShard(shardName, true), kit.getRef());
220                 kit.expectMsgClass(LocalShardFound.class);
221                 return;
222             } catch (AssertionError e) {
223                 last = e;
224             }
225
226             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
227         }
228
229         throw last;
230     }
231
232     @SuppressWarnings("unchecked")
233     private static <T> T expectMsgClassOrFailure(final Class<T> msgClass, final TestKit kit, final String msg) {
234         Object reply = kit.expectMsgAnyClassOf(kit.duration("5 sec"), msgClass, Failure.class);
235         if (reply instanceof Failure) {
236             throw new AssertionError(msg + " failed", ((Failure)reply).cause());
237         }
238
239         return (T)reply;
240     }
241
242     @Test
243     public void testPerShardDatastoreContext() throws Exception {
244         LOG.info("testPerShardDatastoreContext starting");
245         final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
246                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
247
248         doReturn(
249                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(6).build())
250                 .when(mockFactory).getShardDatastoreContext("default");
251
252         doReturn(
253                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(7).build())
254                 .when(mockFactory).getShardDatastoreContext("topology");
255
256         final MockConfiguration mockConfig = new MockConfiguration() {
257             @Override
258             public Collection<String> getMemberShardNames(final MemberName memberName) {
259                 return Arrays.asList("default", "topology");
260             }
261
262             @Override
263             public Collection<MemberName> getMembersFromShardName(final String shardName) {
264                 return members("member-1");
265             }
266         };
267
268         final ActorRef defaultShardActor = actorFactory.createActor(
269                 MessageCollectorActor.props(), actorFactory.generateActorId("default"));
270         final ActorRef topologyShardActor = actorFactory.createActor(
271                 MessageCollectorActor.props(), actorFactory.generateActorId("topology"));
272
273         final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
274                 new HashMap<String, Entry<ActorRef, DatastoreContext>>());
275         shardInfoMap.put("default", new AbstractMap.SimpleEntry<>(defaultShardActor, null));
276         shardInfoMap.put("topology", new AbstractMap.SimpleEntry<>(topologyShardActor, null));
277
278         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
279         final CountDownLatch newShardActorLatch = new CountDownLatch(2);
280         class LocalShardManager extends ShardManager {
281             LocalShardManager(final AbstractShardManagerCreator<?> creator) {
282                 super(creator);
283             }
284
285             @Override
286             protected ActorRef newShardActor(final ShardInformation info) {
287                 Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
288                 ActorRef ref = null;
289                 if (entry != null) {
290                     ref = entry.getKey();
291                     entry.setValue(info.getDatastoreContext());
292                 }
293
294                 newShardActorLatch.countDown();
295                 return ref;
296             }
297         }
298
299         final Creator<ShardManager> creator = new Creator<>() {
300             private static final long serialVersionUID = 1L;
301             @Override
302             public ShardManager create() {
303                 return new LocalShardManager(
304                         new GenericCreator<>(LocalShardManager.class).datastoreContextFactory(mockFactory)
305                                 .primaryShardInfoCache(primaryShardInfoCache).configuration(mockConfig));
306             }
307         };
308
309         final TestKit kit = new TestKit(getSystem());
310
311         final ActorRef shardManager = actorFactory.createActor(Props.create(ShardManager.class,
312                 new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()));
313
314         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
315
316         assertTrue("Shard actors created", newShardActorLatch.await(5, TimeUnit.SECONDS));
317         assertEquals("getShardElectionTimeoutFactor", 6,
318                 shardInfoMap.get("default").getValue().getShardElectionTimeoutFactor());
319         assertEquals("getShardElectionTimeoutFactor", 7,
320                 shardInfoMap.get("topology").getValue().getShardElectionTimeoutFactor());
321
322         DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
323                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
324         doReturn(
325                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(66).build())
326                 .when(newMockFactory).getShardDatastoreContext("default");
327
328         doReturn(
329                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(77).build())
330                 .when(newMockFactory).getShardDatastoreContext("topology");
331
332         shardManager.tell(newMockFactory, kit.getRef());
333
334         DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor,
335                 DatastoreContext.class);
336         assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
337
338         newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
339         assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
340
341         LOG.info("testPerShardDatastoreContext ending");
342     }
343
344     @Test
345     public void testOnReceiveFindPrimaryForNonExistentShard() {
346         final TestKit kit = new TestKit(getSystem());
347         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
348
349         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
350
351         shardManager.tell(new FindPrimary("non-existent", false), kit.getRef());
352
353         kit.expectMsgClass(Duration.ofSeconds(5), PrimaryNotFoundException.class);
354     }
355
356     @Test
357     public void testOnReceiveFindPrimaryForLocalLeaderShard() {
358         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting");
359         final TestKit kit = new TestKit(getSystem());
360         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
361
362         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
363
364         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
365         shardManager.tell(new ActorInitialized(), mockShardActor);
366
367         DataTree mockDataTree = mock(DataTree.class);
368         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
369             DataStoreVersions.CURRENT_VERSION), kit.getRef());
370
371         MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
372         shardManager.tell(
373             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
374             mockShardActor);
375
376         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
377
378         LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5),
379             LocalPrimaryShardFound.class);
380         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
381             primaryFound.getPrimaryPath().contains("member-1-shard-default"));
382         assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
383
384         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending");
385     }
386
387     @Test
388     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() {
389         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp starting");
390         final TestKit kit = new TestKit(getSystem());
391         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
392
393         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
394         shardManager.tell(new ActorInitialized(), mockShardActor);
395
396         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
397         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
398         shardManager.tell(
399             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
400             mockShardActor);
401         shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION),
402             mockShardActor);
403
404         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
405
406         kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
407
408         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending");
409     }
410
411     @Test
412     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() {
413         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard starting");
414         final TestKit kit = new TestKit(getSystem());
415         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
416
417         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
418         shardManager.tell(new ActorInitialized(), mockShardActor);
419
420         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
421         MockClusterWrapper.sendMemberUp(shardManager, "member-2", kit.getRef().path().toString());
422
423         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
424         shardManager.tell(
425             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
426             mockShardActor);
427         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
428         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, leaderVersion), mockShardActor);
429
430         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
431
432         RemotePrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
433         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
434             primaryFound.getPrimaryPath().contains("member-2-shard-default"));
435         assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
436
437         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard ending");
438     }
439
440     @Test
441     public void testOnReceiveFindPrimaryForUninitializedShard() {
442         final TestKit kit = new TestKit(getSystem());
443         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
444
445         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
446
447         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
448     }
449
450     @Test
451     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() {
452         final TestKit kit = new TestKit(getSystem());
453         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
454
455         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
456         shardManager.tell(new ActorInitialized(), mockShardActor);
457
458         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
459
460         kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
461     }
462
463     @Test
464     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() {
465         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
466         final TestKit kit = new TestKit(getSystem());
467         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
468
469         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
470         shardManager.tell(new ActorInitialized(), mockShardActor);
471
472         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
473         shardManager.tell(
474             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Follower.name()),
475             mockShardActor);
476
477         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
478
479         kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
480
481         DataTree mockDataTree = mock(DataTree.class);
482         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
483             DataStoreVersions.CURRENT_VERSION), mockShardActor);
484
485         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
486
487         LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5),
488             LocalPrimaryShardFound.class);
489         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
490             primaryFound.getPrimaryPath().contains("member-1-shard-default"));
491         assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
492
493         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
494     }
495
496     @Test
497     public void testOnReceiveFindPrimaryWaitForShardLeader() {
498         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting");
499         datastoreContextBuilder.shardInitializationTimeout(10, TimeUnit.SECONDS);
500         final TestKit kit = new TestKit(getSystem());
501         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
502
503         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
504
505         // We're passing waitUntilInitialized = true to FindPrimary so
506         // the response should be
507         // delayed until we send ActorInitialized and
508         // RoleChangeNotification.
509         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
510
511         kit.expectNoMessage(Duration.ofMillis(150));
512
513         shardManager.tell(new ActorInitialized(), mockShardActor);
514
515         kit.expectNoMessage(Duration.ofMillis(150));
516
517         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
518         shardManager.tell(
519             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
520             mockShardActor);
521
522         kit.expectNoMessage(Duration.ofMillis(150));
523
524         DataTree mockDataTree = mock(DataTree.class);
525         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
526             DataStoreVersions.CURRENT_VERSION), mockShardActor);
527
528         LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
529         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
530             primaryFound.getPrimaryPath().contains("member-1-shard-default"));
531         assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
532
533         kit.expectNoMessage(Duration.ofMillis(200));
534
535         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending");
536     }
537
538     @Test
539     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() {
540         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard starting");
541         final TestKit kit = new TestKit(getSystem());
542         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
543
544         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
545
546         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
547
548         kit.expectMsgClass(Duration.ofSeconds(2), NotInitializedException.class);
549
550         shardManager.tell(new ActorInitialized(), mockShardActor);
551
552         kit.expectNoMessage(Duration.ofMillis(200));
553
554         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending");
555     }
556
557     @Test
558     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() {
559         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard starting");
560         final TestKit kit = new TestKit(getSystem());
561         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
562
563         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
564         shardManager.tell(new ActorInitialized(), mockShardActor);
565         shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
566             RaftState.Candidate.name()), mockShardActor);
567
568         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
569
570         kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
571
572         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending");
573     }
574
575     @Test
576     public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() {
577         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard starting");
578         final TestKit kit = new TestKit(getSystem());
579         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
580
581         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
582         shardManager.tell(new ActorInitialized(), mockShardActor);
583         shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
584             RaftState.IsolatedLeader.name()), mockShardActor);
585
586         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true),kit. getRef());
587
588         kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
589
590         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending");
591     }
592
593     @Test
594     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() {
595         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard starting");
596         final TestKit kit = new TestKit(getSystem());
597         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
598
599         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
600         shardManager.tell(new ActorInitialized(), mockShardActor);
601
602         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
603
604         kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
605
606         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending");
607     }
608
609     @Test
610     public void testOnReceiveFindPrimaryForRemoteShard() {
611         LOG.info("testOnReceiveFindPrimaryForRemoteShard starting");
612         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
613
614         // Create an ActorSystem ShardManager actor for member-1.
615
616         final ActorSystem system1 = newActorSystem("Member1");
617         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
618
619         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
620                 newTestShardMgrBuilderWithMockShardActor().cluster(
621                         new ClusterWrapperImpl(system1)).props().withDispatcher(
622                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
623
624         // Create an ActorSystem ShardManager actor for member-2.
625
626         final ActorSystem system2 = newActorSystem("Member2");
627
628         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
629
630         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
631
632         MockConfiguration mockConfig2 = new MockConfiguration(
633                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
634                         .put("astronauts", Arrays.asList("member-2")).build());
635
636         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
637                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
638                         new ClusterWrapperImpl(system2)).props().withDispatcher(
639                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
640
641         final TestKit kit = new TestKit(system1);
642         shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
643         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
644
645         shardManager2.tell(new ActorInitialized(), mockShardActor2);
646
647         String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
648         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
649         shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
650             mockShardActor2);
651         shardManager2.tell(new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
652             mockShardActor2);
653
654         shardManager1.underlyingActor().waitForMemberUp();
655         shardManager1.tell(new FindPrimary("astronauts", false), kit.getRef());
656
657         RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
658         String path = found.getPrimaryPath();
659         assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
660         assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
661
662         shardManager2.underlyingActor().verifyFindPrimary();
663
664         // This part times out quite a bit on jenkins for some reason
665
666 //                Cluster.get(system2).down(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
667 //
668 //                shardManager1.underlyingActor().waitForMemberRemoved();
669 //
670 //                shardManager1.tell(new FindPrimary("astronauts", false), getRef());
671 //
672 //                expectMsgClass(Duration.ofSeconds(5), PrimaryNotFoundException.class);
673
674         LOG.info("testOnReceiveFindPrimaryForRemoteShard ending");
675     }
676
677     @Test
678     public void testShardAvailabilityOnChangeOfMemberReachability() {
679         LOG.info("testShardAvailabilityOnChangeOfMemberReachability starting");
680         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
681
682         // Create an ActorSystem ShardManager actor for member-1.
683
684         final ActorSystem system1 = newActorSystem("Member1");
685         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
686
687         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
688
689         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
690                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
691                         new ClusterWrapperImpl(system1)).props().withDispatcher(
692                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
693
694         // Create an ActorSystem ShardManager actor for member-2.
695
696         final ActorSystem system2 = newActorSystem("Member2");
697
698         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
699
700         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
701
702         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
703                 .put("default", Arrays.asList("member-1", "member-2")).build());
704
705         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
706                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
707                         new ClusterWrapperImpl(system2)).props().withDispatcher(
708                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
709
710         final TestKit kit = new TestKit(system1);
711         shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
712         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
713         shardManager1.tell(new ActorInitialized(), mockShardActor1);
714         shardManager2.tell(new ActorInitialized(), mockShardActor2);
715
716         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
717         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
718         shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
719             DataStoreVersions.CURRENT_VERSION), mockShardActor1);
720         shardManager1.tell(
721             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
722             mockShardActor1);
723         shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
724             DataStoreVersions.CURRENT_VERSION), mockShardActor2);
725         shardManager2.tell(
726             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
727             mockShardActor2);
728         shardManager1.underlyingActor().waitForMemberUp();
729
730         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
731
732         RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
733         String path = found.getPrimaryPath();
734         assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
735
736         shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
737             kit.getRef());
738
739         shardManager1.underlyingActor().waitForUnreachableMember();
740
741         PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
742         assertEquals("getMemberName", MEMBER_2, peerDown.getMemberName());
743         MessageCollectorActor.clearMessages(mockShardActor1);
744
745         shardManager1.tell(MockClusterWrapper.createMemberRemoved("member-2", "akka://cluster-test@127.0.0.1:2558"),
746             kit.getRef());
747
748         MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
749
750         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
751
752         kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
753
754         shardManager1.tell(MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
755             kit.getRef());
756
757         shardManager1.underlyingActor().waitForReachableMember();
758
759         PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
760         assertEquals("getMemberName", MEMBER_2, peerUp.getMemberName());
761         MessageCollectorActor.clearMessages(mockShardActor1);
762
763         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
764
765         RemotePrimaryShardFound found1 = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
766         String path1 = found1.getPrimaryPath();
767         assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
768
769         shardManager1.tell(MockClusterWrapper.createMemberUp("member-2", "akka://cluster-test@127.0.0.1:2558"),
770             kit.getRef());
771
772         MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
773
774         // Test FindPrimary wait succeeds after reachable member event.
775
776         shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
777                 "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
778         shardManager1.underlyingActor().waitForUnreachableMember();
779
780         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
781
782         shardManager1.tell(
783             MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
784
785         RemotePrimaryShardFound found2 = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
786         String path2 = found2.getPrimaryPath();
787         assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config"));
788
789         LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending");
790     }
791
792     @Test
793     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() {
794         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange starting");
795         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
796
797         // Create an ActorSystem ShardManager actor for member-1.
798
799         final ActorSystem system1 = newActorSystem("Member1");
800         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
801
802         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
803
804         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
805         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
806                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(new ClusterWrapperImpl(system1))
807                         .primaryShardInfoCache(primaryShardInfoCache).props()
808                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
809                 shardManagerID);
810
811         // Create an ActorSystem ShardManager actor for member-2.
812
813         final ActorSystem system2 = newActorSystem("Member2");
814
815         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
816
817         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
818
819         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
820                 .put("default", Arrays.asList("member-1", "member-2")).build());
821
822         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
823                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
824                         new ClusterWrapperImpl(system2)).props().withDispatcher(
825                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
826
827         final TestKit kit = new TestKit(system1);
828         shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
829         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
830         shardManager1.tell(new ActorInitialized(), mockShardActor1);
831         shardManager2.tell(new ActorInitialized(), mockShardActor2);
832
833         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
834         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
835         shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
836             DataStoreVersions.CURRENT_VERSION), mockShardActor1);
837         shardManager1.tell(
838             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
839             mockShardActor1);
840         shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
841             DataStoreVersions.CURRENT_VERSION), mockShardActor2);
842         shardManager2.tell(
843             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
844             mockShardActor2);
845         shardManager1.underlyingActor().waitForMemberUp();
846
847         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
848
849         RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
850         String path = found.getPrimaryPath();
851         assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
852
853         primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(
854             system1.actorSelection(mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION));
855
856         shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
857                 "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
858
859         shardManager1.underlyingActor().waitForUnreachableMember();
860
861         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
862
863         kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
864
865         assertNull("Expected primaryShardInfoCache entry removed",
866             primaryShardInfoCache.getIfPresent("default"));
867
868         shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, mock(DataTree.class),
869             DataStoreVersions.CURRENT_VERSION), mockShardActor1);
870         shardManager1.tell(
871             new RoleChangeNotification(memberId1, RaftState.Follower.name(), RaftState.Leader.name()),
872             mockShardActor1);
873
874         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
875
876         LocalPrimaryShardFound found1 = kit.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
877         String path1 = found1.getPrimaryPath();
878         assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
879
880         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange ending");
881     }
882
883     @Test
884     public void testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable() {
885         LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable starting");
886         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
887
888         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
889                 .put("default", Arrays.asList("member-256", "member-2")).build());
890
891         // Create an ActorSystem, ShardManager and actor for member-256.
892
893         final ActorSystem system256 = newActorSystem("Member256");
894         // 2562 is the tcp port of Member256 in src/test/resources/application.conf.
895         Cluster.get(system256).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2562"));
896
897         final ActorRef mockShardActor256 = newMockShardActor(system256, Shard.DEFAULT_NAME, "member-256");
898
899         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
900
901         // ShardManager must be created with shard configuration to let its localShards has shards.
902         final TestActorRef<TestShardManager> shardManager256 = TestActorRef.create(system256,
903                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor256)
904                         .cluster(new ClusterWrapperImpl(system256))
905                         .primaryShardInfoCache(primaryShardInfoCache).props()
906                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
907                 shardManagerID);
908
909         // Create an ActorSystem, ShardManager and actor for member-2 whose name is contained in member-256.
910
911         final ActorSystem system2 = newActorSystem("Member2");
912
913         // Join member-2 into the cluster of member-256.
914         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2562"));
915
916         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
917
918         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
919                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor2).cluster(
920                         new ClusterWrapperImpl(system2)).props().withDispatcher(
921                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
922
923         final TestKit kit256 = new TestKit(system256);
924         shardManager256.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef());
925         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef());
926         shardManager256.tell(new ActorInitialized(), mockShardActor256);
927         shardManager2.tell(new ActorInitialized(), mockShardActor2);
928
929         String memberId256 = "member-256-shard-default-" + shardMrgIDSuffix;
930         String memberId2   = "member-2-shard-default-"   + shardMrgIDSuffix;
931         shardManager256.tell(new ShardLeaderStateChanged(memberId256, memberId256, mock(DataTree.class),
932             DataStoreVersions.CURRENT_VERSION), mockShardActor256);
933         shardManager256.tell(
934             new RoleChangeNotification(memberId256, RaftState.Candidate.name(), RaftState.Leader.name()),
935             mockShardActor256);
936         shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId256, mock(DataTree.class),
937             DataStoreVersions.CURRENT_VERSION), mockShardActor2);
938         shardManager2.tell(
939             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Follower.name()),
940             mockShardActor2);
941         shardManager256.underlyingActor().waitForMemberUp();
942
943         shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
944
945         LocalPrimaryShardFound found = kit256.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
946         String path = found.getPrimaryPath();
947         assertTrue("Unexpected primary path " + path + " which must on member-256",
948             path.contains("member-256-shard-default-config"));
949
950         PrimaryShardInfo primaryShardInfo = new PrimaryShardInfo(
951             system256.actorSelection(mockShardActor256.path()), DataStoreVersions.CURRENT_VERSION);
952         primaryShardInfoCache.putSuccessful("default", primaryShardInfo);
953
954         // Simulate member-2 become unreachable.
955         shardManager256.tell(MockClusterWrapper.createUnreachableMember("member-2",
956                 "akka://cluster-test@127.0.0.1:2558"), kit256.getRef());
957         shardManager256.underlyingActor().waitForUnreachableMember();
958
959         // Make sure leader shard on member-256 is still leader and still in the cache.
960         shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
961         found = kit256.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
962         path = found.getPrimaryPath();
963         assertTrue("Unexpected primary path " + path + " which must still not on member-256",
964             path.contains("member-256-shard-default-config"));
965         Future<PrimaryShardInfo> futurePrimaryShard = primaryShardInfoCache.getIfPresent("default");
966         futurePrimaryShard.onComplete(new OnComplete<PrimaryShardInfo>() {
967             @Override
968             public void onComplete(final Throwable failure, final PrimaryShardInfo futurePrimaryShardInfo) {
969                 if (failure != null) {
970                     assertTrue("Primary shard info is unexpectedly removed from primaryShardInfoCache", false);
971                 } else {
972                     assertEquals("Expected primaryShardInfoCache entry",
973                         primaryShardInfo, futurePrimaryShardInfo);
974                 }
975             }
976         }, system256.dispatchers().defaultGlobalDispatcher());
977
978         LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable ending");
979     }
980
981     @Test
982     public void testOnReceiveFindLocalShardForNonExistentShard() {
983         final TestKit kit = new TestKit(getSystem());
984         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
985
986         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
987
988         shardManager.tell(new FindLocalShard("non-existent", false), kit.getRef());
989
990         LocalShardNotFound notFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
991
992         assertEquals("getShardName", "non-existent", notFound.getShardName());
993     }
994
995     @Test
996     public void testOnReceiveFindLocalShardForExistentShard() {
997         final TestKit kit = new TestKit(getSystem());
998         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
999
1000         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1001         shardManager.tell(new ActorInitialized(), mockShardActor);
1002
1003         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1004
1005         LocalShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1006
1007         assertTrue("Found path contains " + found.getPath().path().toString(),
1008             found.getPath().path().toString().contains("member-1-shard-default-config"));
1009     }
1010
1011     @Test
1012     public void testOnReceiveFindLocalShardForNotInitializedShard() {
1013         final TestKit kit = new TestKit(getSystem());
1014         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1015
1016         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1017
1018         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1019     }
1020
1021     @Test
1022     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
1023         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
1024         final TestKit kit = new TestKit(getSystem());
1025         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1026
1027         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1028
1029         // We're passing waitUntilInitialized = true to FindLocalShard
1030         // so the response should be
1031         // delayed until we send ActorInitialized.
1032         Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
1033             new Timeout(5, TimeUnit.SECONDS));
1034
1035         shardManager.tell(new ActorInitialized(), mockShardActor);
1036
1037         Object resp = Await.result(future, kit.duration("5 seconds"));
1038         assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
1039
1040         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
1041     }
1042
1043     @Test
1044     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
1045         TestShardManager shardManager = newTestShardManager();
1046
1047         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1048         shardManager.handleCommand(new RoleChangeNotification(
1049                 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
1050         assertFalse(ready.isDone());
1051
1052         shardManager.handleCommand(new ShardLeaderStateChanged(memberId, memberId,
1053                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1054         assertTrue(ready.isDone());
1055     }
1056
1057     @Test
1058     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
1059         final TestKit kit = new TestKit(getSystem());
1060         TestShardManager shardManager = newTestShardManager();
1061
1062         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1063         shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1064         assertFalse(ready.isDone());
1065
1066         shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
1067
1068         shardManager.handleCommand(
1069             new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1070                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1071         assertTrue(ready.isDone());
1072     }
1073
1074     @Test
1075     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
1076         final TestKit kit = new TestKit(getSystem());
1077         TestShardManager shardManager = newTestShardManager();
1078
1079         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1080         shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1081         assertFalse(ready.isDone());
1082
1083         shardManager.handleCommand(
1084             new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1085                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1086
1087         shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
1088         assertTrue(ready.isDone());
1089     }
1090
1091     @Test
1092     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
1093         TestShardManager shardManager = newTestShardManager();
1094
1095         shardManager.handleCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(),
1096             RaftState.Leader.name()));
1097         assertFalse(ready.isDone());
1098     }
1099
1100     @Test
1101     public void testByDefaultSyncStatusIsFalse() {
1102         TestShardManager shardManager = newTestShardManager();
1103
1104         assertFalse(shardManager.getMBean().getSyncStatus());
1105     }
1106
1107     @Test
1108     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception {
1109         TestShardManager shardManager = newTestShardManager();
1110
1111         shardManager.handleCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
1112                 RaftState.Follower.name(), RaftState.Leader.name()));
1113
1114         assertTrue(shardManager.getMBean().getSyncStatus());
1115     }
1116
1117     @Test
1118     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception {
1119         TestShardManager shardManager = newTestShardManager();
1120
1121         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1122         shardManager.handleCommand(new RoleChangeNotification(shardId,
1123                 RaftState.Follower.name(), RaftState.Candidate.name()));
1124
1125         assertFalse(shardManager.getMBean().getSyncStatus());
1126
1127         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
1128         shardManager.handleCommand(new FollowerInitialSyncUpStatus(
1129                 true, shardId));
1130
1131         assertFalse(shardManager.getMBean().getSyncStatus());
1132     }
1133
1134     @Test
1135     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception {
1136         TestShardManager shardManager = newTestShardManager();
1137
1138         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1139         shardManager.handleCommand(new RoleChangeNotification(shardId,
1140                 RaftState.Candidate.name(), RaftState.Follower.name()));
1141
1142         // Initially will be false
1143         assertFalse(shardManager.getMBean().getSyncStatus());
1144
1145         // Send status true will make sync status true
1146         shardManager.handleCommand(new FollowerInitialSyncUpStatus(true, shardId));
1147
1148         assertTrue(shardManager.getMBean().getSyncStatus());
1149
1150         // Send status false will make sync status false
1151         shardManager.handleCommand(new FollowerInitialSyncUpStatus(false, shardId));
1152
1153         assertFalse(shardManager.getMBean().getSyncStatus());
1154     }
1155
1156     @Test
1157     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception {
1158         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
1159         TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
1160             @Override
1161             public List<String> getMemberShardNames(final MemberName memberName) {
1162                 return Arrays.asList("default", "astronauts");
1163             }
1164         }));
1165
1166         // Initially will be false
1167         assertFalse(shardManager.getMBean().getSyncStatus());
1168
1169         // Make default shard leader
1170         String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
1171         shardManager.handleCommand(new RoleChangeNotification(defaultShardId,
1172                 RaftState.Follower.name(), RaftState.Leader.name()));
1173
1174         // default = Leader, astronauts is unknown so sync status remains false
1175         assertFalse(shardManager.getMBean().getSyncStatus());
1176
1177         // Make astronauts shard leader as well
1178         String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1179         shardManager.handleCommand(new RoleChangeNotification(astronautsShardId,
1180                 RaftState.Follower.name(), RaftState.Leader.name()));
1181
1182         // Now sync status should be true
1183         assertTrue(shardManager.getMBean().getSyncStatus());
1184
1185         // Make astronauts a Follower
1186         shardManager.handleCommand(new RoleChangeNotification(astronautsShardId,
1187                 RaftState.Leader.name(), RaftState.Follower.name()));
1188
1189         // Sync status is not true
1190         assertFalse(shardManager.getMBean().getSyncStatus());
1191
1192         // Make the astronauts follower sync status true
1193         shardManager.handleCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
1194
1195         // Sync status is now true
1196         assertTrue(shardManager.getMBean().getSyncStatus());
1197
1198         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending");
1199     }
1200
1201     @Test
1202     public void testOnReceiveSwitchShardBehavior() {
1203         final TestKit kit = new TestKit(getSystem());
1204         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1205
1206         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1207         shardManager.tell(new ActorInitialized(), mockShardActor);
1208
1209         shardManager.tell(new SwitchShardBehavior(mockShardName, RaftState.Leader, 1000), kit.getRef());
1210
1211         SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor,
1212             SwitchBehavior.class);
1213
1214         assertEquals(RaftState.Leader, switchBehavior.getNewState());
1215         assertEquals(1000, switchBehavior.getNewTerm());
1216     }
1217
1218     private static List<MemberName> members(final String... names) {
1219         return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList());
1220     }
1221
1222     @Test
1223     public void testOnCreateShard() {
1224         LOG.info("testOnCreateShard starting");
1225         final TestKit kit = new TestKit(getSystem());
1226         datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1227
1228         ActorRef shardManager = actorFactory
1229                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1230                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1231
1232         EffectiveModelContext schemaContext = TEST_SCHEMA_CONTEXT;
1233         shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1234
1235         DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100)
1236                 .persistent(false).build();
1237         Shard.Builder shardBuilder = Shard.builder();
1238
1239         ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module",
1240             "foo", null, members("member-1", "member-5", "member-6"));
1241         shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), kit.getRef());
1242
1243         kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1244
1245         shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1246
1247         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1248
1249         assertFalse("isRecoveryApplicable", shardBuilder.getDatastoreContext().isPersistent());
1250         assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig()
1251             .getPeerAddressResolver() instanceof ShardPeerAddressResolver);
1252         assertEquals("peerMembers", Sets.newHashSet(
1253             ShardIdentifier.create("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(),
1254             ShardIdentifier.create("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()),
1255             shardBuilder.getPeerAddresses().keySet());
1256         assertEquals("ShardIdentifier", ShardIdentifier.create("foo", MEMBER_1, shardMrgIDSuffix),
1257             shardBuilder.getId());
1258         assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1259
1260         // Send CreateShard with same name - should return Success with
1261         // a message.
1262
1263         shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1264
1265         Success success = kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1266         assertNotNull("Success status is null", success.status());
1267
1268         LOG.info("testOnCreateShard ending");
1269     }
1270
1271     @Test
1272     public void testOnCreateShardWithLocalMemberNotInShardConfig() {
1273         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting");
1274         final TestKit kit = new TestKit(getSystem());
1275         datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1276
1277         ActorRef shardManager = actorFactory
1278                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1279                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1280
1281         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1282
1283         Shard.Builder shardBuilder = Shard.builder();
1284         ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module",
1285             "foo", null, members("member-5", "member-6"));
1286
1287         shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1288         kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1289
1290         shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1291         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1292
1293         assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
1294         assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(), shardBuilder
1295             .getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
1296
1297         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending");
1298     }
1299
1300     @Test
1301     public void testOnCreateShardWithNoInitialSchemaContext() {
1302         LOG.info("testOnCreateShardWithNoInitialSchemaContext starting");
1303         final TestKit kit = new TestKit(getSystem());
1304         ActorRef shardManager = actorFactory
1305                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1306                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1307
1308         Shard.Builder shardBuilder = Shard.builder();
1309
1310         ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module",
1311             "foo", null, members("member-1"));
1312         shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1313
1314         kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1315
1316         EffectiveModelContext schemaContext = TEST_SCHEMA_CONTEXT;
1317         shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1318
1319         shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1320
1321         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1322
1323         assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1324         assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
1325
1326         LOG.info("testOnCreateShardWithNoInitialSchemaContext ending");
1327     }
1328
1329     @Test
1330     public void testGetSnapshot() {
1331         LOG.info("testGetSnapshot starting");
1332         TestKit kit = new TestKit(getSystem());
1333
1334         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1335                 .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1"))
1336                 .put("astronauts", Collections.<String>emptyList()).build());
1337
1338         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig)
1339                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1340
1341         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1342         Failure failure = kit.expectMsgClass(Failure.class);
1343         assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
1344
1345         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1346
1347         waitForShardInitialized(shardManager, "shard1", kit);
1348         waitForShardInitialized(shardManager, "shard2", kit);
1349
1350         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1351
1352         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1353
1354         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1355         assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1356
1357         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
1358             datastoreSnapshot.getShardSnapshots().stream().map(ShardSnapshot::getName).collect(Collectors.toSet())));
1359
1360         // Add a new replica
1361
1362         TestKit mockShardLeaderKit = new TestKit(getSystem());
1363
1364         TestShardManager shardManagerInstance = shardManager.underlyingActor();
1365         shardManagerInstance.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1366
1367         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1368         mockShardLeaderKit.expectMsgClass(AddServer.class);
1369         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.OK, ""));
1370         kit.expectMsgClass(Status.Success.class);
1371         waitForShardInitialized(shardManager, "astronauts", kit);
1372
1373         // Send another GetSnapshot and verify
1374
1375         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1376         datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1377
1378         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
1379                 Lists.transform(datastoreSnapshot.getShardSnapshots(), ShardSnapshot::getName)));
1380
1381         ShardManagerSnapshot snapshot = datastoreSnapshot.getShardManagerSnapshot();
1382         assertNotNull("Expected ShardManagerSnapshot", snapshot);
1383         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1384                 Sets.newHashSet(snapshot.getShardList()));
1385
1386         LOG.info("testGetSnapshot ending");
1387     }
1388
1389     @Test
1390     public void testRestoreFromSnapshot() {
1391         LOG.info("testRestoreFromSnapshot starting");
1392
1393         datastoreContextBuilder.shardInitializationTimeout(3, TimeUnit.SECONDS);
1394
1395         TestKit kit = new TestKit(getSystem());
1396
1397         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1398                 .put("shard1", Collections.<String>emptyList()).put("shard2", Collections.<String>emptyList())
1399                 .put("astronauts", Collections.<String>emptyList()).build());
1400
1401         ShardManagerSnapshot snapshot =
1402                 new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"), Collections.emptyMap());
1403         DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix, snapshot,
1404                 Collections.<ShardSnapshot>emptyList());
1405         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig)
1406                 .restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
1407
1408         shardManager.underlyingActor().waitForRecoveryComplete();
1409
1410         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1411
1412         waitForShardInitialized(shardManager, "shard1", kit);
1413         waitForShardInitialized(shardManager, "shard2", kit);
1414         waitForShardInitialized(shardManager, "astronauts", kit);
1415
1416         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1417
1418         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1419
1420         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1421
1422         assertNotNull("Expected ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1423         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1424                 Sets.newHashSet(datastoreSnapshot.getShardManagerSnapshot().getShardList()));
1425
1426         LOG.info("testRestoreFromSnapshot ending");
1427     }
1428
1429     @Test
1430     public void testAddShardReplicaForNonExistentShardConfig() {
1431         final TestKit kit = new TestKit(getSystem());
1432         ActorRef shardManager = actorFactory
1433                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1434                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1435
1436         shardManager.tell(new AddShardReplica("model-inventory"), kit.getRef());
1437         Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(2), Status.Failure.class);
1438
1439         assertTrue("Failure obtained", resp.cause() instanceof IllegalArgumentException);
1440     }
1441
1442     @Test
1443     public void testAddShardReplica() {
1444         LOG.info("testAddShardReplica starting");
1445         MockConfiguration mockConfig = new MockConfiguration(
1446                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1447                         .put("astronauts", Arrays.asList("member-2")).build());
1448
1449         final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1450         datastoreContextBuilder.shardManagerPersistenceId(shardManagerID);
1451
1452         // Create an ActorSystem ShardManager actor for member-1.
1453         final ActorSystem system1 = newActorSystem("Member1");
1454         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1455         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1456         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1457                 newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor)
1458                         .cluster(new ClusterWrapperImpl(system1)).props()
1459                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1460                 shardManagerID);
1461
1462         // Create an ActorSystem ShardManager actor for member-2.
1463         final ActorSystem system2 = newActorSystem("Member2");
1464         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1465
1466         String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
1467         String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString();
1468         final TestActorRef<MockRespondActor> mockShardLeaderActor = TestActorRef.create(system2,
1469                 Props.create(MockRespondActor.class, AddServer.class,
1470                         new AddServerReply(ServerChangeStatus.OK, memberId2))
1471                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1472                 name);
1473         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1474                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor)
1475                         .cluster(new ClusterWrapperImpl(system2)).props()
1476                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1477                 shardManagerID);
1478
1479         final TestKit kit = new TestKit(getSystem());
1480         newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1481         leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1482
1483         leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1484
1485         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1486         leaderShardManager.tell(
1487             new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1488             mockShardLeaderActor);
1489         leaderShardManager.tell(
1490             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1491             mockShardLeaderActor);
1492
1493         newReplicaShardManager.underlyingActor().waitForMemberUp();
1494         leaderShardManager.underlyingActor().waitForMemberUp();
1495
1496         // Have a dummy snapshot to be overwritten by the new data
1497         // persisted.
1498         String[] restoredShards = { "default", "people" };
1499         ShardManagerSnapshot snapshot =
1500                 new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
1501         InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
1502         Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
1503
1504         InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID);
1505         InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
1506
1507         // construct a mock response message
1508         newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1509         AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1510             AddServer.class);
1511         String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1512         assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1513         kit.expectMsgClass(Duration.ofSeconds(5), Status.Success.class);
1514
1515         InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
1516         InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
1517         List<ShardManagerSnapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(shardManagerID,
1518             ShardManagerSnapshot.class);
1519         assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
1520         ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
1521         assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
1522             Sets.newHashSet(shardManagerSnapshot.getShardList()));
1523         LOG.info("testAddShardReplica ending");
1524     }
1525
1526     @Test
1527     public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() {
1528         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting");
1529         final TestKit kit = new TestKit(getSystem());
1530         TestActorRef<TestShardManager> shardManager = actorFactory
1531                 .createTestActor(newPropsShardMgrWithMockShardActor(), shardMgrID);
1532
1533         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1534         shardManager.tell(new ActorInitialized(), mockShardActor);
1535
1536         String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
1537         AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
1538         ActorRef leaderShardActor = shardManager.underlyingActor().getContext()
1539                 .actorOf(Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId);
1540
1541         MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
1542
1543         String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
1544         shardManager.tell(
1545             new RoleChangeNotification(newReplicaId, RaftState.Candidate.name(), RaftState.Follower.name()),
1546             mockShardActor);
1547         shardManager.tell(
1548             new ShardLeaderStateChanged(newReplicaId, leaderId, DataStoreVersions.CURRENT_VERSION),
1549             mockShardActor);
1550
1551         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1552
1553         MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
1554
1555         Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1556         assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1557
1558         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1559         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1560
1561         // Send message again to verify previous in progress state is
1562         // cleared
1563
1564         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1565         resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1566         assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1567
1568         // Send message again with an AddServer timeout to verify the
1569         // pre-existing shard actor isn't terminated.
1570
1571         shardManager.tell(
1572             newDatastoreContextFactory(
1573                 datastoreContextBuilder.shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), kit.getRef());
1574         leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
1575         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1576         kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1577
1578         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1579         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1580
1581         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
1582     }
1583
1584     @Test
1585     public void testAddShardReplicaWithPreExistingLocalReplicaLeader() {
1586         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting");
1587         final TestKit kit = new TestKit(getSystem());
1588         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1589         ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1590
1591         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1592         shardManager.tell(new ActorInitialized(), mockShardActor);
1593         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1594             DataStoreVersions.CURRENT_VERSION), kit.getRef());
1595         shardManager.tell(
1596             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1597             mockShardActor);
1598
1599         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1600         Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1601         assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1602
1603         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1604         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1605
1606         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
1607     }
1608
1609     @Test
1610     public void testAddShardReplicaWithAddServerReplyFailure() {
1611         LOG.info("testAddShardReplicaWithAddServerReplyFailure starting");
1612         final TestKit kit = new TestKit(getSystem());
1613         final TestKit mockShardLeaderKit = new TestKit(getSystem());
1614
1615         MockConfiguration mockConfig = new MockConfiguration(
1616             ImmutableMap.of("astronauts", Arrays.asList("member-2")));
1617
1618         ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
1619         final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1620             newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props()
1621             .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1622         shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1623
1624         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1625
1626         TestKit terminateWatcher = new TestKit(getSystem());
1627         terminateWatcher.watch(mockNewReplicaShardActor);
1628
1629         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1630
1631         AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
1632         assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
1633             addServerMsg.getNewServerId());
1634         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
1635
1636         Failure failure = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1637         assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
1638
1639         shardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
1640         kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
1641
1642         terminateWatcher.expectTerminated(mockNewReplicaShardActor);
1643
1644         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1645         mockShardLeaderKit.expectMsgClass(AddServer.class);
1646         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
1647         failure = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1648         assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
1649
1650         LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
1651     }
1652
1653     @Test
1654     public void testAddShardReplicaWithAlreadyInProgress() {
1655         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1656                 AddServer.class, new AddShardReplica("astronauts"));
1657     }
1658
1659     @Test
1660     public void testAddShardReplicaWithFindPrimaryTimeout() {
1661         LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting");
1662         datastoreContextBuilder.shardInitializationTimeout(100, TimeUnit.MILLISECONDS);
1663         final TestKit kit = new TestKit(getSystem());
1664         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.of("astronauts", Arrays.asList("member-2")));
1665
1666         final ActorRef newReplicaShardManager = actorFactory
1667                 .createActor(newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props()
1668                     .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1669
1670         newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1671         MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2",
1672             AddressFromURIString.parse("akka://non-existent@127.0.0.1:5").toString());
1673
1674         newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1675         Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Status.Failure.class);
1676         assertTrue("Failure obtained", resp.cause() instanceof RuntimeException);
1677
1678         LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending");
1679     }
1680
1681     @Test
1682     public void testRemoveShardReplicaForNonExistentShard() {
1683         final TestKit kit = new TestKit(getSystem());
1684         ActorRef shardManager = actorFactory
1685                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1686                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1687
1688         shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), kit.getRef());
1689         Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(10), Status.Failure.class);
1690         assertTrue("Failure obtained", resp.cause() instanceof PrimaryNotFoundException);
1691     }
1692
1693     @Test
1694     /**
1695      * Primary is Local.
1696      */
1697     public void testRemoveShardReplicaLocal() {
1698         final TestKit kit = new TestKit(getSystem());
1699         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1700
1701         final ActorRef respondActor = actorFactory.createActor(Props.create(MockRespondActor.class,
1702             RemoveServer.class, new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
1703
1704         ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
1705
1706         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1707         shardManager.tell(new ActorInitialized(), respondActor);
1708         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1709             DataStoreVersions.CURRENT_VERSION), kit.getRef());
1710         shardManager.tell(
1711             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1712             respondActor);
1713
1714         shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), kit.getRef());
1715         final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor,
1716             RemoveServer.class);
1717         assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(),
1718             removeServer.getServerId());
1719         kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1720     }
1721
1722     @Test
1723     public void testRemoveShardReplicaRemote() {
1724         MockConfiguration mockConfig = new MockConfiguration(
1725                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1726                         .put("astronauts", Arrays.asList("member-1")).build());
1727
1728         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1729
1730         // Create an ActorSystem ShardManager actor for member-1.
1731         final ActorSystem system1 = newActorSystem("Member1");
1732         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1733         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1734
1735         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1736                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster(
1737                         new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1738                 shardManagerID);
1739
1740         // Create an ActorSystem ShardManager actor for member-2.
1741         final ActorSystem system2 = newActorSystem("Member2");
1742         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1743
1744         String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString();
1745         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
1746         final TestActorRef<MockRespondActor> mockShardLeaderActor =
1747                 TestActorRef.create(system2, Props.create(MockRespondActor.class, RemoveServer.class,
1748                         new RemoveServerReply(ServerChangeStatus.OK, memberId2)), name);
1749
1750         LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor);
1751
1752         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1753                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster(
1754                         new ClusterWrapperImpl(system2)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1755                 shardManagerID);
1756
1757         // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so,
1758         //    akka://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1
1759         // However when a shard manager has a local shard which is a follower and a leader that is remote it will
1760         // try to compute an address for the remote shard leader using the ShardPeerAddressResolver. This address will
1761         // look like so,
1762         //    akka://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1
1763         // In this specific case if we did a FindPrimary for shard default from member-1 we would come up
1764         // with the address of an actor which does not exist, therefore any message sent to that actor would go to
1765         // dead letters.
1766         // To work around this problem we create a ForwardingActor with the right address and pass to it the
1767         // mockShardLeaderActor. The ForwardingActor simply forwards all messages to the mockShardLeaderActor and every
1768         // thing works as expected
1769         final ActorRef actorRef = leaderShardManager.underlyingActor().context()
1770                 .actorOf(Props.create(ForwardingActor.class, mockShardLeaderActor),
1771                         "member-2-shard-default-" + shardMrgIDSuffix);
1772
1773         LOG.error("Forwarding actor : {}", actorRef);
1774
1775         final TestKit kit = new TestKit(getSystem());
1776         newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1777         leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1778
1779         leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1780         newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1781
1782         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1783         leaderShardManager.tell(
1784             new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1785             mockShardLeaderActor);
1786         leaderShardManager.tell(
1787             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1788             mockShardLeaderActor);
1789
1790         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
1791         newReplicaShardManager.tell(
1792             new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class), leaderVersion),
1793             mockShardActor);
1794         newReplicaShardManager.tell(
1795             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
1796             mockShardActor);
1797
1798         newReplicaShardManager.underlyingActor().waitForMemberUp();
1799         leaderShardManager.underlyingActor().waitForMemberUp();
1800
1801         // construct a mock response message
1802         newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), kit.getRef());
1803         RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1804             RemoveServer.class);
1805         String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1806         assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
1807         kit.expectMsgClass(Duration.ofSeconds(5), Status.Success.class);
1808     }
1809
1810     @Test
1811     public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() {
1812         testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", MEMBER_2),
1813                 RemoveServer.class, new RemoveShardReplica("astronauts", MEMBER_3));
1814     }
1815
1816     @Test
1817     public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() {
1818         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1819                 AddServer.class, new RemoveShardReplica("astronauts", MEMBER_2));
1820     }
1821
1822
1823     public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange,
1824                                                       final Class<?> firstForwardedServerChangeClass,
1825                                                       final Object secondServerChange) {
1826         final TestKit kit = new TestKit(getSystem());
1827         final TestKit mockShardLeaderKit = new TestKit(getSystem());
1828         final TestKit secondRequestKit = new TestKit(getSystem());
1829
1830         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1831             .put(shardName, Arrays.asList("member-2")).build());
1832
1833         final TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
1834             newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor)
1835             .cluster(new MockClusterWrapper()).props()
1836             .withDispatcher(Dispatchers.DefaultDispatcherId()),
1837             shardMgrID);
1838
1839         shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1840
1841         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1842
1843         shardManager.tell(firstServerChange, kit.getRef());
1844
1845         mockShardLeaderKit.expectMsgClass(firstForwardedServerChangeClass);
1846
1847         shardManager.tell(secondServerChange, secondRequestKit.getRef());
1848
1849         secondRequestKit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1850     }
1851
1852     @Test
1853     public void testServerRemovedShardActorNotRunning() {
1854         LOG.info("testServerRemovedShardActorNotRunning starting");
1855         final TestKit kit = new TestKit(getSystem());
1856         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1857             .put("default", Arrays.asList("member-1", "member-2"))
1858             .put("astronauts", Arrays.asList("member-2"))
1859             .put("people", Arrays.asList("member-1", "member-2")).build());
1860
1861         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1862             newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1863
1864         shardManager.underlyingActor().waitForRecoveryComplete();
1865         shardManager.tell(new FindLocalShard("people", false), kit.getRef());
1866         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1867
1868         shardManager.tell(new FindLocalShard("default", false), kit.getRef());
1869         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1870
1871         // Removed the default shard replica from member-1
1872         ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
1873         ShardIdentifier shardId = builder.shardName("default").memberName(MEMBER_1).type(shardMrgIDSuffix)
1874                 .build();
1875         shardManager.tell(new ServerRemoved(shardId.toString()), kit.getRef());
1876
1877         shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1878
1879         LOG.info("testServerRemovedShardActorNotRunning ending");
1880     }
1881
1882     @Test
1883     public void testServerRemovedShardActorRunning() {
1884         LOG.info("testServerRemovedShardActorRunning starting");
1885         final TestKit kit = new TestKit(getSystem());
1886         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1887             .put("default", Arrays.asList("member-1", "member-2"))
1888             .put("astronauts", Arrays.asList("member-2"))
1889             .put("people", Arrays.asList("member-1", "member-2")).build());
1890
1891         String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1892         ActorRef shard = actorFactory.createActor(MessageCollectorActor.props(), shardId);
1893
1894         TestActorRef<TestShardManager> shardManager = actorFactory
1895                 .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props()
1896                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1897
1898         shardManager.underlyingActor().waitForRecoveryComplete();
1899
1900         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1901         shardManager.tell(new ActorInitialized(), shard);
1902
1903         waitForShardInitialized(shardManager, "people", kit);
1904         waitForShardInitialized(shardManager, "default", kit);
1905
1906         // Removed the default shard replica from member-1
1907         shardManager.tell(new ServerRemoved(shardId), kit.getRef());
1908
1909         shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1910
1911         MessageCollectorActor.expectFirstMatching(shard, Shutdown.class);
1912
1913         LOG.info("testServerRemovedShardActorRunning ending");
1914     }
1915
1916     @Test
1917     public void testShardPersistenceWithRestoredData() {
1918         LOG.info("testShardPersistenceWithRestoredData starting");
1919         final TestKit kit = new TestKit(getSystem());
1920         MockConfiguration mockConfig =
1921                 new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1922                     .put("default", Arrays.asList("member-1", "member-2"))
1923                     .put("astronauts", Arrays.asList("member-2"))
1924                     .put("people", Arrays.asList("member-1", "member-2")).build());
1925         String[] restoredShards = {"default", "astronauts"};
1926         ShardManagerSnapshot snapshot =
1927                 new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
1928         InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
1929
1930         // create shardManager to come up with restored data
1931         TestActorRef<TestShardManager> newRestoredShardManager = actorFactory.createTestActor(
1932             newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1933
1934         newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
1935
1936         newRestoredShardManager.tell(new FindLocalShard("people", false), kit.getRef());
1937         LocalShardNotFound notFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
1938         assertEquals("for uninitialized shard", "people", notFound.getShardName());
1939
1940         // Verify a local shard is created for the restored shards,
1941         // although we expect a NotInitializedException for the shards
1942         // as the actor initialization
1943         // message is not sent for them
1944         newRestoredShardManager.tell(new FindLocalShard("default", false), kit.getRef());
1945         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1946
1947         newRestoredShardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
1948         kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1949
1950         LOG.info("testShardPersistenceWithRestoredData ending");
1951     }
1952
1953     @Test
1954     public void testShutDown() throws Exception {
1955         LOG.info("testShutDown starting");
1956         final TestKit kit = new TestKit(getSystem());
1957         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1958             .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1")).build());
1959
1960         String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString();
1961         ActorRef shard1 = actorFactory.createActor(MessageCollectorActor.props(), shardId1);
1962
1963         String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString();
1964         ActorRef shard2 = actorFactory.createActor(MessageCollectorActor.props(), shardId2);
1965
1966         ActorRef shardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig)
1967             .addShardActor("shard1", shard1).addShardActor("shard2", shard2).props());
1968
1969         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1970         shardManager.tell(new ActorInitialized(), shard1);
1971         shardManager.tell(new ActorInitialized(), shard2);
1972
1973         FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
1974         Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE);
1975
1976         MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class);
1977         MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class);
1978
1979         try {
1980             Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS));
1981             fail("ShardManager actor stopped without waiting for the Shards to be stopped");
1982         } catch (TimeoutException e) {
1983             // expected
1984         }
1985
1986         actorFactory.killActor(shard1, kit);
1987         actorFactory.killActor(shard2, kit);
1988
1989         Boolean stopped = Await.result(stopFuture, duration);
1990         assertEquals("Stopped", Boolean.TRUE, stopped);
1991
1992         LOG.info("testShutDown ending");
1993     }
1994
1995     @Test
1996     public void testChangeServersVotingStatus() {
1997         final TestKit kit = new TestKit(getSystem());
1998         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1999
2000         ActorRef respondActor = actorFactory
2001                 .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2002                     new ServerChangeReply(ServerChangeStatus.OK, null)), memberId);
2003
2004         ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2005
2006         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2007         shardManager.tell(new ActorInitialized(), respondActor);
2008         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
2009             DataStoreVersions.CURRENT_VERSION), kit.getRef());
2010         shardManager.tell(
2011             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
2012             respondActor);
2013
2014         shardManager.tell(
2015             new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), kit.getRef());
2016
2017         ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor
2018                 .expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2019         assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(),
2020             ImmutableMap.of(ShardIdentifier
2021                 .create("default", MemberName.forName("member-2"), shardMrgIDSuffix).toString(),
2022                 Boolean.TRUE));
2023
2024         kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
2025     }
2026
2027     @Test
2028     public void testChangeServersVotingStatusWithNoLeader() {
2029         final TestKit kit = new TestKit(getSystem());
2030         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2031
2032         ActorRef respondActor = actorFactory
2033                 .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2034                     new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);
2035
2036         ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2037
2038         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2039         shardManager.tell(new ActorInitialized(), respondActor);
2040         shardManager.tell(new RoleChangeNotification(memberId, null, RaftState.Follower.name()), respondActor);
2041
2042         shardManager.tell(
2043             new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), kit.getRef());
2044
2045         MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2046
2047         Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Status.Failure.class);
2048         assertTrue("Failure resposnse", resp.cause() instanceof NoShardLeaderException);
2049     }
2050
2051     @SuppressWarnings("unchecked")
2052     @Test
2053     public void testRegisterForShardLeaderChanges() {
2054         LOG.info("testRegisterForShardLeaderChanges starting");
2055
2056         final String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
2057         final String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
2058         final TestKit kit = new TestKit(getSystem());
2059         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
2060
2061         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2062         shardManager.tell(new ActorInitialized(), mockShardActor);
2063
2064         final Consumer<String> mockCallback = mock(Consumer.class);
2065         shardManager.tell(new RegisterForShardAvailabilityChanges(mockCallback), kit.getRef());
2066
2067         final Success reply = kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
2068         final Registration reg = (Registration) reply.status();
2069
2070         final DataTree mockDataTree = mock(DataTree.class);
2071         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2072             DataStoreVersions.CURRENT_VERSION), mockShardActor);
2073
2074         verify(mockCallback, timeout(5000)).accept("default");
2075
2076         reset(mockCallback);
2077         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2078                 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2079
2080         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
2081         verifyNoMoreInteractions(mockCallback);
2082
2083         shardManager.tell(new ShardLeaderStateChanged(memberId1, null, mockDataTree,
2084                 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2085
2086         verify(mockCallback, timeout(5000)).accept("default");
2087
2088         reset(mockCallback);
2089         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, mockDataTree,
2090                 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2091
2092         verify(mockCallback, timeout(5000)).accept("default");
2093
2094         reset(mockCallback);
2095         reg.close();
2096
2097         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2098                 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2099
2100         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
2101         verifyNoMoreInteractions(mockCallback);
2102
2103         LOG.info("testRegisterForShardLeaderChanges ending");
2104     }
2105
2106     public static class TestShardManager extends ShardManager {
2107         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
2108         private final CountDownLatch snapshotPersist = new CountDownLatch(1);
2109         private ShardManagerSnapshot snapshot;
2110         private final Map<String, ActorRef> shardActors;
2111         private final ActorRef shardActor;
2112         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
2113         private CountDownLatch memberUpReceived = new CountDownLatch(1);
2114         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
2115         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
2116         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
2117         private volatile MessageInterceptor messageInterceptor;
2118
2119         TestShardManager(final Builder builder) {
2120             super(builder);
2121             shardActor = builder.shardActor;
2122             shardActors = builder.shardActors;
2123         }
2124
2125         @Override
2126         protected void handleRecover(final Object message) throws Exception {
2127             try {
2128                 super.handleRecover(message);
2129             } finally {
2130                 if (message instanceof RecoveryCompleted) {
2131                     recoveryComplete.countDown();
2132                 }
2133             }
2134         }
2135
2136         private void countDownIfOther(final Member member, final CountDownLatch latch) {
2137             if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
2138                 latch.countDown();
2139             }
2140         }
2141
2142         @Override
2143         public void handleCommand(final Object message) throws Exception {
2144             try {
2145                 if (messageInterceptor != null && messageInterceptor.canIntercept(message)) {
2146                     getSender().tell(messageInterceptor.apply(message), getSelf());
2147                 } else {
2148                     super.handleCommand(message);
2149                 }
2150             } finally {
2151                 if (message instanceof FindPrimary) {
2152                     findPrimaryMessageReceived.countDown();
2153                 } else if (message instanceof ClusterEvent.MemberUp) {
2154                     countDownIfOther(((ClusterEvent.MemberUp) message).member(), memberUpReceived);
2155                 } else if (message instanceof ClusterEvent.MemberRemoved) {
2156                     countDownIfOther(((ClusterEvent.MemberRemoved) message).member(), memberRemovedReceived);
2157                 } else if (message instanceof ClusterEvent.UnreachableMember) {
2158                     countDownIfOther(((ClusterEvent.UnreachableMember) message).member(), memberUnreachableReceived);
2159                 } else if (message instanceof ClusterEvent.ReachableMember) {
2160                     countDownIfOther(((ClusterEvent.ReachableMember) message).member(), memberReachableReceived);
2161                 }
2162             }
2163         }
2164
2165         void setMessageInterceptor(final MessageInterceptor messageInterceptor) {
2166             this.messageInterceptor = messageInterceptor;
2167         }
2168
2169         void waitForRecoveryComplete() {
2170             assertTrue("Recovery complete",
2171                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
2172         }
2173
2174         public void waitForMemberUp() {
2175             assertTrue("MemberUp received",
2176                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
2177             memberUpReceived = new CountDownLatch(1);
2178         }
2179
2180         void waitForMemberRemoved() {
2181             assertTrue("MemberRemoved received",
2182                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
2183             memberRemovedReceived = new CountDownLatch(1);
2184         }
2185
2186         void waitForUnreachableMember() {
2187             assertTrue("UnreachableMember received",
2188                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS));
2189             memberUnreachableReceived = new CountDownLatch(1);
2190         }
2191
2192         void waitForReachableMember() {
2193             assertTrue("ReachableMember received",
2194                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
2195             memberReachableReceived = new CountDownLatch(1);
2196         }
2197
2198         void verifyFindPrimary() {
2199             assertTrue("FindPrimary received",
2200                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
2201             findPrimaryMessageReceived = new CountDownLatch(1);
2202         }
2203
2204         public static Builder builder(final DatastoreContext.Builder datastoreContextBuilder) {
2205             return new Builder(datastoreContextBuilder);
2206         }
2207
2208         public static class Builder extends AbstractGenericCreator<Builder, TestShardManager> {
2209             private ActorRef shardActor;
2210             private final Map<String, ActorRef> shardActors = new HashMap<>();
2211
2212             Builder(final DatastoreContext.Builder datastoreContextBuilder) {
2213                 super(TestShardManager.class);
2214                 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
2215             }
2216
2217             Builder shardActor(final ActorRef newShardActor) {
2218                 this.shardActor = newShardActor;
2219                 return this;
2220             }
2221
2222             Builder addShardActor(final String shardName, final ActorRef actorRef) {
2223                 shardActors.put(shardName, actorRef);
2224                 return this;
2225             }
2226         }
2227
2228         @Override
2229         public void saveSnapshot(final Object obj) {
2230             snapshot = (ShardManagerSnapshot) obj;
2231             snapshotPersist.countDown();
2232             super.saveSnapshot(obj);
2233         }
2234
2235         void verifySnapshotPersisted(final Set<String> shardList) {
2236             assertTrue("saveSnapshot invoked",
2237                     Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
2238             assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
2239         }
2240
2241         @Override
2242         protected ActorRef newShardActor(final ShardInformation info) {
2243             if (shardActors.get(info.getShardName()) != null) {
2244                 return shardActors.get(info.getShardName());
2245             }
2246
2247             if (shardActor != null) {
2248                 return shardActor;
2249             }
2250
2251             return super.newShardActor(info);
2252         }
2253     }
2254
2255     private abstract static class AbstractGenericCreator<T extends AbstractGenericCreator<T, ?>, C extends ShardManager>
2256                                                      extends AbstractShardManagerCreator<T> {
2257         private final Class<C> shardManagerClass;
2258
2259         AbstractGenericCreator(final Class<C> shardManagerClass) {
2260             this.shardManagerClass = shardManagerClass;
2261             cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).readinessFuture(ready)
2262                     .primaryShardInfoCache(new PrimaryShardInfoFutureCache());
2263         }
2264
2265         @Override
2266         public Props props() {
2267             verify();
2268             return Props.create(shardManagerClass, this);
2269         }
2270     }
2271
2272     private static class GenericCreator<C extends ShardManager> extends AbstractGenericCreator<GenericCreator<C>, C> {
2273         GenericCreator(final Class<C> shardManagerClass) {
2274             super(shardManagerClass);
2275         }
2276     }
2277
2278     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
2279         private static final long serialVersionUID = 1L;
2280         private final Creator<ShardManager> delegate;
2281
2282         DelegatingShardManagerCreator(final Creator<ShardManager> delegate) {
2283             this.delegate = delegate;
2284         }
2285
2286         @Override
2287         public ShardManager create() throws Exception {
2288             return delegate.create();
2289         }
2290     }
2291
2292     interface MessageInterceptor extends Function<Object, Object> {
2293         boolean canIntercept(Object message);
2294     }
2295
2296     private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
2297         return new MessageInterceptor() {
2298             @Override
2299             public Object apply(final Object message) {
2300                 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
2301             }
2302
2303             @Override
2304             public boolean canIntercept(final Object message) {
2305                 return message instanceof FindPrimary;
2306             }
2307         };
2308     }
2309
2310     private static class MockRespondActor extends MessageCollectorActor {
2311         static final String CLEAR_RESPONSE = "clear-response";
2312
2313         private Object responseMsg;
2314         private final Class<?> requestClass;
2315
2316         @SuppressWarnings("unused")
2317         MockRespondActor(final Class<?> requestClass, final Object responseMsg) {
2318             this.requestClass = requestClass;
2319             this.responseMsg = responseMsg;
2320         }
2321
2322         @Override
2323         public void onReceive(final Object message) throws Exception {
2324             if (message.equals(CLEAR_RESPONSE)) {
2325                 responseMsg = null;
2326             } else {
2327                 super.onReceive(message);
2328                 if (message.getClass().equals(requestClass) && responseMsg != null) {
2329                     getSender().tell(responseMsg, getSelf());
2330                 }
2331             }
2332         }
2333     }
2334 }