tell-based - reconnect on leader change
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManagerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.reset;
20 import static org.mockito.Mockito.timeout;
21 import static org.mockito.Mockito.times;
22 import static org.mockito.Mockito.verify;
23 import static org.mockito.Mockito.verifyNoMoreInteractions;
24
25 import akka.actor.ActorRef;
26 import akka.actor.ActorSystem;
27 import akka.actor.AddressFromURIString;
28 import akka.actor.Props;
29 import akka.actor.Status;
30 import akka.actor.Status.Failure;
31 import akka.actor.Status.Success;
32 import akka.cluster.Cluster;
33 import akka.cluster.ClusterEvent;
34 import akka.cluster.Member;
35 import akka.dispatch.Dispatchers;
36 import akka.dispatch.OnComplete;
37 import akka.japi.Creator;
38 import akka.pattern.Patterns;
39 import akka.persistence.RecoveryCompleted;
40 import akka.serialization.Serialization;
41 import akka.testkit.TestActorRef;
42 import akka.testkit.javadsl.TestKit;
43 import akka.util.Timeout;
44 import com.google.common.base.Function;
45 import com.google.common.base.Stopwatch;
46 import com.google.common.collect.ImmutableMap;
47 import com.google.common.collect.Lists;
48 import com.google.common.collect.Sets;
49 import com.google.common.util.concurrent.Uninterruptibles;
50 import java.net.URI;
51 import java.util.AbstractMap;
52 import java.util.Arrays;
53 import java.util.Collection;
54 import java.util.Collections;
55 import java.util.HashMap;
56 import java.util.List;
57 import java.util.Map;
58 import java.util.Map.Entry;
59 import java.util.Set;
60 import java.util.concurrent.CountDownLatch;
61 import java.util.concurrent.TimeUnit;
62 import java.util.concurrent.TimeoutException;
63 import java.util.function.Consumer;
64 import java.util.stream.Collectors;
65 import org.junit.Test;
66 import org.mockito.Mockito;
67 import org.opendaylight.controller.cluster.access.concepts.MemberName;
68 import org.opendaylight.controller.cluster.datastore.AbstractShardManagerTest;
69 import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
70 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
71 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
72 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
73 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
74 import org.opendaylight.controller.cluster.datastore.Shard;
75 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
76 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
77 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
78 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
79 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
80 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
81 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
82 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
83 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
84 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
85 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
86 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
87 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
88 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
89 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
90 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
91 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
92 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
93 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
94 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
95 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
96 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
97 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
98 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
99 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
100 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
101 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
102 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
103 import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor;
104 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
105 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
106 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
107 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
108 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
109 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
110 import org.opendaylight.controller.cluster.raft.RaftState;
111 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
112 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
113 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
114 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
115 import org.opendaylight.controller.cluster.raft.messages.AddServer;
116 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
117 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
118 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
119 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
120 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
121 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
122 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
123 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
124 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
125 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
126 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
127 import org.opendaylight.yangtools.concepts.Registration;
128 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
129 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
130 import org.slf4j.Logger;
131 import org.slf4j.LoggerFactory;
132 import scala.concurrent.Await;
133 import scala.concurrent.Future;
134 import scala.concurrent.duration.Duration;
135 import scala.concurrent.duration.FiniteDuration;
136
137 public class ShardManagerTest extends AbstractShardManagerTest {
138     private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
139     private static final MemberName MEMBER_2 = MemberName.forName("member-2");
140     private static final MemberName MEMBER_3 = MemberName.forName("member-3");
141
142     private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
143
144     private ActorSystem newActorSystem(final String config) {
145         return newActorSystem("cluster-test", config);
146     }
147
148     private ActorRef newMockShardActor(final ActorSystem system, final String shardName, final String memberName) {
149         String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
150         if (system == getSystem()) {
151             return actorFactory.createActor(MessageCollectorActor.props(), name);
152         }
153
154         return system.actorOf(MessageCollectorActor.props(), name);
155     }
156
157     private Props newShardMgrProps() {
158         return newShardMgrProps(new MockConfiguration());
159     }
160
161     private static DatastoreContextFactory newDatastoreContextFactory(final DatastoreContext datastoreContext) {
162         DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
163         Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
164         Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString());
165         return mockFactory;
166     }
167
168     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() {
169         return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
170     }
171
172     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(final ActorRef shardActor) {
173         return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor)
174                 .distributedDataStore(mock(DistributedDataStore.class));
175     }
176
177
178     private Props newPropsShardMgrWithMockShardActor() {
179         return newTestShardMgrBuilderWithMockShardActor().props().withDispatcher(
180                 Dispatchers.DefaultDispatcherId());
181     }
182
183     private Props newPropsShardMgrWithMockShardActor(final ActorRef shardActor) {
184         return newTestShardMgrBuilderWithMockShardActor(shardActor).props()
185                 .withDispatcher(Dispatchers.DefaultDispatcherId());
186     }
187
188
189     private TestShardManager newTestShardManager() {
190         return newTestShardManager(newShardMgrProps());
191     }
192
193     private TestShardManager newTestShardManager(final Props props) {
194         TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
195         TestShardManager shardManager = shardManagerActor.underlyingActor();
196         shardManager.waitForRecoveryComplete();
197         return shardManager;
198     }
199
200     private static void waitForShardInitialized(final ActorRef shardManager, final String shardName,
201             final TestKit kit) {
202         AssertionError last = null;
203         Stopwatch sw = Stopwatch.createStarted();
204         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
205             try {
206                 shardManager.tell(new FindLocalShard(shardName, true), kit.getRef());
207                 kit.expectMsgClass(LocalShardFound.class);
208                 return;
209             } catch (AssertionError e) {
210                 last = e;
211             }
212
213             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
214         }
215
216         throw last;
217     }
218
219     @SuppressWarnings("unchecked")
220     private static <T> T expectMsgClassOrFailure(final Class<T> msgClass, final TestKit kit, final String msg) {
221         Object reply = kit.expectMsgAnyClassOf(kit.duration("5 sec"), msgClass, Failure.class);
222         if (reply instanceof Failure) {
223             throw new AssertionError(msg + " failed", ((Failure)reply).cause());
224         }
225
226         return (T)reply;
227     }
228
229     @Test
230     public void testPerShardDatastoreContext() throws Exception {
231         LOG.info("testPerShardDatastoreContext starting");
232         final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
233                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
234
235         Mockito.doReturn(
236                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(6).build())
237                 .when(mockFactory).getShardDatastoreContext("default");
238
239         Mockito.doReturn(
240                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(7).build())
241                 .when(mockFactory).getShardDatastoreContext("topology");
242
243         final MockConfiguration mockConfig = new MockConfiguration() {
244             @Override
245             public Collection<String> getMemberShardNames(final MemberName memberName) {
246                 return Arrays.asList("default", "topology");
247             }
248
249             @Override
250             public Collection<MemberName> getMembersFromShardName(final String shardName) {
251                 return members("member-1");
252             }
253         };
254
255         final ActorRef defaultShardActor = actorFactory.createActor(
256                 MessageCollectorActor.props(), actorFactory.generateActorId("default"));
257         final ActorRef topologyShardActor = actorFactory.createActor(
258                 MessageCollectorActor.props(), actorFactory.generateActorId("topology"));
259
260         final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
261                 new HashMap<String, Entry<ActorRef, DatastoreContext>>());
262         shardInfoMap.put("default", new AbstractMap.SimpleEntry<>(defaultShardActor, null));
263         shardInfoMap.put("topology", new AbstractMap.SimpleEntry<>(topologyShardActor, null));
264
265         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
266         final CountDownLatch newShardActorLatch = new CountDownLatch(2);
267         class LocalShardManager extends ShardManager {
268             LocalShardManager(final AbstractShardManagerCreator<?> creator) {
269                 super(creator);
270             }
271
272             @Override
273             protected ActorRef newShardActor(final ShardInformation info) {
274                 Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
275                 ActorRef ref = null;
276                 if (entry != null) {
277                     ref = entry.getKey();
278                     entry.setValue(info.getDatastoreContext());
279                 }
280
281                 newShardActorLatch.countDown();
282                 return ref;
283             }
284         }
285
286         final Creator<ShardManager> creator = new Creator<ShardManager>() {
287             private static final long serialVersionUID = 1L;
288             @Override
289             public ShardManager create() {
290                 return new LocalShardManager(
291                         new GenericCreator<>(LocalShardManager.class).datastoreContextFactory(mockFactory)
292                                 .primaryShardInfoCache(primaryShardInfoCache).configuration(mockConfig));
293             }
294         };
295
296         TestKit kit = new TestKit(getSystem());
297
298         final ActorRef shardManager = actorFactory.createActor(Props.create(
299                 new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()));
300
301         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), kit.getRef());
302
303         assertEquals("Shard actors created", true, newShardActorLatch.await(5, TimeUnit.SECONDS));
304         assertEquals("getShardElectionTimeoutFactor", 6,
305                 shardInfoMap.get("default").getValue().getShardElectionTimeoutFactor());
306         assertEquals("getShardElectionTimeoutFactor", 7,
307                 shardInfoMap.get("topology").getValue().getShardElectionTimeoutFactor());
308
309         DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
310                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
311         Mockito.doReturn(
312                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(66).build())
313                 .when(newMockFactory).getShardDatastoreContext("default");
314
315         Mockito.doReturn(
316                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(77).build())
317                 .when(newMockFactory).getShardDatastoreContext("topology");
318
319         shardManager.tell(newMockFactory, kit.getRef());
320
321         DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor,
322                 DatastoreContext.class);
323         assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
324
325         newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
326         assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
327
328         LOG.info("testPerShardDatastoreContext ending");
329     }
330
331     @Test
332     public void testOnReceiveFindPrimaryForNonExistentShard() {
333         new TestKit(getSystem()) {
334             {
335                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
336
337                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
338
339                 shardManager.tell(new FindPrimary("non-existent", false), getRef());
340
341                 expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
342             }
343         };
344     }
345
346     @Test
347     public void testOnReceiveFindPrimaryForLocalLeaderShard() {
348         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting");
349         new TestKit(getSystem()) {
350             {
351                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
352
353                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
354
355                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
356                 shardManager.tell(new ActorInitialized(), mockShardActor);
357
358                 DataTree mockDataTree = mock(DataTree.class);
359                 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
360                         DataStoreVersions.CURRENT_VERSION), getRef());
361
362                 MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
363                 shardManager.tell(
364                         new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
365                         mockShardActor);
366
367                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
368
369                 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"),
370                         LocalPrimaryShardFound.class);
371                 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
372                         primaryFound.getPrimaryPath().contains("member-1-shard-default"));
373                 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
374             }
375         };
376
377         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending");
378     }
379
380     @Test
381     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() {
382         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp starting");
383         new TestKit(getSystem()) {
384             {
385                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
386
387                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
388                 shardManager.tell(new ActorInitialized(), mockShardActor);
389
390                 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
391                 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
392                 shardManager.tell(
393                         new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
394                         mockShardActor);
395                 shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION),
396                         mockShardActor);
397
398                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
399
400                 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
401             }
402         };
403
404         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending");
405     }
406
407     @Test
408     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() {
409         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard starting");
410         new TestKit(getSystem()) {
411             {
412                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
413
414                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
415                 shardManager.tell(new ActorInitialized(), mockShardActor);
416
417                 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
418                 MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
419
420                 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
421                 shardManager.tell(
422                         new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
423                         mockShardActor);
424                 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
425                 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, leaderVersion), mockShardActor);
426
427                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
428
429                 RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"),
430                         RemotePrimaryShardFound.class);
431                 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
432                         primaryFound.getPrimaryPath().contains("member-2-shard-default"));
433                 assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
434             }
435         };
436
437         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard ending");
438     }
439
440     @Test
441     public void testOnReceiveFindPrimaryForUninitializedShard() {
442         new TestKit(getSystem()) {
443             {
444                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
445
446                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
447
448                 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
449             }
450         };
451     }
452
453     @Test
454     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() {
455         new TestKit(getSystem()) {
456             {
457                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
458
459                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
460                 shardManager.tell(new ActorInitialized(), mockShardActor);
461
462                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
463
464                 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
465             }
466         };
467     }
468
469     @Test
470     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() {
471         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
472         new TestKit(getSystem()) {
473             {
474                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
475
476                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
477                 shardManager.tell(new ActorInitialized(), mockShardActor);
478
479                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
480                 shardManager.tell(
481                         new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Follower.name()),
482                         mockShardActor);
483
484                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
485
486                 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
487
488                 DataTree mockDataTree = mock(DataTree.class);
489                 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
490                         DataStoreVersions.CURRENT_VERSION), mockShardActor);
491
492                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
493
494                 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"),
495                         LocalPrimaryShardFound.class);
496                 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
497                         primaryFound.getPrimaryPath().contains("member-1-shard-default"));
498                 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
499             }
500         };
501
502         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
503     }
504
505     @Test
506     public void testOnReceiveFindPrimaryWaitForShardLeader() {
507         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting");
508         datastoreContextBuilder.shardInitializationTimeout(10, TimeUnit.SECONDS);
509         new TestKit(getSystem()) {
510             {
511                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
512
513                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
514
515                 // We're passing waitUntilInitialized = true to FindPrimary so
516                 // the response should be
517                 // delayed until we send ActorInitialized and
518                 // RoleChangeNotification.
519                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
520
521                 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
522
523                 shardManager.tell(new ActorInitialized(), mockShardActor);
524
525                 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
526
527                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
528                 shardManager.tell(
529                         new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
530                         mockShardActor);
531
532                 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
533
534                 DataTree mockDataTree = mock(DataTree.class);
535                 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
536                         DataStoreVersions.CURRENT_VERSION), mockShardActor);
537
538                 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"),
539                         LocalPrimaryShardFound.class);
540                 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
541                         primaryFound.getPrimaryPath().contains("member-1-shard-default"));
542                 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
543
544                 expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
545             }
546         };
547
548         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending");
549     }
550
551     @Test
552     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() {
553         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard starting");
554         new TestKit(getSystem()) {
555             {
556                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
557
558                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
559
560                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
561
562                 expectMsgClass(duration("2 seconds"), NotInitializedException.class);
563
564                 shardManager.tell(new ActorInitialized(), mockShardActor);
565
566                 expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
567             }
568         };
569
570         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending");
571     }
572
573     @Test
574     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() {
575         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard starting");
576         new TestKit(getSystem()) {
577             {
578                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
579
580                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
581                 shardManager.tell(new ActorInitialized(), mockShardActor);
582                 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
583                         RaftState.Candidate.name()), mockShardActor);
584
585                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
586
587                 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
588             }
589         };
590
591         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending");
592     }
593
594     @Test
595     public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() {
596         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard starting");
597         new TestKit(getSystem()) {
598             {
599                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
600
601                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
602                 shardManager.tell(new ActorInitialized(), mockShardActor);
603                 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
604                         RaftState.IsolatedLeader.name()), mockShardActor);
605
606                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
607
608                 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
609             }
610         };
611
612         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending");
613     }
614
615     @Test
616     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() {
617         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard starting");
618         new TestKit(getSystem()) {
619             {
620                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
621
622                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
623                 shardManager.tell(new ActorInitialized(), mockShardActor);
624
625                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
626
627                 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
628             }
629         };
630
631         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending");
632     }
633
634     @Test
635     public void testOnReceiveFindPrimaryForRemoteShard() {
636         LOG.info("testOnReceiveFindPrimaryForRemoteShard starting");
637         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
638
639         // Create an ActorSystem ShardManager actor for member-1.
640
641         final ActorSystem system1 = newActorSystem("Member1");
642         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
643
644         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
645                 newTestShardMgrBuilderWithMockShardActor().cluster(
646                         new ClusterWrapperImpl(system1)).props().withDispatcher(
647                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
648
649         // Create an ActorSystem ShardManager actor for member-2.
650
651         final ActorSystem system2 = newActorSystem("Member2");
652
653         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
654
655         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
656
657         MockConfiguration mockConfig2 = new MockConfiguration(
658                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
659                         .put("astronauts", Arrays.asList("member-2")).build());
660
661         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
662                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
663                         new ClusterWrapperImpl(system2)).props().withDispatcher(
664                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
665
666         new TestKit(system1) {
667             {
668                 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
669                 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
670
671                 shardManager2.tell(new ActorInitialized(), mockShardActor2);
672
673                 String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
674                 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
675                 shardManager2.tell(
676                         new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
677                         mockShardActor2);
678                 shardManager2.tell(
679                         new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
680                         mockShardActor2);
681
682                 shardManager1.underlyingActor().waitForMemberUp();
683                 shardManager1.tell(new FindPrimary("astronauts", false), getRef());
684
685                 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
686                 String path = found.getPrimaryPath();
687                 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
688                 assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
689
690                 shardManager2.underlyingActor().verifyFindPrimary();
691
692                 // This part times out quite a bit on jenkins for some reason
693
694 //                Cluster.get(system2).down(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
695 //
696 //                shardManager1.underlyingActor().waitForMemberRemoved();
697 //
698 //                shardManager1.tell(new FindPrimary("astronauts", false), getRef());
699 //
700 //                expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
701             }
702         };
703
704         LOG.info("testOnReceiveFindPrimaryForRemoteShard ending");
705     }
706
707     @Test
708     public void testShardAvailabilityOnChangeOfMemberReachability() {
709         LOG.info("testShardAvailabilityOnChangeOfMemberReachability starting");
710         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
711
712         // Create an ActorSystem ShardManager actor for member-1.
713
714         final ActorSystem system1 = newActorSystem("Member1");
715         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
716
717         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
718
719         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
720                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
721                         new ClusterWrapperImpl(system1)).props().withDispatcher(
722                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
723
724         // Create an ActorSystem ShardManager actor for member-2.
725
726         final ActorSystem system2 = newActorSystem("Member2");
727
728         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
729
730         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
731
732         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
733                 .put("default", Arrays.asList("member-1", "member-2")).build());
734
735         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
736                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
737                         new ClusterWrapperImpl(system2)).props().withDispatcher(
738                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
739
740         new TestKit(system1) {
741             {
742                 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
743                 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
744                 shardManager1.tell(new ActorInitialized(), mockShardActor1);
745                 shardManager2.tell(new ActorInitialized(), mockShardActor2);
746
747                 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
748                 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
749                 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
750                         DataStoreVersions.CURRENT_VERSION), mockShardActor1);
751                 shardManager1.tell(
752                         new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
753                         mockShardActor1);
754                 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
755                         DataStoreVersions.CURRENT_VERSION), mockShardActor2);
756                 shardManager2.tell(
757                         new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
758                         mockShardActor2);
759                 shardManager1.underlyingActor().waitForMemberUp();
760
761                 shardManager1.tell(new FindPrimary("default", true), getRef());
762
763                 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
764                 String path = found.getPrimaryPath();
765                 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
766
767                 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
768                         "akka://cluster-test@127.0.0.1:2558"), getRef());
769
770                 shardManager1.underlyingActor().waitForUnreachableMember();
771
772                 PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
773                 assertEquals("getMemberName", MEMBER_2, peerDown.getMemberName());
774                 MessageCollectorActor.clearMessages(mockShardActor1);
775
776                 shardManager1.tell(
777                         MockClusterWrapper.createMemberRemoved("member-2", "akka://cluster-test@127.0.0.1:2558"),
778                         getRef());
779
780                 MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
781
782                 shardManager1.tell(new FindPrimary("default", true), getRef());
783
784                 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
785
786                 shardManager1.tell(
787                         MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
788                         getRef());
789
790                 shardManager1.underlyingActor().waitForReachableMember();
791
792                 PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
793                 assertEquals("getMemberName", MEMBER_2, peerUp.getMemberName());
794                 MessageCollectorActor.clearMessages(mockShardActor1);
795
796                 shardManager1.tell(new FindPrimary("default", true), getRef());
797
798                 RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
799                 String path1 = found1.getPrimaryPath();
800                 assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
801
802                 shardManager1.tell(
803                         MockClusterWrapper.createMemberUp("member-2", "akka://cluster-test@127.0.0.1:2558"),
804                         getRef());
805
806                 MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
807
808                 // Test FindPrimary wait succeeds after reachable member event.
809
810                 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
811                         "akka://cluster-test@127.0.0.1:2558"), getRef());
812                 shardManager1.underlyingActor().waitForUnreachableMember();
813
814                 shardManager1.tell(new FindPrimary("default", true), getRef());
815
816                 shardManager1.tell(
817                         MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
818                         getRef());
819
820                 RemotePrimaryShardFound found2 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
821                 String path2 = found2.getPrimaryPath();
822                 assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config"));
823             }
824         };
825
826         LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending");
827     }
828
829     @Test
830     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() {
831         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange starting");
832         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
833
834         // Create an ActorSystem ShardManager actor for member-1.
835
836         final ActorSystem system1 = newActorSystem("Member1");
837         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
838
839         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
840
841         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
842         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
843                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(new ClusterWrapperImpl(system1))
844                         .primaryShardInfoCache(primaryShardInfoCache).props()
845                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
846                 shardManagerID);
847
848         // Create an ActorSystem ShardManager actor for member-2.
849
850         final ActorSystem system2 = newActorSystem("Member2");
851
852         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
853
854         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
855
856         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
857                 .put("default", Arrays.asList("member-1", "member-2")).build());
858
859         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
860                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
861                         new ClusterWrapperImpl(system2)).props().withDispatcher(
862                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
863
864         new TestKit(system1) {
865             {
866                 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
867                 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
868                 shardManager1.tell(new ActorInitialized(), mockShardActor1);
869                 shardManager2.tell(new ActorInitialized(), mockShardActor2);
870
871                 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
872                 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
873                 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
874                         DataStoreVersions.CURRENT_VERSION), mockShardActor1);
875                 shardManager1.tell(
876                         new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
877                         mockShardActor1);
878                 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
879                         DataStoreVersions.CURRENT_VERSION), mockShardActor2);
880                 shardManager2.tell(
881                         new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
882                         mockShardActor2);
883                 shardManager1.underlyingActor().waitForMemberUp();
884
885                 shardManager1.tell(new FindPrimary("default", true), getRef());
886
887                 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
888                 String path = found.getPrimaryPath();
889                 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
890
891                 primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(
892                         system1.actorSelection(mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION));
893
894                 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
895                         "akka://cluster-test@127.0.0.1:2558"), getRef());
896
897                 shardManager1.underlyingActor().waitForUnreachableMember();
898
899                 shardManager1.tell(new FindPrimary("default", true), getRef());
900
901                 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
902
903                 assertNull("Expected primaryShardInfoCache entry removed",
904                         primaryShardInfoCache.getIfPresent("default"));
905
906                 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, mock(DataTree.class),
907                         DataStoreVersions.CURRENT_VERSION), mockShardActor1);
908                 shardManager1.tell(
909                         new RoleChangeNotification(memberId1, RaftState.Follower.name(), RaftState.Leader.name()),
910                         mockShardActor1);
911
912                 shardManager1.tell(new FindPrimary("default", true), getRef());
913
914                 LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
915                 String path1 = found1.getPrimaryPath();
916                 assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
917
918             }
919         };
920
921         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange ending");
922     }
923
924     @Test
925     public void testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable() {
926         LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable starting");
927         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
928
929         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
930                 .put("default", Arrays.asList("member-256", "member-2")).build());
931
932         // Create an ActorSystem, ShardManager and actor for member-256.
933
934         final ActorSystem system256 = newActorSystem("Member256");
935         // 2562 is the tcp port of Member256 in src/test/resources/application.conf.
936         Cluster.get(system256).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2562"));
937
938         final ActorRef mockShardActor256 = newMockShardActor(system256, Shard.DEFAULT_NAME, "member-256");
939
940         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
941
942         // ShardManager must be created with shard configuration to let its localShards has shards.
943         final TestActorRef<TestShardManager> shardManager256 = TestActorRef.create(system256,
944                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor256)
945                         .cluster(new ClusterWrapperImpl(system256))
946                         .primaryShardInfoCache(primaryShardInfoCache).props()
947                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
948                 shardManagerID);
949
950         // Create an ActorSystem, ShardManager and actor for member-2 whose name is contained in member-256.
951
952         final ActorSystem system2 = newActorSystem("Member2");
953
954         // Join member-2 into the cluster of member-256.
955         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2562"));
956
957         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
958
959         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
960                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor2).cluster(
961                         new ClusterWrapperImpl(system2)).props().withDispatcher(
962                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
963
964         new TestKit(system256) {
965             {
966                 shardManager256.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
967                 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
968                 shardManager256.tell(new ActorInitialized(), mockShardActor256);
969                 shardManager2.tell(new ActorInitialized(), mockShardActor2);
970
971                 String memberId256 = "member-256-shard-default-" + shardMrgIDSuffix;
972                 String memberId2   = "member-2-shard-default-"   + shardMrgIDSuffix;
973                 shardManager256.tell(new ShardLeaderStateChanged(memberId256, memberId256, mock(DataTree.class),
974                         DataStoreVersions.CURRENT_VERSION), mockShardActor256);
975                 shardManager256.tell(
976                         new RoleChangeNotification(memberId256, RaftState.Candidate.name(), RaftState.Leader.name()),
977                         mockShardActor256);
978                 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId256, mock(DataTree.class),
979                         DataStoreVersions.CURRENT_VERSION), mockShardActor2);
980                 shardManager2.tell(
981                         new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Follower.name()),
982                         mockShardActor2);
983                 shardManager256.underlyingActor().waitForMemberUp();
984
985                 shardManager256.tell(new FindPrimary("default", true), getRef());
986
987                 LocalPrimaryShardFound found = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
988                 String path = found.getPrimaryPath();
989                 assertTrue("Unexpected primary path " + path + " which must on member-256",
990                             path.contains("member-256-shard-default-config"));
991
992                 PrimaryShardInfo primaryShardInfo = new PrimaryShardInfo(
993                         system256.actorSelection(mockShardActor256.path()), DataStoreVersions.CURRENT_VERSION);
994                 primaryShardInfoCache.putSuccessful("default", primaryShardInfo);
995
996                 // Simulate member-2 become unreachable.
997                 shardManager256.tell(MockClusterWrapper.createUnreachableMember("member-2",
998                         "akka://cluster-test@127.0.0.1:2558"), getRef());
999                 shardManager256.underlyingActor().waitForUnreachableMember();
1000
1001                 // Make sure leader shard on member-256 is still leader and still in the cache.
1002                 shardManager256.tell(new FindPrimary("default", true), getRef());
1003                 found = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
1004                 path = found.getPrimaryPath();
1005                 assertTrue("Unexpected primary path " + path + " which must still not on member-256",
1006                             path.contains("member-256-shard-default-config"));
1007                 Future<PrimaryShardInfo> futurePrimaryShard = primaryShardInfoCache.getIfPresent("default");
1008                 futurePrimaryShard.onComplete(new OnComplete<PrimaryShardInfo>() {
1009                     @Override
1010                     public void onComplete(final Throwable failure, final PrimaryShardInfo futurePrimaryShardInfo) {
1011                         if (failure != null) {
1012                             assertTrue("Primary shard info is unexpectedly removed from primaryShardInfoCache", false);
1013                         } else {
1014                             assertEquals("Expected primaryShardInfoCache entry",
1015                                         primaryShardInfo, futurePrimaryShardInfo);
1016                         }
1017                     }
1018                 }, system256.dispatchers().defaultGlobalDispatcher());
1019             }
1020         };
1021
1022         LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable ending");
1023     }
1024
1025     @Test
1026     public void testOnReceiveFindLocalShardForNonExistentShard() {
1027         new TestKit(getSystem()) {
1028             {
1029                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1030
1031                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1032
1033                 shardManager.tell(new FindLocalShard("non-existent", false), getRef());
1034
1035                 LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1036
1037                 assertEquals("getShardName", "non-existent", notFound.getShardName());
1038             }
1039         };
1040     }
1041
1042     @Test
1043     public void testOnReceiveFindLocalShardForExistentShard() {
1044         new TestKit(getSystem()) {
1045             {
1046                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1047
1048                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1049                 shardManager.tell(new ActorInitialized(), mockShardActor);
1050
1051                 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1052
1053                 LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1054
1055                 assertTrue("Found path contains " + found.getPath().path().toString(),
1056                         found.getPath().path().toString().contains("member-1-shard-default-config"));
1057             }
1058         };
1059     }
1060
1061     @Test
1062     public void testOnReceiveFindLocalShardForNotInitializedShard() {
1063         new TestKit(getSystem()) {
1064             {
1065                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1066
1067                 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1068
1069                 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1070             }
1071         };
1072     }
1073
1074     @Test
1075     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
1076         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
1077         new TestKit(getSystem()) {
1078             {
1079                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1080
1081                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1082
1083                 // We're passing waitUntilInitialized = true to FindLocalShard
1084                 // so the response should be
1085                 // delayed until we send ActorInitialized.
1086                 Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
1087                         new Timeout(5, TimeUnit.SECONDS));
1088
1089                 shardManager.tell(new ActorInitialized(), mockShardActor);
1090
1091                 Object resp = Await.result(future, duration("5 seconds"));
1092                 assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
1093             }
1094         };
1095
1096         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
1097     }
1098
1099     @Test
1100     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
1101         TestShardManager shardManager = newTestShardManager();
1102
1103         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1104         shardManager.onReceiveCommand(new RoleChangeNotification(
1105                 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
1106
1107         verify(ready, never()).countDown();
1108
1109         shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
1110                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1111
1112         verify(ready, times(1)).countDown();
1113     }
1114
1115     @Test
1116     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
1117         new TestKit(getSystem()) {
1118             {
1119                 TestShardManager shardManager = newTestShardManager();
1120
1121                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1122                 shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1123
1124                 verify(ready, never()).countDown();
1125
1126                 shardManager
1127                         .onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
1128
1129                 shardManager.onReceiveCommand(
1130                         new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1131                                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1132
1133                 verify(ready, times(1)).countDown();
1134             }
1135         };
1136     }
1137
1138     @Test
1139     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
1140         new TestKit(getSystem()) {
1141             {
1142                 TestShardManager shardManager = newTestShardManager();
1143
1144                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1145                 shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1146
1147                 verify(ready, never()).countDown();
1148
1149                 shardManager.onReceiveCommand(
1150                         new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1151                                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1152
1153                 shardManager
1154                         .onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
1155
1156                 verify(ready, times(1)).countDown();
1157             }
1158         };
1159     }
1160
1161     @Test
1162     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
1163         TestShardManager shardManager = newTestShardManager();
1164
1165         shardManager.onReceiveCommand(new RoleChangeNotification(
1166                 "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
1167
1168         verify(ready, never()).countDown();
1169     }
1170
1171     @Test
1172     public void testByDefaultSyncStatusIsFalse() {
1173         TestShardManager shardManager = newTestShardManager();
1174
1175         assertEquals(false, shardManager.getMBean().getSyncStatus());
1176     }
1177
1178     @Test
1179     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception {
1180         TestShardManager shardManager = newTestShardManager();
1181
1182         shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
1183                 RaftState.Follower.name(), RaftState.Leader.name()));
1184
1185         assertEquals(true, shardManager.getMBean().getSyncStatus());
1186     }
1187
1188     @Test
1189     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception {
1190         TestShardManager shardManager = newTestShardManager();
1191
1192         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1193         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1194                 RaftState.Follower.name(), RaftState.Candidate.name()));
1195
1196         assertEquals(false, shardManager.getMBean().getSyncStatus());
1197
1198         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
1199         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(
1200                 true, shardId));
1201
1202         assertEquals(false, shardManager.getMBean().getSyncStatus());
1203     }
1204
1205     @Test
1206     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception {
1207         TestShardManager shardManager = newTestShardManager();
1208
1209         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1210         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1211                 RaftState.Candidate.name(), RaftState.Follower.name()));
1212
1213         // Initially will be false
1214         assertEquals(false, shardManager.getMBean().getSyncStatus());
1215
1216         // Send status true will make sync status true
1217         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
1218
1219         assertEquals(true, shardManager.getMBean().getSyncStatus());
1220
1221         // Send status false will make sync status false
1222         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
1223
1224         assertEquals(false, shardManager.getMBean().getSyncStatus());
1225
1226     }
1227
1228     @Test
1229     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception {
1230         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
1231         TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
1232             @Override
1233             public List<String> getMemberShardNames(final MemberName memberName) {
1234                 return Arrays.asList("default", "astronauts");
1235             }
1236         }));
1237
1238         // Initially will be false
1239         assertEquals(false, shardManager.getMBean().getSyncStatus());
1240
1241         // Make default shard leader
1242         String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
1243         shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId,
1244                 RaftState.Follower.name(), RaftState.Leader.name()));
1245
1246         // default = Leader, astronauts is unknown so sync status remains false
1247         assertEquals(false, shardManager.getMBean().getSyncStatus());
1248
1249         // Make astronauts shard leader as well
1250         String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1251         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1252                 RaftState.Follower.name(), RaftState.Leader.name()));
1253
1254         // Now sync status should be true
1255         assertEquals(true, shardManager.getMBean().getSyncStatus());
1256
1257         // Make astronauts a Follower
1258         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1259                 RaftState.Leader.name(), RaftState.Follower.name()));
1260
1261         // Sync status is not true
1262         assertEquals(false, shardManager.getMBean().getSyncStatus());
1263
1264         // Make the astronauts follower sync status true
1265         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
1266
1267         // Sync status is now true
1268         assertEquals(true, shardManager.getMBean().getSyncStatus());
1269
1270         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending");
1271     }
1272
1273     @Test
1274     public void testOnReceiveSwitchShardBehavior() {
1275         new TestKit(getSystem()) {
1276             {
1277                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1278
1279                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1280                 shardManager.tell(new ActorInitialized(), mockShardActor);
1281
1282                 shardManager.tell(new SwitchShardBehavior(mockShardName, RaftState.Leader, 1000), getRef());
1283
1284                 SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor,
1285                         SwitchBehavior.class);
1286
1287                 assertEquals(RaftState.Leader, switchBehavior.getNewState());
1288                 assertEquals(1000, switchBehavior.getNewTerm());
1289             }
1290         };
1291     }
1292
1293     private static List<MemberName> members(final String... names) {
1294         return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList());
1295     }
1296
1297     @Test
1298     public void testOnCreateShard() {
1299         LOG.info("testOnCreateShard starting");
1300         new TestKit(getSystem()) {
1301             {
1302                 datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1303
1304                 ActorRef shardManager = actorFactory
1305                         .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1306                                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1307
1308                 SchemaContext schemaContext = TestModel.createTestContext();
1309                 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1310
1311                 DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100)
1312                         .persistent(false).build();
1313                 Shard.Builder shardBuilder = Shard.builder();
1314
1315                 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1316                         "foo", null, members("member-1", "member-5", "member-6"));
1317                 shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef());
1318
1319                 expectMsgClass(duration("5 seconds"), Success.class);
1320
1321                 shardManager.tell(new FindLocalShard("foo", true), getRef());
1322
1323                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1324
1325                 assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent());
1326                 assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig()
1327                         .getPeerAddressResolver() instanceof ShardPeerAddressResolver);
1328                 assertEquals("peerMembers", Sets.newHashSet(
1329                         ShardIdentifier.create("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(),
1330                         ShardIdentifier.create("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()),
1331                         shardBuilder.getPeerAddresses().keySet());
1332                 assertEquals("ShardIdentifier", ShardIdentifier.create("foo", MEMBER_1, shardMrgIDSuffix),
1333                         shardBuilder.getId());
1334                 assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1335
1336                 // Send CreateShard with same name - should return Success with
1337                 // a message.
1338
1339                 shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1340
1341                 Success success = expectMsgClass(duration("5 seconds"), Success.class);
1342                 assertNotNull("Success status is null", success.status());
1343             }
1344         };
1345
1346         LOG.info("testOnCreateShard ending");
1347     }
1348
1349     @Test
1350     public void testOnCreateShardWithLocalMemberNotInShardConfig() {
1351         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting");
1352         new TestKit(getSystem()) {
1353             {
1354                 datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1355
1356                 ActorRef shardManager = actorFactory
1357                         .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1358                                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1359
1360                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1361
1362                 Shard.Builder shardBuilder = Shard.builder();
1363                 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1364                         "foo", null, members("member-5", "member-6"));
1365
1366                 shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1367                 expectMsgClass(duration("5 seconds"), Success.class);
1368
1369                 shardManager.tell(new FindLocalShard("foo", true), getRef());
1370                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1371
1372                 assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
1373                 assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(), shardBuilder
1374                         .getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
1375             }
1376         };
1377
1378         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending");
1379     }
1380
1381     @Test
1382     public void testOnCreateShardWithNoInitialSchemaContext() {
1383         LOG.info("testOnCreateShardWithNoInitialSchemaContext starting");
1384         new TestKit(getSystem()) {
1385             {
1386                 ActorRef shardManager = actorFactory
1387                         .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1388                                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1389
1390                 Shard.Builder shardBuilder = Shard.builder();
1391
1392                 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1393                         "foo", null, members("member-1"));
1394                 shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1395
1396                 expectMsgClass(duration("5 seconds"), Success.class);
1397
1398                 SchemaContext schemaContext = TestModel.createTestContext();
1399                 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1400
1401                 shardManager.tell(new FindLocalShard("foo", true), getRef());
1402
1403                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1404
1405                 assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1406                 assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
1407             }
1408         };
1409
1410         LOG.info("testOnCreateShardWithNoInitialSchemaContext ending");
1411     }
1412
1413     @Test
1414     public void testGetSnapshot() {
1415         LOG.info("testGetSnapshot starting");
1416         TestKit kit = new TestKit(getSystem());
1417
1418         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1419                 .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1"))
1420                 .put("astronauts", Collections.<String>emptyList()).build());
1421
1422         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig)
1423                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1424
1425         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1426         Failure failure = kit.expectMsgClass(Failure.class);
1427         assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
1428
1429         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1430
1431         waitForShardInitialized(shardManager, "shard1", kit);
1432         waitForShardInitialized(shardManager, "shard2", kit);
1433
1434         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1435
1436         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1437
1438         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1439         assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1440
1441         Function<ShardSnapshot, String> shardNameTransformer = ShardSnapshot::getName;
1442
1443         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
1444             datastoreSnapshot.getShardSnapshots().stream().map(shardNameTransformer).collect(Collectors.toSet())));
1445
1446         // Add a new replica
1447
1448         TestKit mockShardLeaderKit = new TestKit(getSystem());
1449
1450         TestShardManager shardManagerInstance = shardManager.underlyingActor();
1451         shardManagerInstance.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1452
1453         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1454         mockShardLeaderKit.expectMsgClass(AddServer.class);
1455         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.OK, ""));
1456         kit.expectMsgClass(Status.Success.class);
1457         waitForShardInitialized(shardManager, "astronauts", kit);
1458
1459         // Send another GetSnapshot and verify
1460
1461         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1462         datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1463
1464         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
1465                 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
1466
1467         ShardManagerSnapshot snapshot = datastoreSnapshot.getShardManagerSnapshot();
1468         assertNotNull("Expected ShardManagerSnapshot", snapshot);
1469         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1470                 Sets.newHashSet(snapshot.getShardList()));
1471
1472         LOG.info("testGetSnapshot ending");
1473     }
1474
1475     @Test
1476     public void testRestoreFromSnapshot() {
1477         LOG.info("testRestoreFromSnapshot starting");
1478
1479         datastoreContextBuilder.shardInitializationTimeout(3, TimeUnit.SECONDS);
1480
1481         TestKit kit = new TestKit(getSystem());
1482
1483         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1484                 .put("shard1", Collections.<String>emptyList()).put("shard2", Collections.<String>emptyList())
1485                 .put("astronauts", Collections.<String>emptyList()).build());
1486
1487         ShardManagerSnapshot snapshot =
1488                 new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"), Collections.emptyMap());
1489         DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix, snapshot,
1490                 Collections.<ShardSnapshot>emptyList());
1491         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig)
1492                 .restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
1493
1494         shardManager.underlyingActor().waitForRecoveryComplete();
1495
1496         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1497
1498         waitForShardInitialized(shardManager, "shard1", kit);
1499         waitForShardInitialized(shardManager, "shard2", kit);
1500         waitForShardInitialized(shardManager, "astronauts", kit);
1501
1502         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1503
1504         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1505
1506         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1507
1508         assertNotNull("Expected ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1509         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1510                 Sets.newHashSet(datastoreSnapshot.getShardManagerSnapshot().getShardList()));
1511
1512         LOG.info("testRestoreFromSnapshot ending");
1513     }
1514
1515     @Test
1516     public void testAddShardReplicaForNonExistentShardConfig() {
1517         new TestKit(getSystem()) {
1518             {
1519                 ActorRef shardManager = actorFactory
1520                         .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1521                                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1522
1523                 shardManager.tell(new AddShardReplica("model-inventory"), getRef());
1524                 Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
1525
1526                 assertEquals("Failure obtained", true, resp.cause() instanceof IllegalArgumentException);
1527             }
1528         };
1529     }
1530
1531     @Test
1532     public void testAddShardReplica() {
1533         LOG.info("testAddShardReplica starting");
1534         MockConfiguration mockConfig = new MockConfiguration(
1535                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1536                         .put("astronauts", Arrays.asList("member-2")).build());
1537
1538         final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1539         datastoreContextBuilder.shardManagerPersistenceId(shardManagerID);
1540
1541         // Create an ActorSystem ShardManager actor for member-1.
1542         final ActorSystem system1 = newActorSystem("Member1");
1543         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1544         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1545         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1546                 newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor)
1547                         .cluster(new ClusterWrapperImpl(system1)).props()
1548                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1549                 shardManagerID);
1550
1551         // Create an ActorSystem ShardManager actor for member-2.
1552         final ActorSystem system2 = newActorSystem("Member2");
1553         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1554
1555         String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
1556         String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString();
1557         final TestActorRef<MockRespondActor> mockShardLeaderActor = TestActorRef.create(system2,
1558                 Props.create(MockRespondActor.class, AddServer.class,
1559                         new AddServerReply(ServerChangeStatus.OK, memberId2))
1560                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1561                 name);
1562         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1563                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor)
1564                         .cluster(new ClusterWrapperImpl(system2)).props()
1565                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1566                 shardManagerID);
1567
1568         new TestKit(system1) {
1569             {
1570                 newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1571                 leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1572
1573                 leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1574
1575                 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1576                 leaderShardManager.tell(
1577                         new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1578                         mockShardLeaderActor);
1579                 leaderShardManager.tell(
1580                         new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1581                         mockShardLeaderActor);
1582
1583                 newReplicaShardManager.underlyingActor().waitForMemberUp();
1584                 leaderShardManager.underlyingActor().waitForMemberUp();
1585
1586                 // Have a dummy snapshot to be overwritten by the new data
1587                 // persisted.
1588                 String[] restoredShards = { "default", "people" };
1589                 ShardManagerSnapshot snapshot =
1590                         new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
1591                 InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
1592                 Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
1593
1594                 InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID);
1595                 InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
1596
1597                 // construct a mock response message
1598                 newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1599                 AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1600                         AddServer.class);
1601                 String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1602                 assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1603                 expectMsgClass(duration("5 seconds"), Status.Success.class);
1604
1605                 InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
1606                 InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
1607                 List<ShardManagerSnapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(shardManagerID,
1608                         ShardManagerSnapshot.class);
1609                 assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
1610                 ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
1611                 assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
1612                         Sets.newHashSet(shardManagerSnapshot.getShardList()));
1613             }
1614         };
1615         LOG.info("testAddShardReplica ending");
1616     }
1617
1618     @Test
1619     public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() {
1620         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting");
1621         new TestKit(getSystem()) {
1622             {
1623                 TestActorRef<TestShardManager> shardManager = actorFactory
1624                         .createTestActor(newPropsShardMgrWithMockShardActor(), shardMgrID);
1625
1626                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1627                 shardManager.tell(new ActorInitialized(), mockShardActor);
1628
1629                 String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
1630                 AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
1631                 ActorRef leaderShardActor = shardManager.underlyingActor().getContext()
1632                         .actorOf(Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId);
1633
1634                 MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
1635
1636                 String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
1637                 shardManager.tell(
1638                         new RoleChangeNotification(newReplicaId, RaftState.Candidate.name(), RaftState.Follower.name()),
1639                         mockShardActor);
1640                 shardManager.tell(
1641                         new ShardLeaderStateChanged(newReplicaId, leaderId, DataStoreVersions.CURRENT_VERSION),
1642                         mockShardActor);
1643
1644                 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1645
1646                 MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
1647
1648                 Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1649                 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1650
1651                 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1652                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1653
1654                 // Send message again to verify previous in progress state is
1655                 // cleared
1656
1657                 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1658                 resp = expectMsgClass(duration("5 seconds"), Failure.class);
1659                 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1660
1661                 // Send message again with an AddServer timeout to verify the
1662                 // pre-existing shard actor isn't terminated.
1663
1664                 shardManager.tell(
1665                         newDatastoreContextFactory(
1666                                 datastoreContextBuilder.shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()),
1667                         getRef());
1668                 leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
1669                 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1670                 expectMsgClass(duration("5 seconds"), Failure.class);
1671
1672                 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1673                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1674             }
1675         };
1676
1677         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
1678     }
1679
1680     @Test
1681     public void testAddShardReplicaWithPreExistingLocalReplicaLeader() {
1682         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting");
1683         new TestKit(getSystem()) {
1684             {
1685                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1686                 ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1687
1688                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1689                 shardManager.tell(new ActorInitialized(), mockShardActor);
1690                 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1691                         DataStoreVersions.CURRENT_VERSION), getRef());
1692                 shardManager.tell(
1693                         new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1694                         mockShardActor);
1695
1696                 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1697                 Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1698                 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1699
1700                 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1701                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1702             }
1703         };
1704
1705         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
1706     }
1707
1708     @Test
1709     public void testAddShardReplicaWithAddServerReplyFailure() {
1710         LOG.info("testAddShardReplicaWithAddServerReplyFailure starting");
1711         new TestKit(getSystem()) {
1712             {
1713                 TestKit mockShardLeaderKit = new TestKit(getSystem());
1714
1715                 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1716                         .put("astronauts", Arrays.asList("member-2")).build());
1717
1718                 ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
1719                 final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1720                         newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props()
1721                             .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1722                 shardManager.underlyingActor()
1723                         .setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1724
1725                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1726
1727                 TestKit terminateWatcher = new TestKit(getSystem());
1728                 terminateWatcher.watch(mockNewReplicaShardActor);
1729
1730                 shardManager.tell(new AddShardReplica("astronauts"), getRef());
1731
1732                 AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
1733                 assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
1734                         addServerMsg.getNewServerId());
1735                 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
1736
1737                 Failure failure = expectMsgClass(duration("5 seconds"), Failure.class);
1738                 assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
1739
1740                 shardManager.tell(new FindLocalShard("astronauts", false), getRef());
1741                 expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1742
1743                 terminateWatcher.expectTerminated(mockNewReplicaShardActor);
1744
1745                 shardManager.tell(new AddShardReplica("astronauts"), getRef());
1746                 mockShardLeaderKit.expectMsgClass(AddServer.class);
1747                 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
1748                 failure = expectMsgClass(duration("5 seconds"), Failure.class);
1749                 assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
1750             }
1751         };
1752
1753         LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
1754     }
1755
1756     @Test
1757     public void testAddShardReplicaWithAlreadyInProgress() {
1758         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1759                 AddServer.class, new AddShardReplica("astronauts"));
1760     }
1761
1762     @Test
1763     public void testAddShardReplicaWithFindPrimaryTimeout() {
1764         LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting");
1765         datastoreContextBuilder.shardInitializationTimeout(100, TimeUnit.MILLISECONDS);
1766         new TestKit(getSystem()) {
1767             {
1768                 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1769                         .put("astronauts", Arrays.asList("member-2")).build());
1770
1771                 final ActorRef newReplicaShardManager = actorFactory
1772                         .createActor(newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props()
1773                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1774
1775                 newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1776                 MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2",
1777                         AddressFromURIString.parse("akka://non-existent@127.0.0.1:5").toString());
1778
1779                 newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1780                 Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
1781                 assertEquals("Failure obtained", true, resp.cause() instanceof RuntimeException);
1782             }
1783         };
1784
1785         LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending");
1786     }
1787
1788     @Test
1789     public void testRemoveShardReplicaForNonExistentShard() {
1790         new TestKit(getSystem()) {
1791             {
1792                 ActorRef shardManager = actorFactory
1793                         .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1794                                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1795
1796                 shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), getRef());
1797                 Status.Failure resp = expectMsgClass(duration("10 seconds"), Status.Failure.class);
1798                 assertEquals("Failure obtained", true, resp.cause() instanceof PrimaryNotFoundException);
1799             }
1800         };
1801     }
1802
1803     @Test
1804     /**
1805      * Primary is Local.
1806      */
1807     public void testRemoveShardReplicaLocal() {
1808         new TestKit(getSystem()) {
1809             {
1810                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1811
1812                 final ActorRef respondActor = actorFactory.createActor(Props.create(MockRespondActor.class,
1813                         RemoveServer.class, new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
1814
1815                 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
1816
1817                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1818                 shardManager.tell(new ActorInitialized(), respondActor);
1819                 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1820                         DataStoreVersions.CURRENT_VERSION), getRef());
1821                 shardManager.tell(
1822                         new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1823                         respondActor);
1824
1825                 shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), getRef());
1826                 final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor,
1827                         RemoveServer.class);
1828                 assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(),
1829                         removeServer.getServerId());
1830                 expectMsgClass(duration("5 seconds"), Success.class);
1831             }
1832         };
1833     }
1834
1835     @Test
1836     public void testRemoveShardReplicaRemote() {
1837         MockConfiguration mockConfig = new MockConfiguration(
1838                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1839                         .put("astronauts", Arrays.asList("member-1")).build());
1840
1841         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1842
1843         // Create an ActorSystem ShardManager actor for member-1.
1844         final ActorSystem system1 = newActorSystem("Member1");
1845         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1846         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1847
1848         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1849                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster(
1850                         new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1851                 shardManagerID);
1852
1853         // Create an ActorSystem ShardManager actor for member-2.
1854         final ActorSystem system2 = newActorSystem("Member2");
1855         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1856
1857         String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString();
1858         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
1859         final TestActorRef<MockRespondActor> mockShardLeaderActor =
1860                 TestActorRef.create(system2, Props.create(MockRespondActor.class, RemoveServer.class,
1861                         new RemoveServerReply(ServerChangeStatus.OK, memberId2)), name);
1862
1863         LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor);
1864
1865         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1866                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster(
1867                         new ClusterWrapperImpl(system2)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1868                 shardManagerID);
1869
1870         // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so,
1871         //    akka://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1
1872         // However when a shard manager has a local shard which is a follower and a leader that is remote it will
1873         // try to compute an address for the remote shard leader using the ShardPeerAddressResolver. This address will
1874         // look like so,
1875         //    akka://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1
1876         // In this specific case if we did a FindPrimary for shard default from member-1 we would come up
1877         // with the address of an actor which does not exist, therefore any message sent to that actor would go to
1878         // dead letters.
1879         // To work around this problem we create a ForwardingActor with the right address and pass to it the
1880         // mockShardLeaderActor. The ForwardingActor simply forwards all messages to the mockShardLeaderActor and every
1881         // thing works as expected
1882         final ActorRef actorRef = leaderShardManager.underlyingActor().context()
1883                 .actorOf(Props.create(ForwardingActor.class, mockShardLeaderActor),
1884                         "member-2-shard-default-" + shardMrgIDSuffix);
1885
1886         LOG.error("Forwarding actor : {}", actorRef);
1887
1888         new TestKit(system1) {
1889             {
1890                 newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1891                 leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1892
1893                 leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1894                 newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1895
1896                 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1897                 leaderShardManager.tell(
1898                         new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1899                         mockShardLeaderActor);
1900                 leaderShardManager.tell(
1901                         new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1902                         mockShardLeaderActor);
1903
1904                 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
1905                 newReplicaShardManager.tell(
1906                         new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class), leaderVersion),
1907                         mockShardActor);
1908                 newReplicaShardManager.tell(
1909                         new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
1910                         mockShardActor);
1911
1912                 newReplicaShardManager.underlyingActor().waitForMemberUp();
1913                 leaderShardManager.underlyingActor().waitForMemberUp();
1914
1915                 // construct a mock response message
1916                 newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), getRef());
1917                 RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1918                         RemoveServer.class);
1919                 String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1920                 assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
1921                 expectMsgClass(duration("5 seconds"), Status.Success.class);
1922             }
1923         };
1924     }
1925
1926     @Test
1927     public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() {
1928         testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", MEMBER_2),
1929                 RemoveServer.class, new RemoveShardReplica("astronauts", MEMBER_3));
1930     }
1931
1932     @Test
1933     public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() {
1934         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1935                 AddServer.class, new RemoveShardReplica("astronauts", MEMBER_2));
1936     }
1937
1938
1939     public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange,
1940                                                       final Class<?> firstForwardedServerChangeClass,
1941                                                       final Object secondServerChange) {
1942         new TestKit(getSystem()) {
1943             {
1944                 TestKit mockShardLeaderKit = new TestKit(getSystem());
1945                 final TestKit secondRequestKit = new TestKit(getSystem());
1946
1947                 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1948                         .put(shardName, Arrays.asList("member-2")).build());
1949
1950                 final TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
1951                         newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor)
1952                                 .cluster(new MockClusterWrapper()).props()
1953                                 .withDispatcher(Dispatchers.DefaultDispatcherId()),
1954                         shardMgrID);
1955
1956                 shardManager.underlyingActor()
1957                         .setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1958
1959                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1960
1961                 shardManager.tell(firstServerChange, getRef());
1962
1963                 mockShardLeaderKit.expectMsgClass(firstForwardedServerChangeClass);
1964
1965                 shardManager.tell(secondServerChange, secondRequestKit.getRef());
1966
1967                 secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class);
1968             }
1969         };
1970     }
1971
1972     @Test
1973     public void testServerRemovedShardActorNotRunning() {
1974         LOG.info("testServerRemovedShardActorNotRunning starting");
1975         new TestKit(getSystem()) {
1976             {
1977                 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1978                         .put("default", Arrays.asList("member-1", "member-2"))
1979                         .put("astronauts", Arrays.asList("member-2"))
1980                         .put("people", Arrays.asList("member-1", "member-2")).build());
1981
1982                 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1983                         newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1984
1985                 shardManager.underlyingActor().waitForRecoveryComplete();
1986                 shardManager.tell(new FindLocalShard("people", false), getRef());
1987                 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1988
1989                 shardManager.tell(new FindLocalShard("default", false), getRef());
1990                 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1991
1992                 // Removed the default shard replica from member-1
1993                 ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
1994                 ShardIdentifier shardId = builder.shardName("default").memberName(MEMBER_1).type(shardMrgIDSuffix)
1995                         .build();
1996                 shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
1997
1998                 shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1999             }
2000         };
2001
2002         LOG.info("testServerRemovedShardActorNotRunning ending");
2003     }
2004
2005     @Test
2006     public void testServerRemovedShardActorRunning() {
2007         LOG.info("testServerRemovedShardActorRunning starting");
2008         new TestKit(getSystem()) {
2009             {
2010                 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
2011                         .put("default", Arrays.asList("member-1", "member-2"))
2012                         .put("astronauts", Arrays.asList("member-2"))
2013                         .put("people", Arrays.asList("member-1", "member-2")).build());
2014
2015                 String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
2016                 ActorRef shard = actorFactory.createActor(MessageCollectorActor.props(), shardId);
2017
2018                 TestActorRef<TestShardManager> shardManager = actorFactory
2019                         .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props()
2020                                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
2021
2022                 shardManager.underlyingActor().waitForRecoveryComplete();
2023
2024                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
2025                 shardManager.tell(new ActorInitialized(), shard);
2026
2027                 waitForShardInitialized(shardManager, "people", this);
2028                 waitForShardInitialized(shardManager, "default", this);
2029
2030                 // Removed the default shard replica from member-1
2031                 shardManager.tell(new ServerRemoved(shardId), getRef());
2032
2033                 shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
2034
2035                 MessageCollectorActor.expectFirstMatching(shard, Shutdown.class);
2036             }
2037         };
2038
2039         LOG.info("testServerRemovedShardActorRunning ending");
2040     }
2041
2042     @Test
2043     public void testShardPersistenceWithRestoredData() {
2044         LOG.info("testShardPersistenceWithRestoredData starting");
2045         new TestKit(getSystem()) {
2046             {
2047                 MockConfiguration mockConfig =
2048                     new MockConfiguration(ImmutableMap.<String, List<String>>builder()
2049                             .put("default", Arrays.asList("member-1", "member-2"))
2050                             .put("astronauts", Arrays.asList("member-2"))
2051                             .put("people", Arrays.asList("member-1", "member-2")).build());
2052                 String[] restoredShards = {"default", "astronauts"};
2053                 ShardManagerSnapshot snapshot =
2054                         new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
2055                 InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
2056
2057                 // create shardManager to come up with restored data
2058                 TestActorRef<TestShardManager> newRestoredShardManager = actorFactory.createTestActor(
2059                         newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
2060
2061                 newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
2062
2063                 newRestoredShardManager.tell(new FindLocalShard("people", false), getRef());
2064                 LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
2065                 assertEquals("for uninitialized shard", "people", notFound.getShardName());
2066
2067                 // Verify a local shard is created for the restored shards,
2068                 // although we expect a NotInitializedException for the shards
2069                 // as the actor initialization
2070                 // message is not sent for them
2071                 newRestoredShardManager.tell(new FindLocalShard("default", false), getRef());
2072                 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
2073
2074                 newRestoredShardManager.tell(new FindLocalShard("astronauts", false), getRef());
2075                 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
2076             }
2077         };
2078
2079         LOG.info("testShardPersistenceWithRestoredData ending");
2080     }
2081
2082     @Test
2083     public void testShutDown() throws Exception {
2084         LOG.info("testShutDown starting");
2085         new TestKit(getSystem()) {
2086             {
2087                 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
2088                         .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1")).build());
2089
2090                 String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString();
2091                 ActorRef shard1 = actorFactory.createActor(MessageCollectorActor.props(), shardId1);
2092
2093                 String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString();
2094                 ActorRef shard2 = actorFactory.createActor(MessageCollectorActor.props(), shardId2);
2095
2096                 ActorRef shardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig)
2097                         .addShardActor("shard1", shard1).addShardActor("shard2", shard2).props());
2098
2099                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
2100                 shardManager.tell(new ActorInitialized(), shard1);
2101                 shardManager.tell(new ActorInitialized(), shard2);
2102
2103                 FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
2104                 Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE);
2105
2106                 MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class);
2107                 MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class);
2108
2109                 try {
2110                     Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS));
2111                     fail("ShardManager actor stopped without waiting for the Shards to be stopped");
2112                 } catch (TimeoutException e) {
2113                     // expected
2114                 }
2115
2116                 actorFactory.killActor(shard1, this);
2117                 actorFactory.killActor(shard2, this);
2118
2119                 Boolean stopped = Await.result(stopFuture, duration);
2120                 assertEquals("Stopped", Boolean.TRUE, stopped);
2121             }
2122         };
2123
2124         LOG.info("testShutDown ending");
2125     }
2126
2127     @Test
2128     public void testChangeServersVotingStatus() {
2129         new TestKit(getSystem()) {
2130             {
2131                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2132
2133                 ActorRef respondActor = actorFactory
2134                         .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2135                                 new ServerChangeReply(ServerChangeStatus.OK, null)), memberId);
2136
2137                 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2138
2139                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
2140                 shardManager.tell(new ActorInitialized(), respondActor);
2141                 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
2142                         DataStoreVersions.CURRENT_VERSION), getRef());
2143                 shardManager.tell(
2144                         new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
2145                         respondActor);
2146
2147                 shardManager.tell(
2148                         new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)),
2149                         getRef());
2150
2151                 ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor
2152                         .expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2153                 assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(),
2154                         ImmutableMap.of(ShardIdentifier
2155                                 .create("default", MemberName.forName("member-2"), shardMrgIDSuffix).toString(),
2156                                 Boolean.TRUE));
2157
2158                 expectMsgClass(duration("5 seconds"), Success.class);
2159             }
2160         };
2161     }
2162
2163     @Test
2164     public void testChangeServersVotingStatusWithNoLeader() {
2165         new TestKit(getSystem()) {
2166             {
2167                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2168
2169                 ActorRef respondActor = actorFactory
2170                         .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2171                                 new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);
2172
2173                 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2174
2175                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
2176                 shardManager.tell(new ActorInitialized(), respondActor);
2177                 shardManager.tell(new RoleChangeNotification(memberId, null, RaftState.Follower.name()), respondActor);
2178
2179                 shardManager.tell(
2180                         new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)),
2181                         getRef());
2182
2183                 MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2184
2185                 Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
2186                 assertEquals("Failure resposnse", true, resp.cause() instanceof NoShardLeaderException);
2187             }
2188         };
2189     }
2190
2191     @SuppressWarnings("unchecked")
2192     @Test
2193     public void testRegisterForShardLeaderChanges() {
2194         LOG.info("testRegisterForShardLeaderChanges starting");
2195
2196         final String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
2197         final String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
2198         final TestKit kit = new TestKit(getSystem());
2199         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
2200
2201         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), kit.getRef());
2202         shardManager.tell(new ActorInitialized(), mockShardActor);
2203
2204         final Consumer<String> mockCallback = mock(Consumer.class);
2205         shardManager.tell(new RegisterForShardAvailabilityChanges(mockCallback), kit.getRef());
2206
2207         final Success reply = kit.expectMsgClass(Duration.apply(5, TimeUnit.SECONDS), Success.class);
2208         final Registration reg = (Registration) reply.status();
2209
2210         final DataTree mockDataTree = mock(DataTree.class);
2211         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2212             DataStoreVersions.CURRENT_VERSION), mockShardActor);
2213
2214         verify(mockCallback, timeout(5000)).accept("default");
2215
2216         reset(mockCallback);
2217         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2218                 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2219
2220         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
2221         verifyNoMoreInteractions(mockCallback);
2222
2223         shardManager.tell(new ShardLeaderStateChanged(memberId1, null, mockDataTree,
2224                 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2225
2226         verify(mockCallback, timeout(5000)).accept("default");
2227
2228         reset(mockCallback);
2229         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, mockDataTree,
2230                 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2231
2232         verify(mockCallback, timeout(5000)).accept("default");
2233
2234         reset(mockCallback);
2235         reg.close();
2236
2237         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2238                 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2239
2240         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
2241         verifyNoMoreInteractions(mockCallback);
2242
2243         LOG.info("testRegisterForShardLeaderChanges ending");
2244     }
2245
2246     public static class TestShardManager extends ShardManager {
2247         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
2248         private final CountDownLatch snapshotPersist = new CountDownLatch(1);
2249         private ShardManagerSnapshot snapshot;
2250         private final Map<String, ActorRef> shardActors;
2251         private final ActorRef shardActor;
2252         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
2253         private CountDownLatch memberUpReceived = new CountDownLatch(1);
2254         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
2255         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
2256         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
2257         private volatile MessageInterceptor messageInterceptor;
2258
/**
 * Creates the test shard manager from the given builder, keeping references to the builder's default shard
 * actor and its per-shard actor map.
 *
 * @param builder supplies the base configuration and the mock shard actors
 */
TestShardManager(final Builder builder) {
    super(builder);
    this.shardActors = builder.shardActors;
    this.shardActor = builder.shardActor;
}
2264
2265         @Override
2266         protected void handleRecover(final Object message) throws Exception {
2267             try {
2268                 super.handleRecover(message);
2269             } finally {
2270                 if (message instanceof RecoveryCompleted) {
2271                     recoveryComplete.countDown();
2272                 }
2273             }
2274         }
2275
2276         private void countDownIfOther(final Member member, final CountDownLatch latch) {
2277             if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
2278                 latch.countDown();
2279             }
2280         }
2281
2282         @Override
2283         public void handleCommand(final Object message) throws Exception {
2284             try {
2285                 if (messageInterceptor != null && messageInterceptor.canIntercept(message)) {
2286                     getSender().tell(messageInterceptor.apply(message), getSelf());
2287                 } else {
2288                     super.handleCommand(message);
2289                 }
2290             } finally {
2291                 if (message instanceof FindPrimary) {
2292                     findPrimaryMessageReceived.countDown();
2293                 } else if (message instanceof ClusterEvent.MemberUp) {
2294                     countDownIfOther(((ClusterEvent.MemberUp) message).member(), memberUpReceived);
2295                 } else if (message instanceof ClusterEvent.MemberRemoved) {
2296                     countDownIfOther(((ClusterEvent.MemberRemoved) message).member(), memberRemovedReceived);
2297                 } else if (message instanceof ClusterEvent.UnreachableMember) {
2298                     countDownIfOther(((ClusterEvent.UnreachableMember) message).member(), memberUnreachableReceived);
2299                 } else if (message instanceof ClusterEvent.ReachableMember) {
2300                     countDownIfOther(((ClusterEvent.ReachableMember) message).member(), memberReachableReceived);
2301                 }
2302             }
2303         }
2304
2305         void setMessageInterceptor(final MessageInterceptor messageInterceptor) {
2306             this.messageInterceptor = messageInterceptor;
2307         }
2308
2309         void waitForRecoveryComplete() {
2310             assertEquals("Recovery complete", true,
2311                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
2312         }
2313
2314         public void waitForMemberUp() {
2315             assertEquals("MemberUp received", true,
2316                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
2317             memberUpReceived = new CountDownLatch(1);
2318         }
2319
2320         void waitForMemberRemoved() {
2321             assertEquals("MemberRemoved received", true,
2322                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
2323             memberRemovedReceived = new CountDownLatch(1);
2324         }
2325
2326         void waitForUnreachableMember() {
2327             assertEquals("UnreachableMember received", true,
2328                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
2329                 ));
2330             memberUnreachableReceived = new CountDownLatch(1);
2331         }
2332
2333         void waitForReachableMember() {
2334             assertEquals("ReachableMember received", true,
2335                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
2336             memberReachableReceived = new CountDownLatch(1);
2337         }
2338
2339         void verifyFindPrimary() {
2340             assertEquals("FindPrimary received", true,
2341                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
2342             findPrimaryMessageReceived = new CountDownLatch(1);
2343         }
2344
2345         public static Builder builder(final DatastoreContext.Builder datastoreContextBuilder) {
2346             return new Builder(datastoreContextBuilder);
2347         }
2348
2349         public static class Builder extends AbstractGenericCreator<Builder, TestShardManager> {
2350             private ActorRef shardActor;
2351             private final Map<String, ActorRef> shardActors = new HashMap<>();
2352
2353             Builder(final DatastoreContext.Builder datastoreContextBuilder) {
2354                 super(TestShardManager.class);
2355                 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
2356             }
2357
2358             Builder shardActor(final ActorRef newShardActor) {
2359                 this.shardActor = newShardActor;
2360                 return this;
2361             }
2362
2363             Builder addShardActor(final String shardName, final ActorRef actorRef) {
2364                 shardActors.put(shardName, actorRef);
2365                 return this;
2366             }
2367         }
2368
2369         @Override
2370         public void saveSnapshot(final Object obj) {
2371             snapshot = (ShardManagerSnapshot) obj;
2372             snapshotPersist.countDown();
2373             super.saveSnapshot(obj);
2374         }
2375
2376         void verifySnapshotPersisted(final Set<String> shardList) {
2377             assertEquals("saveSnapshot invoked", true,
2378                     Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
2379             assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
2380         }
2381
2382         @Override
2383         protected ActorRef newShardActor(final ShardInformation info) {
2384             if (shardActors.get(info.getShardName()) != null) {
2385                 return shardActors.get(info.getShardName());
2386             }
2387
2388             if (shardActor != null) {
2389                 return shardActor;
2390             }
2391
2392             return super.newShardActor(info);
2393         }
2394     }
2395
2396     private abstract static class AbstractGenericCreator<T extends AbstractGenericCreator<T, ?>, C extends ShardManager>
2397                                                      extends AbstractShardManagerCreator<T> {
2398         private final Class<C> shardManagerClass;
2399
2400         AbstractGenericCreator(final Class<C> shardManagerClass) {
2401             this.shardManagerClass = shardManagerClass;
2402             cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).waitTillReadyCountDownLatch(ready)
2403                     .primaryShardInfoCache(new PrimaryShardInfoFutureCache());
2404         }
2405
2406         @Override
2407         public Props props() {
2408             verify();
2409             return Props.create(shardManagerClass, this);
2410         }
2411     }
2412
2413     private static class GenericCreator<C extends ShardManager> extends AbstractGenericCreator<GenericCreator<C>, C> {
2414         GenericCreator(final Class<C> shardManagerClass) {
2415             super(shardManagerClass);
2416         }
2417     }
2418
2419     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
2420         private static final long serialVersionUID = 1L;
2421         private final Creator<ShardManager> delegate;
2422
2423         DelegatingShardManagerCreator(final Creator<ShardManager> delegate) {
2424             this.delegate = delegate;
2425         }
2426
2427         @Override
2428         public ShardManager create() throws Exception {
2429             return delegate.create();
2430         }
2431     }
2432
2433     interface MessageInterceptor extends Function<Object, Object> {
2434         boolean canIntercept(Object message);
2435     }
2436
2437     private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
2438         return new MessageInterceptor() {
2439             @Override
2440             public Object apply(final Object message) {
2441                 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
2442             }
2443
2444             @Override
2445             public boolean canIntercept(final Object message) {
2446                 return message instanceof FindPrimary;
2447             }
2448         };
2449     }
2450
2451     private static class MockRespondActor extends MessageCollectorActor {
2452         static final String CLEAR_RESPONSE = "clear-response";
2453
2454         private Object responseMsg;
2455         private final Class<?> requestClass;
2456
2457         @SuppressWarnings("unused")
2458         MockRespondActor(final Class<?> requestClass, final Object responseMsg) {
2459             this.requestClass = requestClass;
2460             this.responseMsg = responseMsg;
2461         }
2462
2463         @Override
2464         public void onReceive(final Object message) throws Exception {
2465             if (message.equals(CLEAR_RESPONSE)) {
2466                 responseMsg = null;
2467             } else {
2468                 super.onReceive(message);
2469                 if (message.getClass().equals(requestClass) && responseMsg != null) {
2470                     getSender().tell(responseMsg, getSelf());
2471                 }
2472             }
2473         }
2474     }
2475 }