ad591a24d2c5f9cc460ce325a0941bc665dee3d9
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManagerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.times;
20 import static org.mockito.Mockito.verify;
21
22 import akka.actor.ActorRef;
23 import akka.actor.ActorSystem;
24 import akka.actor.AddressFromURIString;
25 import akka.actor.Props;
26 import akka.actor.Status;
27 import akka.actor.Status.Failure;
28 import akka.actor.Status.Success;
29 import akka.cluster.Cluster;
30 import akka.cluster.ClusterEvent;
31 import akka.cluster.Member;
32 import akka.dispatch.Dispatchers;
33 import akka.japi.Creator;
34 import akka.pattern.Patterns;
35 import akka.persistence.RecoveryCompleted;
36 import akka.serialization.Serialization;
37 import akka.testkit.JavaTestKit;
38 import akka.testkit.TestActorRef;
39 import akka.util.Timeout;
40 import com.google.common.base.Function;
41 import com.google.common.base.Stopwatch;
42 import com.google.common.collect.ImmutableMap;
43 import com.google.common.collect.ImmutableSet;
44 import com.google.common.collect.Lists;
45 import com.google.common.collect.Sets;
46 import com.google.common.util.concurrent.Uninterruptibles;
47 import com.typesafe.config.ConfigFactory;
48 import java.net.URI;
49 import java.util.AbstractMap;
50 import java.util.ArrayList;
51 import java.util.Arrays;
52 import java.util.Collection;
53 import java.util.Collections;
54 import java.util.HashMap;
55 import java.util.List;
56 import java.util.Map;
57 import java.util.Map.Entry;
58 import java.util.Set;
59 import java.util.concurrent.CountDownLatch;
60 import java.util.concurrent.TimeUnit;
61 import java.util.concurrent.TimeoutException;
62 import java.util.stream.Collectors;
63 import org.apache.commons.lang3.SerializationUtils;
64 import org.junit.After;
65 import org.junit.Before;
66 import org.junit.Test;
67 import org.mockito.Mock;
68 import org.mockito.Mockito;
69 import org.mockito.MockitoAnnotations;
70 import org.opendaylight.controller.cluster.access.concepts.MemberName;
71 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
72 import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
73 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
74 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
75 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
76 import org.opendaylight.controller.cluster.datastore.Shard;
77 import org.opendaylight.controller.cluster.datastore.ShardManager.SchemaContextModules;
78 import org.opendaylight.controller.cluster.datastore.config.Configuration;
79 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
80 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
81 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
82 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
83 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
84 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
85 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
86 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
87 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
88 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
89 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
90 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
91 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
92 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
93 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
94 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
95 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
96 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
97 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
98 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
99 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
100 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
101 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
102 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
103 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
104 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
105 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
106 import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor;
107 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
108 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
109 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
110 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
111 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
112 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
113 import org.opendaylight.controller.cluster.raft.RaftState;
114 import org.opendaylight.controller.cluster.raft.TestActorFactory;
115 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
116 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
117 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
118 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
119 import org.opendaylight.controller.cluster.raft.messages.AddServer;
120 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
121 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
122 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
123 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
124 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
125 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
126 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
127 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
128 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
129 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
130 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
131 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
132 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
133 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
134 import org.slf4j.Logger;
135 import org.slf4j.LoggerFactory;
136 import scala.concurrent.Await;
137 import scala.concurrent.Future;
138 import scala.concurrent.duration.FiniteDuration;
139
140 public class ShardManagerTest extends AbstractActorTest {
141     private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
142     private static final MemberName MEMBER_1 = MemberName.forName("member-1");
143     private static final MemberName MEMBER_2 = MemberName.forName("member-2");
144     private static final MemberName MEMBER_3 = MemberName.forName("member-3");
145
146     private static int ID_COUNTER = 1;
147
148     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
149     private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
150
151     @Mock
152     private static CountDownLatch ready;
153
154     private static ShardIdentifier mockShardName = ShardIdentifier.create(Shard.DEFAULT_NAME, MEMBER_1, "config");
155
156     private static TestActorRef<MessageCollectorActor> mockShardActor = TestActorRef.create(getSystem(),
157             Props.create(MessageCollectorActor.class), mockShardName.toString());
158
159     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
160             .dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
161             .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
162
163     private final Collection<ActorSystem> actorSystems = new ArrayList<>();
164
165     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
166
167     @Before
168     public void setUp() {
169         MockitoAnnotations.initMocks(this);
170
171         InMemoryJournal.clear();
172         InMemorySnapshotStore.clear();
173
174         mockShardActor.underlyingActor().clear();
175     }
176
177     @After
178     public void tearDown() {
179         InMemoryJournal.clear();
180         InMemorySnapshotStore.clear();
181
182         for (ActorSystem system: actorSystems) {
183             JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
184         }
185
186         actorFactory.close();
187     }
188
189     private ActorSystem newActorSystem(String config) {
190         ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config));
191         actorSystems.add(system);
192         return system;
193     }
194
195     private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
196         String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
197         if (system == getSystem()) {
198             return actorFactory.createTestActor(Props.create(MessageCollectorActor.class), name);
199         }
200
201         return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
202     }
203
204     private Props newShardMgrProps() {
205         return newShardMgrProps(new MockConfiguration());
206     }
207
208     private Props newShardMgrProps(Configuration config) {
209         return newTestShardMgrBuilder(config).props();
210     }
211
212     private static DatastoreContextFactory newDatastoreContextFactory(DatastoreContext datastoreContext) {
213         DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
214         Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
215         Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString());
216         return mockFactory;
217     }
218
219     private TestShardManager.Builder newTestShardMgrBuilder() {
220         return TestShardManager.builder(datastoreContextBuilder);
221     }
222
223     private TestShardManager.Builder newTestShardMgrBuilder(Configuration config) {
224         return TestShardManager.builder(datastoreContextBuilder).configuration(config);
225     }
226
227     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() {
228         return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
229     }
230
231     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(ActorRef shardActor) {
232         return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor);
233     }
234
235
236     private Props newPropsShardMgrWithMockShardActor() {
237         return newTestShardMgrBuilderWithMockShardActor().props().withDispatcher(
238                 Dispatchers.DefaultDispatcherId());
239     }
240
241     private Props newPropsShardMgrWithMockShardActor(ActorRef shardActor) {
242         return newTestShardMgrBuilderWithMockShardActor(shardActor).props();
243     }
244
245
246     private TestShardManager newTestShardManager() {
247         return newTestShardManager(newShardMgrProps());
248     }
249
250     private TestShardManager newTestShardManager(Props props) {
251         TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
252         TestShardManager shardManager = shardManagerActor.underlyingActor();
253         shardManager.waitForRecoveryComplete();
254         return shardManager;
255     }
256
257     private static void waitForShardInitialized(ActorRef shardManager, String shardName, JavaTestKit kit) {
258         AssertionError last = null;
259         Stopwatch sw = Stopwatch.createStarted();
260         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
261             try {
262                 shardManager.tell(new FindLocalShard(shardName, true), kit.getRef());
263                 kit.expectMsgClass(LocalShardFound.class);
264                 return;
265             } catch (AssertionError e) {
266                 last = e;
267             }
268
269             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
270         }
271
272         throw last;
273     }
274
275     @SuppressWarnings("unchecked")
276     private static <T> T expectMsgClassOrFailure(Class<T> msgClass, JavaTestKit kit, String msg) {
277         Object reply = kit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), msgClass, Failure.class);
278         if (reply instanceof Failure) {
279             throw new AssertionError(msg + " failed", ((Failure)reply).cause());
280         }
281
282         return (T)reply;
283     }
284
285     @Test
286     public void testPerShardDatastoreContext() throws Exception {
287         LOG.info("testPerShardDatastoreContext starting");
288         final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
289                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
290
291         Mockito.doReturn(
292                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(6).build())
293                 .when(mockFactory).getShardDatastoreContext("default");
294
295         Mockito.doReturn(
296                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(7).build())
297                 .when(mockFactory).getShardDatastoreContext("topology");
298
299         final MockConfiguration mockConfig = new MockConfiguration() {
300             @Override
301             public Collection<String> getMemberShardNames(MemberName memberName) {
302                 return Arrays.asList("default", "topology");
303             }
304
305             @Override
306             public Collection<MemberName> getMembersFromShardName(String shardName) {
307                 return members("member-1");
308             }
309         };
310
311         final TestActorRef<MessageCollectorActor> defaultShardActor = actorFactory.createTestActor(
312                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("default"));
313         final TestActorRef<MessageCollectorActor> topologyShardActor = actorFactory.createTestActor(
314                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("topology"));
315
316         final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
317                 new HashMap<String, Entry<ActorRef, DatastoreContext>>());
318         shardInfoMap.put("default", new AbstractMap.SimpleEntry<>(defaultShardActor, null));
319         shardInfoMap.put("topology", new AbstractMap.SimpleEntry<>(topologyShardActor, null));
320
321         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
322         final CountDownLatch newShardActorLatch = new CountDownLatch(2);
323         class LocalShardManager extends ShardManager {
324             LocalShardManager(AbstractShardManagerCreator<?> creator) {
325                 super(creator);
326             }
327
328             @Override
329             protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
330                 Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
331                 ActorRef ref = null;
332                 if (entry != null) {
333                     ref = entry.getKey();
334                     entry.setValue(info.getDatastoreContext());
335                 }
336
337                 newShardActorLatch.countDown();
338                 return ref;
339             }
340         }
341
342         final Creator<ShardManager> creator = new Creator<ShardManager>() {
343             private static final long serialVersionUID = 1L;
344             @Override
345             public ShardManager create() throws Exception {
346                 return new LocalShardManager(
347                         new GenericCreator<>(LocalShardManager.class).datastoreContextFactory(mockFactory)
348                                 .primaryShardInfoCache(primaryShardInfoCache).configuration(mockConfig));
349             }
350         };
351
352         JavaTestKit kit = new JavaTestKit(getSystem());
353
354         final ActorRef shardManager = actorFactory.createActor(Props.create(
355                 new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()));
356
357         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), kit.getRef());
358
359         assertEquals("Shard actors created", true, newShardActorLatch.await(5, TimeUnit.SECONDS));
360         assertEquals("getShardElectionTimeoutFactor", 6,
361                 shardInfoMap.get("default").getValue().getShardElectionTimeoutFactor());
362         assertEquals("getShardElectionTimeoutFactor", 7,
363                 shardInfoMap.get("topology").getValue().getShardElectionTimeoutFactor());
364
365         DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
366                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
367         Mockito.doReturn(
368                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(66).build())
369                 .when(newMockFactory).getShardDatastoreContext("default");
370
371         Mockito.doReturn(
372                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(77).build())
373                 .when(newMockFactory).getShardDatastoreContext("topology");
374
375         shardManager.tell(newMockFactory, kit.getRef());
376
377         DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor,
378                 DatastoreContext.class);
379         assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
380
381         newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
382         assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
383
384         LOG.info("testPerShardDatastoreContext ending");
385     }
386
387     @Test
388     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
389         new JavaTestKit(getSystem()) {
390             {
391                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
392
393                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
394
395                 shardManager.tell(new FindPrimary("non-existent", false), getRef());
396
397                 expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
398             }
399         };
400     }
401
402     @Test
403     public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
404         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting");
405         new JavaTestKit(getSystem()) {
406             {
407                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
408
409                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
410
411                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
412                 shardManager.tell(new ActorInitialized(), mockShardActor);
413
414                 DataTree mockDataTree = mock(DataTree.class);
415                 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
416                         DataStoreVersions.CURRENT_VERSION), getRef());
417
418                 MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
419                 shardManager.tell(
420                         new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
421                         mockShardActor);
422
423                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
424
425                 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"),
426                         LocalPrimaryShardFound.class);
427                 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
428                         primaryFound.getPrimaryPath().contains("member-1-shard-default"));
429                 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
430             }
431         };
432
433         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending");
434     }
435
436     @Test
437     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
438         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp starting");
439         new JavaTestKit(getSystem()) {
440             {
441                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
442
443                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
444                 shardManager.tell(new ActorInitialized(), mockShardActor);
445
446                 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
447                 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
448                 shardManager.tell(
449                         new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
450                         mockShardActor);
451                 shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION),
452                         mockShardActor);
453
454                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
455
456                 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
457             }
458         };
459
460         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending");
461     }
462
463     @Test
464     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
465         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard starting");
466         new JavaTestKit(getSystem()) {
467             {
468                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
469
470                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
471                 shardManager.tell(new ActorInitialized(), mockShardActor);
472
473                 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
474                 MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
475
476                 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
477                 shardManager.tell(
478                         new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
479                         mockShardActor);
480                 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
481                 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, leaderVersion), mockShardActor);
482
483                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
484
485                 RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"),
486                         RemotePrimaryShardFound.class);
487                 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
488                         primaryFound.getPrimaryPath().contains("member-2-shard-default"));
489                 assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
490             }
491         };
492
493         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard ending");
494     }
495
496     @Test
497     public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
498         new JavaTestKit(getSystem()) {
499             {
500                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
501
502                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
503
504                 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
505             }
506         };
507     }
508
509     @Test
510     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
511         new JavaTestKit(getSystem()) {
512             {
513                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
514
515                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
516                 shardManager.tell(new ActorInitialized(), mockShardActor);
517
518                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
519
520                 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
521             }
522         };
523     }
524
525     @Test
526     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
527         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
528         new JavaTestKit(getSystem()) {
529             {
530                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
531
532                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
533                 shardManager.tell(new ActorInitialized(), mockShardActor);
534
535                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
536                 shardManager.tell(
537                         new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Follower.name()),
538                         mockShardActor);
539
540                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
541
542                 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
543
544                 DataTree mockDataTree = mock(DataTree.class);
545                 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
546                         DataStoreVersions.CURRENT_VERSION), mockShardActor);
547
548                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
549
550                 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"),
551                         LocalPrimaryShardFound.class);
552                 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
553                         primaryFound.getPrimaryPath().contains("member-1-shard-default"));
554                 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
555             }
556         };
557
558         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
559     }
560
561     @Test
562     public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
563         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting");
564         datastoreContextBuilder.shardInitializationTimeout(10, TimeUnit.SECONDS);
565         new JavaTestKit(getSystem()) {
566             {
567                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
568
569                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
570
571                 // We're passing waitUntilInitialized = true to FindPrimary so
572                 // the response should be
573                 // delayed until we send ActorInitialized and
574                 // RoleChangeNotification.
575                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
576
577                 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
578
579                 shardManager.tell(new ActorInitialized(), mockShardActor);
580
581                 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
582
583                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
584                 shardManager.tell(
585                         new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
586                         mockShardActor);
587
588                 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
589
590                 DataTree mockDataTree = mock(DataTree.class);
591                 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
592                         DataStoreVersions.CURRENT_VERSION), mockShardActor);
593
594                 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"),
595                         LocalPrimaryShardFound.class);
596                 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
597                         primaryFound.getPrimaryPath().contains("member-1-shard-default"));
598                 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
599
600                 expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
601             }
602         };
603
604         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending");
605     }
606
607     @Test
608     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
609         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard starting");
610         new JavaTestKit(getSystem()) {
611             {
612                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
613
614                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
615
616                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
617
618                 expectMsgClass(duration("2 seconds"), NotInitializedException.class);
619
620                 shardManager.tell(new ActorInitialized(), mockShardActor);
621
622                 expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
623             }
624         };
625
626         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending");
627     }
628
629     @Test
630     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
631         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard starting");
632         new JavaTestKit(getSystem()) {
633             {
634                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
635
636                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
637                 shardManager.tell(new ActorInitialized(), mockShardActor);
638                 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
639                         RaftState.Candidate.name()), mockShardActor);
640
641                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
642
643                 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
644             }
645         };
646
647         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending");
648     }
649
650     @Test
651     public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception {
652         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard starting");
653         new JavaTestKit(getSystem()) {
654             {
655                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
656
657                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
658                 shardManager.tell(new ActorInitialized(), mockShardActor);
659                 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
660                         RaftState.IsolatedLeader.name()), mockShardActor);
661
662                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
663
664                 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
665             }
666         };
667
668         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending");
669     }
670
671     @Test
672     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
673         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard starting");
674         new JavaTestKit(getSystem()) {
675             {
676                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
677
678                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
679                 shardManager.tell(new ActorInitialized(), mockShardActor);
680
681                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
682
683                 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
684             }
685         };
686
687         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending");
688     }
689
690     @Test
691     public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
692         LOG.info("testOnReceiveFindPrimaryForRemoteShard starting");
693         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
694
695         // Create an ActorSystem ShardManager actor for member-1.
696
697         final ActorSystem system1 = newActorSystem("Member1");
698         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
699
700         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
701                 newTestShardMgrBuilderWithMockShardActor().cluster(
702                         new ClusterWrapperImpl(system1)).props().withDispatcher(
703                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
704
705         // Create an ActorSystem ShardManager actor for member-2.
706
707         final ActorSystem system2 = newActorSystem("Member2");
708
709         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
710
711         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
712
713         MockConfiguration mockConfig2 = new MockConfiguration(
714                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
715                         .put("astronauts", Arrays.asList("member-2")).build());
716
717         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
718                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
719                         new ClusterWrapperImpl(system2)).props().withDispatcher(
720                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
721
722         new JavaTestKit(system1) {
723             {
724                 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
725                 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
726
727                 shardManager2.tell(new ActorInitialized(), mockShardActor2);
728
729                 String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
730                 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
731                 shardManager2.tell(
732                         new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
733                         mockShardActor2);
734                 shardManager2.tell(
735                         new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
736                         mockShardActor2);
737
738                 shardManager1.underlyingActor().waitForMemberUp();
739                 shardManager1.tell(new FindPrimary("astronauts", false), getRef());
740
741                 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
742                 String path = found.getPrimaryPath();
743                 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
744                 assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
745
746                 shardManager2.underlyingActor().verifyFindPrimary();
747
748                 Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
749
750                 shardManager1.underlyingActor().waitForMemberRemoved();
751
752                 shardManager1.tell(new FindPrimary("astronauts", false), getRef());
753
754                 expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
755             }
756         };
757
758         LOG.info("testOnReceiveFindPrimaryForRemoteShard ending");
759     }
760
761     @Test
762     public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
763         LOG.info("testShardAvailabilityOnChangeOfMemberReachability starting");
764         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
765
766         // Create an ActorSystem ShardManager actor for member-1.
767
768         final ActorSystem system1 = newActorSystem("Member1");
769         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
770
771         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
772
773         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
774                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
775                         new ClusterWrapperImpl(system1)).props().withDispatcher(
776                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
777
778         // Create an ActorSystem ShardManager actor for member-2.
779
780         final ActorSystem system2 = newActorSystem("Member2");
781
782         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
783
784         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
785
786         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
787                 .put("default", Arrays.asList("member-1", "member-2")).build());
788
789         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
790                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
791                         new ClusterWrapperImpl(system2)).props().withDispatcher(
792                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
793
794         new JavaTestKit(system1) {
795             {
796                 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
797                 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
798                 shardManager1.tell(new ActorInitialized(), mockShardActor1);
799                 shardManager2.tell(new ActorInitialized(), mockShardActor2);
800
801                 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
802                 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
803                 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
804                         DataStoreVersions.CURRENT_VERSION), mockShardActor1);
805                 shardManager1.tell(
806                         new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
807                         mockShardActor1);
808                 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
809                         DataStoreVersions.CURRENT_VERSION), mockShardActor2);
810                 shardManager2.tell(
811                         new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
812                         mockShardActor2);
813                 shardManager1.underlyingActor().waitForMemberUp();
814
815                 shardManager1.tell(new FindPrimary("default", true), getRef());
816
817                 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
818                 String path = found.getPrimaryPath();
819                 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
820
821                 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
822                         "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
823
824                 shardManager1.underlyingActor().waitForUnreachableMember();
825
826                 PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
827                 assertEquals("getMemberName", MEMBER_2, peerDown.getMemberName());
828                 MessageCollectorActor.clearMessages(mockShardActor1);
829
830                 shardManager1.tell(
831                         MockClusterWrapper.createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"),
832                         getRef());
833
834                 MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
835
836                 shardManager1.tell(new FindPrimary("default", true), getRef());
837
838                 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
839
840                 shardManager1.tell(
841                         MockClusterWrapper.createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"),
842                         getRef());
843
844                 shardManager1.underlyingActor().waitForReachableMember();
845
846                 PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
847                 assertEquals("getMemberName", MEMBER_2, peerUp.getMemberName());
848                 MessageCollectorActor.clearMessages(mockShardActor1);
849
850                 shardManager1.tell(new FindPrimary("default", true), getRef());
851
852                 RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
853                 String path1 = found1.getPrimaryPath();
854                 assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
855
856                 shardManager1.tell(
857                         MockClusterWrapper.createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"),
858                         getRef());
859
860                 MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
861
862                 // Test FindPrimary wait succeeds after reachable member event.
863
864                 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
865                         "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
866                 shardManager1.underlyingActor().waitForUnreachableMember();
867
868                 shardManager1.tell(new FindPrimary("default", true), getRef());
869
870                 shardManager1.tell(
871                         MockClusterWrapper.createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"),
872                         getRef());
873
874                 RemotePrimaryShardFound found2 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
875                 String path2 = found2.getPrimaryPath();
876                 assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config"));
877             }
878         };
879
880         LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending");
881     }
882
883     @Test
884     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
885         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange starting");
886         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
887
888         // Create an ActorSystem ShardManager actor for member-1.
889
890         final ActorSystem system1 = newActorSystem("Member1");
891         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
892
893         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
894
895         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
896         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
897                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(new ClusterWrapperImpl(system1))
898                         .primaryShardInfoCache(primaryShardInfoCache).props()
899                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
900                 shardManagerID);
901
902         // Create an ActorSystem ShardManager actor for member-2.
903
904         final ActorSystem system2 = newActorSystem("Member2");
905
906         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
907
908         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
909
910         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
911                 .put("default", Arrays.asList("member-1", "member-2")).build());
912
913         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
914                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
915                         new ClusterWrapperImpl(system2)).props().withDispatcher(
916                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
917
918         new JavaTestKit(system1) {
919             {
920                 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
921                 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
922                 shardManager1.tell(new ActorInitialized(), mockShardActor1);
923                 shardManager2.tell(new ActorInitialized(), mockShardActor2);
924
925                 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
926                 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
927                 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
928                         DataStoreVersions.CURRENT_VERSION), mockShardActor1);
929                 shardManager1.tell(
930                         new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
931                         mockShardActor1);
932                 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
933                         DataStoreVersions.CURRENT_VERSION), mockShardActor2);
934                 shardManager2.tell(
935                         new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
936                         mockShardActor2);
937                 shardManager1.underlyingActor().waitForMemberUp();
938
939                 shardManager1.tell(new FindPrimary("default", true), getRef());
940
941                 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
942                 String path = found.getPrimaryPath();
943                 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
944
945                 primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(
946                         system1.actorSelection(mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION));
947
948                 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
949                         "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
950
951                 shardManager1.underlyingActor().waitForUnreachableMember();
952
953                 shardManager1.tell(new FindPrimary("default", true), getRef());
954
955                 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
956
957                 assertNull("Expected primaryShardInfoCache entry removed",
958                         primaryShardInfoCache.getIfPresent("default"));
959
960                 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, mock(DataTree.class),
961                         DataStoreVersions.CURRENT_VERSION), mockShardActor1);
962                 shardManager1.tell(
963                         new RoleChangeNotification(memberId1, RaftState.Follower.name(), RaftState.Leader.name()),
964                         mockShardActor1);
965
966                 shardManager1.tell(new FindPrimary("default", true), getRef());
967
968                 LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
969                 String path1 = found1.getPrimaryPath();
970                 assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
971
972             }
973         };
974
975         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange ending");
976     }
977
978
979     @Test
980     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
981         new JavaTestKit(getSystem()) {
982             {
983                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
984
985                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
986
987                 shardManager.tell(new FindLocalShard("non-existent", false), getRef());
988
989                 LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
990
991                 assertEquals("getShardName", "non-existent", notFound.getShardName());
992             }
993         };
994     }
995
996     @Test
997     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
998         new JavaTestKit(getSystem()) {
999             {
1000                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1001
1002                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1003                 shardManager.tell(new ActorInitialized(), mockShardActor);
1004
1005                 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1006
1007                 LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1008
1009                 assertTrue("Found path contains " + found.getPath().path().toString(),
1010                         found.getPath().path().toString().contains("member-1-shard-default-config"));
1011             }
1012         };
1013     }
1014
1015     @Test
1016     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
1017         new JavaTestKit(getSystem()) {
1018             {
1019                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1020
1021                 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1022
1023                 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1024             }
1025         };
1026     }
1027
1028     @Test
1029     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
1030         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
1031         new JavaTestKit(getSystem()) {
1032             {
1033                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1034
1035                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1036
1037                 // We're passing waitUntilInitialized = true to FindLocalShard
1038                 // so the response should be
1039                 // delayed until we send ActorInitialized.
1040                 Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
1041                         new Timeout(5, TimeUnit.SECONDS));
1042
1043                 shardManager.tell(new ActorInitialized(), mockShardActor);
1044
1045                 Object resp = Await.result(future, duration("5 seconds"));
1046                 assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
1047             }
1048         };
1049
1050         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
1051     }
1052
1053     @Test
1054     public void testOnRecoveryJournalIsCleaned() {
1055         String persistenceID = "shard-manager-" + shardMrgIDSuffix;
1056         InMemoryJournal.addEntry(persistenceID, 1L, new SchemaContextModules(ImmutableSet.of("foo")));
1057         InMemoryJournal.addEntry(persistenceID, 2L, new SchemaContextModules(ImmutableSet.of("bar")));
1058         InMemoryJournal.addDeleteMessagesCompleteLatch(persistenceID);
1059
1060         newTestShardManager();
1061
1062         InMemoryJournal.waitForDeleteMessagesComplete(persistenceID);
1063
1064         // Journal entries up to the last one should've been deleted
1065         Map<Long, Object> journal = InMemoryJournal.get(persistenceID);
1066         synchronized (journal) {
1067             assertEquals("Journal size", 0, journal.size());
1068         }
1069     }
1070
1071     @Test
1072     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
1073         TestShardManager shardManager = newTestShardManager();
1074
1075         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1076         shardManager.onReceiveCommand(new RoleChangeNotification(
1077                 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
1078
1079         verify(ready, never()).countDown();
1080
1081         shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
1082                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1083
1084         verify(ready, times(1)).countDown();
1085     }
1086
1087     @Test
1088     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
1089         new JavaTestKit(getSystem()) {
1090             {
1091                 TestShardManager shardManager = newTestShardManager();
1092
1093                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1094                 shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1095
1096                 verify(ready, never()).countDown();
1097
1098                 shardManager
1099                         .onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
1100
1101                 shardManager.onReceiveCommand(
1102                         new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1103                                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1104
1105                 verify(ready, times(1)).countDown();
1106             }
1107         };
1108     }
1109
1110     @Test
1111     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
1112         new JavaTestKit(getSystem()) {
1113             {
1114                 TestShardManager shardManager = newTestShardManager();
1115
1116                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1117                 shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1118
1119                 verify(ready, never()).countDown();
1120
1121                 shardManager.onReceiveCommand(
1122                         new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1123                                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1124
1125                 shardManager
1126                         .onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
1127
1128                 verify(ready, times(1)).countDown();
1129             }
1130         };
1131     }
1132
1133     @Test
1134     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
1135         TestShardManager shardManager = newTestShardManager();
1136
1137         shardManager.onReceiveCommand(new RoleChangeNotification(
1138                 "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
1139
1140         verify(ready, never()).countDown();
1141     }
1142
1143     @Test
1144     public void testByDefaultSyncStatusIsFalse() throws Exception {
1145         TestShardManager shardManager = newTestShardManager();
1146
1147         assertEquals(false, shardManager.getMBean().getSyncStatus());
1148     }
1149
1150     @Test
1151     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception {
1152         TestShardManager shardManager = newTestShardManager();
1153
1154         shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
1155                 RaftState.Follower.name(), RaftState.Leader.name()));
1156
1157         assertEquals(true, shardManager.getMBean().getSyncStatus());
1158     }
1159
1160     @Test
1161     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception {
1162         TestShardManager shardManager = newTestShardManager();
1163
1164         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1165         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1166                 RaftState.Follower.name(), RaftState.Candidate.name()));
1167
1168         assertEquals(false, shardManager.getMBean().getSyncStatus());
1169
1170         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
1171         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(
1172                 true, shardId));
1173
1174         assertEquals(false, shardManager.getMBean().getSyncStatus());
1175     }
1176
1177     @Test
1178     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception {
1179         TestShardManager shardManager = newTestShardManager();
1180
1181         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1182         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1183                 RaftState.Candidate.name(), RaftState.Follower.name()));
1184
1185         // Initially will be false
1186         assertEquals(false, shardManager.getMBean().getSyncStatus());
1187
1188         // Send status true will make sync status true
1189         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
1190
1191         assertEquals(true, shardManager.getMBean().getSyncStatus());
1192
1193         // Send status false will make sync status false
1194         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
1195
1196         assertEquals(false, shardManager.getMBean().getSyncStatus());
1197
1198     }
1199
1200     @Test
1201     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception {
1202         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
1203         TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
1204             @Override
1205             public List<String> getMemberShardNames(MemberName memberName) {
1206                 return Arrays.asList("default", "astronauts");
1207             }
1208         }));
1209
1210         // Initially will be false
1211         assertEquals(false, shardManager.getMBean().getSyncStatus());
1212
1213         // Make default shard leader
1214         String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
1215         shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId,
1216                 RaftState.Follower.name(), RaftState.Leader.name()));
1217
1218         // default = Leader, astronauts is unknown so sync status remains false
1219         assertEquals(false, shardManager.getMBean().getSyncStatus());
1220
1221         // Make astronauts shard leader as well
1222         String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1223         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1224                 RaftState.Follower.name(), RaftState.Leader.name()));
1225
1226         // Now sync status should be true
1227         assertEquals(true, shardManager.getMBean().getSyncStatus());
1228
1229         // Make astronauts a Follower
1230         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1231                 RaftState.Leader.name(), RaftState.Follower.name()));
1232
1233         // Sync status is not true
1234         assertEquals(false, shardManager.getMBean().getSyncStatus());
1235
1236         // Make the astronauts follower sync status true
1237         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
1238
1239         // Sync status is now true
1240         assertEquals(true, shardManager.getMBean().getSyncStatus());
1241
1242         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending");
1243     }
1244
1245     @Test
1246     public void testOnReceiveSwitchShardBehavior() throws Exception {
1247         new JavaTestKit(getSystem()) {
1248             {
1249                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1250
1251                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1252                 shardManager.tell(new ActorInitialized(), mockShardActor);
1253
1254                 shardManager.tell(new SwitchShardBehavior(mockShardName, RaftState.Leader, 1000), getRef());
1255
1256                 SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor,
1257                         SwitchBehavior.class);
1258
1259                 assertEquals(RaftState.Leader, switchBehavior.getNewState());
1260                 assertEquals(1000, switchBehavior.getNewTerm());
1261             }
1262         };
1263     }
1264
1265     private static List<MemberName> members(String... names) {
1266         return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList());
1267     }
1268
1269     @Test
1270     public void testOnCreateShard() {
1271         LOG.info("testOnCreateShard starting");
1272         new JavaTestKit(getSystem()) {
1273             {
1274                 datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1275
1276                 ActorRef shardManager = actorFactory
1277                         .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1278
1279                 SchemaContext schemaContext = TestModel.createTestContext();
1280                 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1281
1282                 DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100)
1283                         .persistent(false).build();
1284                 Shard.Builder shardBuilder = Shard.builder();
1285
1286                 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1287                         "foo", null, members("member-1", "member-5", "member-6"));
1288                 shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef());
1289
1290                 expectMsgClass(duration("5 seconds"), Success.class);
1291
1292                 shardManager.tell(new FindLocalShard("foo", true), getRef());
1293
1294                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1295
1296                 assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent());
1297                 assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig()
1298                         .getPeerAddressResolver() instanceof ShardPeerAddressResolver);
1299                 assertEquals("peerMembers", Sets.newHashSet(
1300                         ShardIdentifier.create("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(),
1301                         ShardIdentifier.create("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()),
1302                         shardBuilder.getPeerAddresses().keySet());
1303                 assertEquals("ShardIdentifier", ShardIdentifier.create("foo", MEMBER_1, shardMrgIDSuffix),
1304                         shardBuilder.getId());
1305                 assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1306
1307                 // Send CreateShard with same name - should return Success with
1308                 // a message.
1309
1310                 shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1311
1312                 Success success = expectMsgClass(duration("5 seconds"), Success.class);
1313                 assertNotNull("Success status is null", success.status());
1314             }
1315         };
1316
1317         LOG.info("testOnCreateShard ending");
1318     }
1319
1320     @Test
1321     public void testOnCreateShardWithLocalMemberNotInShardConfig() {
1322         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting");
1323         new JavaTestKit(getSystem()) {
1324             {
1325                 datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1326
1327                 ActorRef shardManager = actorFactory
1328                         .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1329
1330                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1331
1332                 Shard.Builder shardBuilder = Shard.builder();
1333                 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1334                         "foo", null, members("member-5", "member-6"));
1335
1336                 shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1337                 expectMsgClass(duration("5 seconds"), Success.class);
1338
1339                 shardManager.tell(new FindLocalShard("foo", true), getRef());
1340                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1341
1342                 assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
1343                 assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(), shardBuilder
1344                         .getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
1345             }
1346         };
1347
1348         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending");
1349     }
1350
1351     @Test
1352     public void testOnCreateShardWithNoInitialSchemaContext() {
1353         LOG.info("testOnCreateShardWithNoInitialSchemaContext starting");
1354         new JavaTestKit(getSystem()) {
1355             {
1356                 ActorRef shardManager = actorFactory
1357                         .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1358
1359                 Shard.Builder shardBuilder = Shard.builder();
1360
1361                 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1362                         "foo", null, members("member-1"));
1363                 shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1364
1365                 expectMsgClass(duration("5 seconds"), Success.class);
1366
1367                 SchemaContext schemaContext = TestModel.createTestContext();
1368                 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1369
1370                 shardManager.tell(new FindLocalShard("foo", true), getRef());
1371
1372                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1373
1374                 assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1375                 assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
1376             }
1377         };
1378
1379         LOG.info("testOnCreateShardWithNoInitialSchemaContext ending");
1380     }
1381
1382     @Test
1383     public void testGetSnapshot() throws Exception {
1384         LOG.info("testGetSnapshot starting");
1385         JavaTestKit kit = new JavaTestKit(getSystem());
1386
1387         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1388                 .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1"))
1389                 .put("astronauts", Collections.<String>emptyList()).build());
1390
1391         TestActorRef<TestShardManager> shardManager = actorFactory
1392                 .createTestActor(newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1393
1394         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1395         Failure failure = kit.expectMsgClass(Failure.class);
1396         assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
1397
1398         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1399
1400         waitForShardInitialized(shardManager, "shard1", kit);
1401         waitForShardInitialized(shardManager, "shard2", kit);
1402
1403         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1404
1405         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1406
1407         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1408         assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1409
1410         Function<ShardSnapshot, String> shardNameTransformer = s -> s.getName();
1411
1412         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
1413                 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
1414
1415         // Add a new replica
1416
1417         JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1418
1419         TestShardManager shardManagerInstance = shardManager.underlyingActor();
1420         shardManagerInstance.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1421
1422         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1423         mockShardLeaderKit.expectMsgClass(AddServer.class);
1424         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.OK, ""));
1425         kit.expectMsgClass(Status.Success.class);
1426         waitForShardInitialized(shardManager, "astronauts", kit);
1427
1428         // Send another GetSnapshot and verify
1429
1430         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1431         datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1432
1433         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
1434                 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
1435
1436         byte[] snapshotBytes = datastoreSnapshot.getShardManagerSnapshot();
1437         assertNotNull("Expected ShardManagerSnapshot", snapshotBytes);
1438         ShardManagerSnapshot snapshot = SerializationUtils.deserialize(snapshotBytes);
1439         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1440                 Sets.newHashSet(snapshot.getShardList()));
1441
1442         LOG.info("testGetSnapshot ending");
1443     }
1444
1445     @Test
1446     public void testRestoreFromSnapshot() throws Exception {
1447         LOG.info("testRestoreFromSnapshot starting");
1448
1449         datastoreContextBuilder.shardInitializationTimeout(3, TimeUnit.SECONDS);
1450
1451         JavaTestKit kit = new JavaTestKit(getSystem());
1452
1453         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1454                 .put("shard1", Collections.<String>emptyList()).put("shard2", Collections.<String>emptyList())
1455                 .put("astronauts", Collections.<String>emptyList()).build());
1456
1457         ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"));
1458         DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix,
1459                 SerializationUtils.serialize(snapshot), Collections.<ShardSnapshot>emptyList());
1460         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig)
1461                 .restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
1462
1463         shardManager.underlyingActor().waitForRecoveryComplete();
1464
1465         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1466
1467         waitForShardInitialized(shardManager, "shard1", kit);
1468         waitForShardInitialized(shardManager, "shard2", kit);
1469         waitForShardInitialized(shardManager, "astronauts", kit);
1470
1471         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1472
1473         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1474
1475         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1476
1477         byte[] snapshotBytes = datastoreSnapshot.getShardManagerSnapshot();
1478         assertNotNull("Expected ShardManagerSnapshot", snapshotBytes);
1479         snapshot = SerializationUtils.deserialize(snapshotBytes);
1480         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1481                 Sets.newHashSet(snapshot.getShardList()));
1482
1483         LOG.info("testRestoreFromSnapshot ending");
1484     }
1485
1486     @Test
1487     public void testAddShardReplicaForNonExistentShardConfig() throws Exception {
1488         new JavaTestKit(getSystem()) {
1489             {
1490                 ActorRef shardManager = actorFactory
1491                         .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1492
1493                 shardManager.tell(new AddShardReplica("model-inventory"), getRef());
1494                 Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
1495
1496                 assertEquals("Failure obtained", true, resp.cause() instanceof IllegalArgumentException);
1497             }
1498         };
1499     }
1500
1501     @Test
1502     public void testAddShardReplica() throws Exception {
1503         LOG.info("testAddShardReplica starting");
1504         MockConfiguration mockConfig = new MockConfiguration(
1505                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1506                         .put("astronauts", Arrays.asList("member-2")).build());
1507
1508         final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1509         datastoreContextBuilder.shardManagerPersistenceId(shardManagerID);
1510
1511         // Create an ActorSystem ShardManager actor for member-1.
1512         final ActorSystem system1 = newActorSystem("Member1");
1513         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1514         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1515         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1516                 newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor)
1517                         .cluster(new ClusterWrapperImpl(system1)).props()
1518                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1519                 shardManagerID);
1520
1521         // Create an ActorSystem ShardManager actor for member-2.
1522         final ActorSystem system2 = newActorSystem("Member2");
1523         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1524
1525         String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
1526         String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString();
1527         final TestActorRef<MockRespondActor> mockShardLeaderActor = TestActorRef.create(system2,
1528                 Props.create(MockRespondActor.class, AddServer.class,
1529                         new AddServerReply(ServerChangeStatus.OK, memberId2))
1530                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1531                 name);
1532         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1533                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor)
1534                         .cluster(new ClusterWrapperImpl(system2)).props()
1535                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1536                 shardManagerID);
1537
1538         new JavaTestKit(system1) {
1539             {
1540                 newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1541                 leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1542
1543                 leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1544
1545                 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1546                 leaderShardManager.tell(
1547                         new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1548                         mockShardLeaderActor);
1549                 leaderShardManager.tell(
1550                         new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1551                         mockShardLeaderActor);
1552
1553                 newReplicaShardManager.underlyingActor().waitForMemberUp();
1554                 leaderShardManager.underlyingActor().waitForMemberUp();
1555
1556                 // Have a dummy snapshot to be overwritten by the new data
1557                 // persisted.
1558                 String[] restoredShards = { "default", "people" };
1559                 ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
1560                 InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
1561                 Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
1562
1563                 InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID);
1564                 InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
1565
1566                 // construct a mock response message
1567                 newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1568                 AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1569                         AddServer.class);
1570                 String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1571                 assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1572                 expectMsgClass(duration("5 seconds"), Status.Success.class);
1573
1574                 InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
1575                 InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
1576                 List<ShardManagerSnapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(shardManagerID,
1577                         ShardManagerSnapshot.class);
1578                 assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
1579                 ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
1580                 assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
1581                         Sets.newHashSet(shardManagerSnapshot.getShardList()));
1582             }
1583         };
1584         LOG.info("testAddShardReplica ending");
1585     }
1586
1587     @Test
1588     public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() throws Exception {
1589         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting");
1590         new JavaTestKit(getSystem()) {
1591             {
1592                 TestActorRef<TestShardManager> shardManager = actorFactory
1593                         .createTestActor(newPropsShardMgrWithMockShardActor(), shardMgrID);
1594
1595                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1596                 shardManager.tell(new ActorInitialized(), mockShardActor);
1597
1598                 String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
1599                 AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
1600                 ActorRef leaderShardActor = shardManager.underlyingActor().getContext()
1601                         .actorOf(Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId);
1602
1603                 MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
1604
1605                 String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
1606                 shardManager.tell(
1607                         new RoleChangeNotification(newReplicaId, RaftState.Candidate.name(), RaftState.Follower.name()),
1608                         mockShardActor);
1609                 shardManager.tell(
1610                         new ShardLeaderStateChanged(newReplicaId, leaderId, DataStoreVersions.CURRENT_VERSION),
1611                         mockShardActor);
1612
1613                 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1614
1615                 MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
1616
1617                 Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1618                 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1619
1620                 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1621                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1622
1623                 // Send message again to verify previous in progress state is
1624                 // cleared
1625
1626                 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1627                 resp = expectMsgClass(duration("5 seconds"), Failure.class);
1628                 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1629
1630                 // Send message again with an AddServer timeout to verify the
1631                 // pre-existing shard actor isn't terminated.
1632
1633                 shardManager.tell(
1634                         newDatastoreContextFactory(
1635                                 datastoreContextBuilder.shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()),
1636                         getRef());
1637                 leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
1638                 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1639                 expectMsgClass(duration("5 seconds"), Failure.class);
1640
1641                 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1642                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1643             }
1644         };
1645
1646         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
1647     }
1648
1649     @Test
1650     public void testAddShardReplicaWithPreExistingLocalReplicaLeader() throws Exception {
1651         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting");
1652         new JavaTestKit(getSystem()) {
1653             {
1654                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1655                 ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1656
1657                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1658                 shardManager.tell(new ActorInitialized(), mockShardActor);
1659                 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1660                         DataStoreVersions.CURRENT_VERSION), getRef());
1661                 shardManager.tell(
1662                         new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1663                         mockShardActor);
1664
1665                 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1666                 Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1667                 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1668
1669                 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1670                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1671             }
1672         };
1673
1674         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
1675     }
1676
1677     @Test
1678     public void testAddShardReplicaWithAddServerReplyFailure() throws Exception {
1679         LOG.info("testAddShardReplicaWithAddServerReplyFailure starting");
1680         new JavaTestKit(getSystem()) {
1681             {
1682                 JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1683
1684                 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1685                         .put("astronauts", Arrays.asList("member-2")).build());
1686
1687                 ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
1688                 final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1689                         newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props(), shardMgrID);
1690                 shardManager.underlyingActor()
1691                         .setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1692
1693                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1694
1695                 JavaTestKit terminateWatcher = new JavaTestKit(getSystem());
1696                 terminateWatcher.watch(mockNewReplicaShardActor);
1697
1698                 shardManager.tell(new AddShardReplica("astronauts"), getRef());
1699
1700                 AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
1701                 assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
1702                         addServerMsg.getNewServerId());
1703                 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
1704
1705                 Failure failure = expectMsgClass(duration("5 seconds"), Failure.class);
1706                 assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
1707
1708                 shardManager.tell(new FindLocalShard("astronauts", false), getRef());
1709                 expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1710
1711                 terminateWatcher.expectTerminated(mockNewReplicaShardActor);
1712
1713                 shardManager.tell(new AddShardReplica("astronauts"), getRef());
1714                 mockShardLeaderKit.expectMsgClass(AddServer.class);
1715                 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
1716                 failure = expectMsgClass(duration("5 seconds"), Failure.class);
1717                 assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
1718             }
1719         };
1720
1721         LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
1722     }
1723
1724     @Test
1725     public void testAddShardReplicaWithAlreadyInProgress() throws Exception {
1726         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1727                 AddServer.class, new AddShardReplica("astronauts"));
1728     }
1729
1730     @Test
1731     public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception {
1732         LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting");
1733         datastoreContextBuilder.shardInitializationTimeout(100, TimeUnit.MILLISECONDS);
1734         new JavaTestKit(getSystem()) {
1735             {
1736                 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1737                         .put("astronauts", Arrays.asList("member-2")).build());
1738
1739                 final ActorRef newReplicaShardManager = actorFactory
1740                         .createActor(newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props(), shardMgrID);
1741
1742                 newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1743                 MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2",
1744                         AddressFromURIString.parse("akka.tcp://non-existent@127.0.0.1:5").toString());
1745
1746                 newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1747                 Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
1748                 assertEquals("Failure obtained", true, resp.cause() instanceof RuntimeException);
1749             }
1750         };
1751
1752         LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending");
1753     }
1754
1755     @Test
1756     public void testRemoveShardReplicaForNonExistentShard() throws Exception {
1757         new JavaTestKit(getSystem()) {
1758             {
1759                 ActorRef shardManager = actorFactory
1760                         .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1761
1762                 shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), getRef());
1763                 Status.Failure resp = expectMsgClass(duration("10 seconds"), Status.Failure.class);
1764                 assertEquals("Failure obtained", true, resp.cause() instanceof PrimaryNotFoundException);
1765             }
1766         };
1767     }
1768
1769     @Test
1770     /**
1771      * Primary is Local
1772      */
1773     public void testRemoveShardReplicaLocal() throws Exception {
1774         new JavaTestKit(getSystem()) {
1775             {
1776                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1777
1778                 final TestActorRef<MockRespondActor> respondActor = actorFactory
1779                         .createTestActor(Props.create(MockRespondActor.class, RemoveServer.class,
1780                                 new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
1781
1782                 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
1783
1784                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1785                 shardManager.tell(new ActorInitialized(), respondActor);
1786                 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1787                         DataStoreVersions.CURRENT_VERSION), getRef());
1788                 shardManager.tell(
1789                         new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1790                         respondActor);
1791
1792                 shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), getRef());
1793                 final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor,
1794                         RemoveServer.class);
1795                 assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(),
1796                         removeServer.getServerId());
1797                 expectMsgClass(duration("5 seconds"), Success.class);
1798             }
1799         };
1800     }
1801
1802     @Test
1803     public void testRemoveShardReplicaRemote() throws Exception {
1804         MockConfiguration mockConfig = new MockConfiguration(
1805                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1806                         .put("astronauts", Arrays.asList("member-1")).build());
1807
1808         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1809
1810         // Create an ActorSystem ShardManager actor for member-1.
1811         final ActorSystem system1 = newActorSystem("Member1");
1812         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1813         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1814
1815         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1816                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster(
1817                         new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1818                 shardManagerID);
1819
1820         // Create an ActorSystem ShardManager actor for member-2.
1821         final ActorSystem system2 = newActorSystem("Member2");
1822         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1823
1824         String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString();
1825         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
1826         final TestActorRef<MockRespondActor> mockShardLeaderActor =
1827                 TestActorRef.create(system2, Props.create(MockRespondActor.class, RemoveServer.class,
1828                         new RemoveServerReply(ServerChangeStatus.OK, memberId2)), name);
1829
1830         LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor);
1831
1832         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1833                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster(
1834                         new ClusterWrapperImpl(system2)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1835                 shardManagerID);
1836
1837         // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so,
1838         //    akka.tcp://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1
1839         // However when a shard manager has a local shard which is a follower and a leader that is remote it will
1840         // try to compute an address for the remote shard leader using the ShardPeerAddressResolver. This address will
1841         // look like so,
1842         //    akka.tcp://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1
1843         // In this specific case if we did a FindPrimary for shard default from member-1 we would come up
1844         // with the address of an actor which does not exist, therefore any message sent to that actor would go to
1845         // dead letters.
1846         // To work around this problem we create a ForwardingActor with the right address and pass to it the
1847         // mockShardLeaderActor. The ForwardingActor simply forwards all messages to the mockShardLeaderActor and every
1848         // thing works as expected
1849         final ActorRef actorRef = leaderShardManager.underlyingActor().context()
1850                 .actorOf(Props.create(ForwardingActor.class, mockShardLeaderActor),
1851                         "member-2-shard-default-" + shardMrgIDSuffix);
1852
1853         LOG.error("Forwarding actor : {}", actorRef);
1854
1855         new JavaTestKit(system1) {
1856             {
1857                 newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1858                 leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1859
1860                 leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1861                 newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1862
1863                 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1864                 leaderShardManager.tell(
1865                         new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1866                         mockShardLeaderActor);
1867                 leaderShardManager.tell(
1868                         new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1869                         mockShardLeaderActor);
1870
1871                 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
1872                 newReplicaShardManager.tell(
1873                         new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class), leaderVersion),
1874                         mockShardActor);
1875                 newReplicaShardManager.tell(
1876                         new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
1877                         mockShardActor);
1878
1879                 newReplicaShardManager.underlyingActor().waitForMemberUp();
1880                 leaderShardManager.underlyingActor().waitForMemberUp();
1881
1882                 // construct a mock response message
1883                 newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), getRef());
1884                 RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1885                         RemoveServer.class);
1886                 String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1887                 assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
1888                 expectMsgClass(duration("5 seconds"), Status.Success.class);
1889             }
1890         };
1891     }
1892
1893     @Test
1894     public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() throws Exception {
1895         testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", MEMBER_2),
1896                 RemoveServer.class, new RemoveShardReplica("astronauts", MEMBER_3));
1897     }
1898
1899     @Test
1900     public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() throws Exception {
1901         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1902                 AddServer.class, new RemoveShardReplica("astronauts", MEMBER_2));
1903     }
1904
1905
1906     public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange,
1907                                                       final Class<?> firstForwardedServerChangeClass,
1908                                                       final Object secondServerChange) throws Exception {
1909         new JavaTestKit(getSystem()) {
1910             {
1911                 JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1912                 final JavaTestKit secondRequestKit = new JavaTestKit(getSystem());
1913
1914                 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1915                         .put(shardName, Arrays.asList("member-2")).build());
1916
1917                 final TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
1918                         newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor)
1919                                 .cluster(new MockClusterWrapper()).props()
1920                                 .withDispatcher(Dispatchers.DefaultDispatcherId()),
1921                         shardMgrID);
1922
1923                 shardManager.underlyingActor()
1924                         .setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1925
1926                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1927
1928                 shardManager.tell(firstServerChange, getRef());
1929
1930                 mockShardLeaderKit.expectMsgClass(firstForwardedServerChangeClass);
1931
1932                 shardManager.tell(secondServerChange, secondRequestKit.getRef());
1933
1934                 secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class);
1935             }
1936         };
1937     }
1938
1939     @Test
1940     public void testServerRemovedShardActorNotRunning() throws Exception {
1941         LOG.info("testServerRemovedShardActorNotRunning starting");
1942         new JavaTestKit(getSystem()) {
1943             {
1944                 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1945                         .put("default", Arrays.asList("member-1", "member-2"))
1946                         .put("astronauts", Arrays.asList("member-2"))
1947                         .put("people", Arrays.asList("member-1", "member-2")).build());
1948
1949                 TestActorRef<TestShardManager> shardManager = actorFactory
1950                         .createTestActor(newShardMgrProps(mockConfig));
1951
1952                 shardManager.underlyingActor().waitForRecoveryComplete();
1953                 shardManager.tell(new FindLocalShard("people", false), getRef());
1954                 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1955
1956                 shardManager.tell(new FindLocalShard("default", false), getRef());
1957                 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1958
1959                 // Removed the default shard replica from member-1
1960                 ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
1961                 ShardIdentifier shardId = builder.shardName("default").memberName(MEMBER_1).type(shardMrgIDSuffix)
1962                         .build();
1963                 shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
1964
1965                 shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1966             }
1967         };
1968
1969         LOG.info("testServerRemovedShardActorNotRunning ending");
1970     }
1971
1972     @Test
1973     public void testServerRemovedShardActorRunning() throws Exception {
1974         LOG.info("testServerRemovedShardActorRunning starting");
1975         new JavaTestKit(getSystem()) {
1976             {
1977                 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1978                         .put("default", Arrays.asList("member-1", "member-2"))
1979                         .put("astronauts", Arrays.asList("member-2"))
1980                         .put("people", Arrays.asList("member-1", "member-2")).build());
1981
1982                 String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1983                 TestActorRef<MessageCollectorActor> shard = actorFactory.createTestActor(MessageCollectorActor.props(),
1984                         shardId);
1985
1986                 TestActorRef<TestShardManager> shardManager = actorFactory
1987                         .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props());
1988
1989                 shardManager.underlyingActor().waitForRecoveryComplete();
1990
1991                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1992                 shardManager.tell(new ActorInitialized(), shard);
1993
1994                 waitForShardInitialized(shardManager, "people", this);
1995                 waitForShardInitialized(shardManager, "default", this);
1996
1997                 // Removed the default shard replica from member-1
1998                 shardManager.tell(new ServerRemoved(shardId), getRef());
1999
2000                 shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
2001
2002                 MessageCollectorActor.expectFirstMatching(shard, Shutdown.class);
2003             }
2004         };
2005
2006         LOG.info("testServerRemovedShardActorRunning ending");
2007     }
2008
2009     @Test
2010     public void testShardPersistenceWithRestoredData() throws Exception {
2011         LOG.info("testShardPersistenceWithRestoredData starting");
2012         new JavaTestKit(getSystem()) {
2013             {
2014                 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
2015                         .put("default", Arrays.asList("member-1", "member-2"))
2016                         .put("astronauts", Arrays.asList("member-2"))
2017                         .put("people", Arrays.asList("member-1", "member-2")).build());
2018                 String[] restoredShards = { "default", "astronauts" };
2019                 ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
2020                 InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
2021
2022                 // create shardManager to come up with restored data
2023                 TestActorRef<TestShardManager> newRestoredShardManager = actorFactory
2024                         .createTestActor(newShardMgrProps(mockConfig));
2025
2026                 newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
2027
2028                 newRestoredShardManager.tell(new FindLocalShard("people", false), getRef());
2029                 LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
2030                 assertEquals("for uninitialized shard", "people", notFound.getShardName());
2031
2032                 // Verify a local shard is created for the restored shards,
2033                 // although we expect a NotInitializedException for the shards
2034                 // as the actor initialization
2035                 // message is not sent for them
2036                 newRestoredShardManager.tell(new FindLocalShard("default", false), getRef());
2037                 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
2038
2039                 newRestoredShardManager.tell(new FindLocalShard("astronauts", false), getRef());
2040                 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
2041             }
2042         };
2043
2044         LOG.info("testShardPersistenceWithRestoredData ending");
2045     }
2046
2047     @Test
2048     public void testShutDown() throws Exception {
2049         LOG.info("testShutDown starting");
2050         new JavaTestKit(getSystem()) {
2051             {
2052                 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
2053                         .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1")).build());
2054
2055                 String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString();
2056                 TestActorRef<MessageCollectorActor> shard1 = actorFactory.createTestActor(
2057                         MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId1);
2058
2059                 String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString();
2060                 TestActorRef<MessageCollectorActor> shard2 = actorFactory.createTestActor(
2061                         MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId2);
2062
2063                 TestActorRef<TestShardManager> shardManager = actorFactory
2064                         .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("shard1", shard1)
2065                                 .addShardActor("shard2", shard2).props()
2066                                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
2067
2068                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
2069                 shardManager.tell(new ActorInitialized(), shard1);
2070                 shardManager.tell(new ActorInitialized(), shard2);
2071
2072                 FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
2073                 Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE);
2074
2075                 MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class);
2076                 MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class);
2077
2078                 try {
2079                     Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS));
2080                     fail("ShardManager actor stopped without waiting for the Shards to be stopped");
2081                 } catch (TimeoutException e) {
2082                     // expected
2083                 }
2084
2085                 actorFactory.killActor(shard1, this);
2086                 actorFactory.killActor(shard2, this);
2087
2088                 Boolean stopped = Await.result(stopFuture, duration);
2089                 assertEquals("Stopped", Boolean.TRUE, stopped);
2090             }
2091         };
2092
2093         LOG.info("testShutDown ending");
2094     }
2095
2096     @Test
2097     public void testChangeServersVotingStatus() throws Exception {
2098         new JavaTestKit(getSystem()) {
2099             {
2100                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2101
2102                 TestActorRef<MockRespondActor> respondActor = actorFactory
2103                         .createTestActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2104                                 new ServerChangeReply(ServerChangeStatus.OK, null)), memberId);
2105
2106                 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2107
2108                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
2109                 shardManager.tell(new ActorInitialized(), respondActor);
2110                 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
2111                         DataStoreVersions.CURRENT_VERSION), getRef());
2112                 shardManager.tell(
2113                         new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
2114                         respondActor);
2115
2116                 shardManager.tell(
2117                         new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)),
2118                         getRef());
2119
2120                 ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor
2121                         .expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2122                 assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(),
2123                         ImmutableMap.of(ShardIdentifier
2124                                 .create("default", MemberName.forName("member-2"), shardMrgIDSuffix).toString(),
2125                                 Boolean.TRUE));
2126
2127                 expectMsgClass(duration("5 seconds"), Success.class);
2128             }
2129         };
2130     }
2131
2132     @Test
2133     public void testChangeServersVotingStatusWithNoLeader() throws Exception {
2134         new JavaTestKit(getSystem()) {
2135             {
2136                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2137
2138                 TestActorRef<MockRespondActor> respondActor = actorFactory
2139                         .createTestActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2140                                 new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);
2141
2142                 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2143
2144                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
2145                 shardManager.tell(new ActorInitialized(), respondActor);
2146                 shardManager.tell(new RoleChangeNotification(memberId, null, RaftState.Follower.name()), respondActor);
2147
2148                 shardManager.tell(
2149                         new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)),
2150                         getRef());
2151
2152                 MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2153
2154                 Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
2155                 assertEquals("Failure resposnse", true, resp.cause() instanceof NoShardLeaderException);
2156             }
2157         };
2158     }
2159
2160     private static class TestShardManager extends ShardManager {
2161         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
2162         private final CountDownLatch snapshotPersist = new CountDownLatch(1);
2163         private ShardManagerSnapshot snapshot;
2164         private final Map<String, ActorRef> shardActors;
2165         private final ActorRef shardActor;
2166         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
2167         private CountDownLatch memberUpReceived = new CountDownLatch(1);
2168         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
2169         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
2170         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
2171         private volatile MessageInterceptor messageInterceptor;
2172
2173         private TestShardManager(Builder builder) {
2174             super(builder);
2175             shardActor = builder.shardActor;
2176             shardActors = builder.shardActors;
2177         }
2178
2179         @Override
2180         protected void handleRecover(Object message) throws Exception {
2181             try {
2182                 super.handleRecover(message);
2183             } finally {
2184                 if (message instanceof RecoveryCompleted) {
2185                     recoveryComplete.countDown();
2186                 }
2187             }
2188         }
2189
2190         private void countDownIfOther(final Member member, CountDownLatch latch) {
2191             if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
2192                 latch.countDown();
2193             }
2194         }
2195
2196         @Override
2197         public void handleCommand(Object message) throws Exception {
2198             try {
2199                 if (messageInterceptor != null && messageInterceptor.canIntercept(message)) {
2200                     getSender().tell(messageInterceptor.apply(message), getSelf());
2201                 } else {
2202                     super.handleCommand(message);
2203                 }
2204             } finally {
2205                 if (message instanceof FindPrimary) {
2206                     findPrimaryMessageReceived.countDown();
2207                 } else if (message instanceof ClusterEvent.MemberUp) {
2208                     countDownIfOther(((ClusterEvent.MemberUp) message).member(), memberUpReceived);
2209                 } else if (message instanceof ClusterEvent.MemberRemoved) {
2210                     countDownIfOther(((ClusterEvent.MemberRemoved) message).member(), memberRemovedReceived);
2211                 } else if (message instanceof ClusterEvent.UnreachableMember) {
2212                     countDownIfOther(((ClusterEvent.UnreachableMember) message).member(), memberUnreachableReceived);
2213                 } else if (message instanceof ClusterEvent.ReachableMember) {
2214                     countDownIfOther(((ClusterEvent.ReachableMember) message).member(), memberReachableReceived);
2215                 }
2216             }
2217         }
2218
2219         void setMessageInterceptor(MessageInterceptor messageInterceptor) {
2220             this.messageInterceptor = messageInterceptor;
2221         }
2222
2223         void waitForRecoveryComplete() {
2224             assertEquals("Recovery complete", true,
2225                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
2226         }
2227
2228         void waitForMemberUp() {
2229             assertEquals("MemberUp received", true,
2230                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
2231             memberUpReceived = new CountDownLatch(1);
2232         }
2233
2234         void waitForMemberRemoved() {
2235             assertEquals("MemberRemoved received", true,
2236                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
2237             memberRemovedReceived = new CountDownLatch(1);
2238         }
2239
2240         void waitForUnreachableMember() {
2241             assertEquals("UnreachableMember received", true,
2242                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
2243                 ));
2244             memberUnreachableReceived = new CountDownLatch(1);
2245         }
2246
2247         void waitForReachableMember() {
2248             assertEquals("ReachableMember received", true,
2249                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
2250             memberReachableReceived = new CountDownLatch(1);
2251         }
2252
2253         void verifyFindPrimary() {
2254             assertEquals("FindPrimary received", true,
2255                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
2256             findPrimaryMessageReceived = new CountDownLatch(1);
2257         }
2258
2259         public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) {
2260             return new Builder(datastoreContextBuilder);
2261         }
2262
2263         private static class Builder extends AbstractGenericCreator<Builder, TestShardManager> {
2264             private ActorRef shardActor;
2265             private final Map<String, ActorRef> shardActors = new HashMap<>();
2266
2267             Builder(DatastoreContext.Builder datastoreContextBuilder) {
2268                 super(TestShardManager.class);
2269                 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
2270             }
2271
2272             Builder shardActor(ActorRef newShardActor) {
2273                 this.shardActor = newShardActor;
2274                 return this;
2275             }
2276
2277             Builder addShardActor(String shardName, ActorRef actorRef) {
2278                 shardActors.put(shardName, actorRef);
2279                 return this;
2280             }
2281         }
2282
2283         @Override
2284         public void saveSnapshot(Object obj) {
2285             snapshot = (ShardManagerSnapshot) obj;
2286             snapshotPersist.countDown();
2287             super.saveSnapshot(obj);
2288         }
2289
2290         void verifySnapshotPersisted(Set<String> shardList) {
2291             assertEquals("saveSnapshot invoked", true,
2292                     Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
2293             assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
2294         }
2295
2296         @Override
2297         protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
2298             if (shardActors.get(info.getShardName()) != null) {
2299                 return shardActors.get(info.getShardName());
2300             }
2301
2302             if (shardActor != null) {
2303                 return shardActor;
2304             }
2305
2306             return super.newShardActor(schemaContext, info);
2307         }
2308     }
2309
2310     private abstract static class AbstractGenericCreator<T extends AbstractGenericCreator<T, ?>, C extends ShardManager>
2311                                                      extends AbstractShardManagerCreator<T> {
2312         private final Class<C> shardManagerClass;
2313
2314         AbstractGenericCreator(Class<C> shardManagerClass) {
2315             this.shardManagerClass = shardManagerClass;
2316             cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).waitTillReadyCountdownLatch(ready)
2317                     .primaryShardInfoCache(new PrimaryShardInfoFutureCache());
2318         }
2319
2320         @Override
2321         public Props props() {
2322             verify();
2323             return Props.create(shardManagerClass, this);
2324         }
2325     }
2326
2327     private static class GenericCreator<C extends ShardManager> extends AbstractGenericCreator<GenericCreator<C>, C> {
2328         GenericCreator(Class<C> shardManagerClass) {
2329             super(shardManagerClass);
2330         }
2331     }
2332
2333     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
2334         private static final long serialVersionUID = 1L;
2335         private final Creator<ShardManager> delegate;
2336
2337         DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
2338             this.delegate = delegate;
2339         }
2340
2341         @Override
2342         public ShardManager create() throws Exception {
2343             return delegate.create();
2344         }
2345     }
2346
2347     interface MessageInterceptor extends Function<Object, Object> {
2348         boolean canIntercept(Object message);
2349     }
2350
2351     private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
2352         return new MessageInterceptor() {
2353             @Override
2354             public Object apply(Object message) {
2355                 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
2356             }
2357
2358             @Override
2359             public boolean canIntercept(Object message) {
2360                 return message instanceof FindPrimary;
2361             }
2362         };
2363     }
2364
2365     private static class MockRespondActor extends MessageCollectorActor {
2366         static final String CLEAR_RESPONSE = "clear-response";
2367
2368         private Object responseMsg;
2369         private final Class<?> requestClass;
2370
2371         @SuppressWarnings("unused")
2372         MockRespondActor(Class<?> requestClass, Object responseMsg) {
2373             this.requestClass = requestClass;
2374             this.responseMsg = responseMsg;
2375         }
2376
2377         @Override
2378         public void onReceive(Object message) throws Exception {
2379             if (message.equals(CLEAR_RESPONSE)) {
2380                 responseMsg = null;
2381             } else {
2382                 super.onReceive(message);
2383                 if (message.getClass().equals(requestClass) && responseMsg != null) {
2384                     getSender().tell(responseMsg, getSelf());
2385                 }
2386             }
2387         }
2388     }
2389 }