Fix raw reference
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.times;
20 import static org.mockito.Mockito.verify;
21 import akka.actor.ActorRef;
22 import akka.actor.ActorSystem;
23 import akka.actor.AddressFromURIString;
24 import akka.actor.Props;
25 import akka.actor.Status;
26 import akka.actor.Status.Failure;
27 import akka.actor.Status.Success;
28 import akka.cluster.Cluster;
29 import akka.cluster.ClusterEvent;
30 import akka.dispatch.Dispatchers;
31 import akka.japi.Creator;
32 import akka.pattern.Patterns;
33 import akka.persistence.RecoveryCompleted;
34 import akka.serialization.Serialization;
35 import akka.testkit.JavaTestKit;
36 import akka.testkit.TestActorRef;
37 import akka.util.Timeout;
38 import com.google.common.base.Function;
39 import com.google.common.base.Optional;
40 import com.google.common.base.Stopwatch;
41 import com.google.common.collect.ImmutableMap;
42 import com.google.common.collect.ImmutableSet;
43 import com.google.common.collect.Lists;
44 import com.google.common.collect.Sets;
45 import com.google.common.util.concurrent.Uninterruptibles;
46 import com.typesafe.config.ConfigFactory;
47 import java.net.URI;
48 import java.util.AbstractMap;
49 import java.util.ArrayList;
50 import java.util.Arrays;
51 import java.util.Collection;
52 import java.util.Collections;
53 import java.util.HashMap;
54 import java.util.List;
55 import java.util.Map;
56 import java.util.Map.Entry;
57 import java.util.Set;
58 import java.util.concurrent.CountDownLatch;
59 import java.util.concurrent.TimeUnit;
60 import java.util.concurrent.TimeoutException;
61 import org.apache.commons.lang3.SerializationUtils;
62 import org.junit.After;
63 import org.junit.Before;
64 import org.junit.Test;
65 import org.mockito.Mock;
66 import org.mockito.Mockito;
67 import org.mockito.MockitoAnnotations;
68 import org.opendaylight.controller.cluster.datastore.config.Configuration;
69 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
70 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
71 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
72 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
73 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
74 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
75 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
76 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
77 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
78 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
79 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
80 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
81 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
82 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
83 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
84 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
85 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
86 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
87 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
88 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
89 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
90 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
91 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
92 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
93 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
94 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
95 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
96 import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor;
97 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
98 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
99 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
100 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
101 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
102 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
103 import org.opendaylight.controller.cluster.raft.RaftState;
104 import org.opendaylight.controller.cluster.raft.TestActorFactory;
105 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
106 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
107 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
108 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
109 import org.opendaylight.controller.cluster.raft.messages.AddServer;
110 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
111 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
112 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
113 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
114 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
115 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
116 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
117 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
118 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
119 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
120 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
121 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
122 import org.slf4j.Logger;
123 import org.slf4j.LoggerFactory;
124 import scala.concurrent.Await;
125 import scala.concurrent.Future;
126 import scala.concurrent.duration.FiniteDuration;
127
128 public class ShardManagerTest extends AbstractActorTest {
129     private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
130
131     private static int ID_COUNTER = 1;
132
133     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
134     private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
135
136     @Mock
137     private static CountDownLatch ready;
138
139     private static TestActorRef<MessageCollectorActor> mockShardActor;
140
141     private static String mockShardName;
142
143     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
144             dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
145                    .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
146
147     private final Collection<ActorSystem> actorSystems = new ArrayList<>();
148
149     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
150
151     @Before
152     public void setUp() {
153         MockitoAnnotations.initMocks(this);
154
155         InMemoryJournal.clear();
156         InMemorySnapshotStore.clear();
157
158         if(mockShardActor == null) {
159             mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
160             mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), mockShardName);
161         }
162
163         mockShardActor.underlyingActor().clear();
164     }
165
166     @After
167     public void tearDown() {
168         InMemoryJournal.clear();
169         InMemorySnapshotStore.clear();
170
171         for(ActorSystem system: actorSystems) {
172             JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
173         }
174
175         actorFactory.close();
176     }
177
178     private ActorSystem newActorSystem(String config) {
179         ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config));
180         actorSystems.add(system);
181         return system;
182     }
183
184     private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
185         String name = new ShardIdentifier(shardName, memberName,"config").toString();
186         if(system == getSystem()) {
187             return actorFactory.createTestActor(Props.create(MessageCollectorActor.class), name);
188         }
189
190         return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
191     }
192
193     private Props newShardMgrProps() {
194         return newShardMgrProps(new MockConfiguration());
195     }
196
197     private static DatastoreContextFactory newDatastoreContextFactory(DatastoreContext datastoreContext) {
198         DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
199         Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
200         Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString());
201         return mockFactory;
202     }
203
204     private TestShardManager.Builder newTestShardMgrBuilder() {
205         return TestShardManager.builder(datastoreContextBuilder);
206     }
207
208     private TestShardManager.Builder newTestShardMgrBuilder(Configuration config) {
209         return TestShardManager.builder(datastoreContextBuilder).configuration(config);
210     }
211
212     private Props newShardMgrProps(Configuration config) {
213         return newTestShardMgrBuilder(config).props();
214     }
215
216     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() {
217         return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
218     }
219
220     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(ActorRef shardActor) {
221         return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor);
222     }
223
224
225     private Props newPropsShardMgrWithMockShardActor() {
226         return newTestShardMgrBuilderWithMockShardActor().props();
227     }
228
229     private Props newPropsShardMgrWithMockShardActor(ActorRef shardActor) {
230         return newTestShardMgrBuilderWithMockShardActor(shardActor).props();
231     }
232
233
234     private TestShardManager newTestShardManager() {
235         return newTestShardManager(newShardMgrProps());
236     }
237
238     private TestShardManager newTestShardManager(Props props) {
239         TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
240         TestShardManager shardManager = shardManagerActor.underlyingActor();
241         shardManager.waitForRecoveryComplete();
242         return shardManager;
243     }
244
245     private void waitForShardInitialized(ActorRef shardManager, String shardName, JavaTestKit kit) {
246         AssertionError last = null;
247         Stopwatch sw = Stopwatch.createStarted();
248         while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
249             try {
250                 shardManager.tell(new FindLocalShard(shardName, true), kit.getRef());
251                 kit.expectMsgClass(LocalShardFound.class);
252                 return;
253             } catch(AssertionError e) {
254                 last = e;
255             }
256
257             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
258         }
259
260         throw last;
261     }
262
263     private <T> T expectMsgClassOrFailure(Class<T> msgClass, JavaTestKit kit, String msg) {
264         Object reply = kit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), msgClass, Failure.class);
265         if(reply instanceof Failure) {
266             throw new AssertionError(msg + " failed", ((Failure)reply).cause());
267         }
268
269         return (T)reply;
270     }
271
272     @Test
273     public void testPerShardDatastoreContext() throws Exception {
274         LOG.info("testPerShardDatastoreContext starting");
275         final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
276                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
277
278         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
279                 shardElectionTimeoutFactor(6).build()).when(mockFactory).getShardDatastoreContext("default");
280
281         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
282                 shardElectionTimeoutFactor(7).build()).when(mockFactory).getShardDatastoreContext("topology");
283
284         final MockConfiguration mockConfig = new MockConfiguration() {
285             @Override
286             public Collection<String> getMemberShardNames(String memberName) {
287                 return Arrays.asList("default", "topology");
288             }
289
290             @Override
291             public Collection<String> getMembersFromShardName(String shardName) {
292                 return Arrays.asList("member-1");
293             }
294         };
295
296         final TestActorRef<MessageCollectorActor> defaultShardActor = actorFactory.createTestActor(
297                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("default"));
298         final TestActorRef<MessageCollectorActor> topologyShardActor = actorFactory.createTestActor(
299                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("topology"));
300
301         final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
302                 new HashMap<String, Entry<ActorRef, DatastoreContext>>());
303         shardInfoMap.put("default", new AbstractMap.SimpleEntry<ActorRef, DatastoreContext>(defaultShardActor, null));
304         shardInfoMap.put("topology", new AbstractMap.SimpleEntry<ActorRef, DatastoreContext>(topologyShardActor, null));
305
306         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
307         final CountDownLatch newShardActorLatch = new CountDownLatch(2);
308         class LocalShardManager extends ShardManager {
309             public LocalShardManager(AbstractBuilder<?> builder) {
310                 super(builder);
311             }
312
313             @Override
314             protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
315                 Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
316                 ActorRef ref = null;
317                 if(entry != null) {
318                     ref = entry.getKey();
319                     entry.setValue(info.getDatastoreContext());
320                 }
321
322                 newShardActorLatch.countDown();
323                 return ref;
324             }
325         }
326
327         final Creator<ShardManager> creator = new Creator<ShardManager>() {
328             private static final long serialVersionUID = 1L;
329             @Override
330             public ShardManager create() throws Exception {
331                 return new LocalShardManager(new GenericBuilder<LocalShardManager>(LocalShardManager.class).
332                         datastoreContextFactory(mockFactory).primaryShardInfoCache(primaryShardInfoCache).
333                         configuration(mockConfig));
334             }
335         };
336
337         JavaTestKit kit = new JavaTestKit(getSystem());
338
339         final ActorRef shardManager = actorFactory.createActor(Props.create(
340                 new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()));
341
342         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), kit.getRef());
343
344         assertEquals("Shard actors created", true, newShardActorLatch.await(5, TimeUnit.SECONDS));
345         assertEquals("getShardElectionTimeoutFactor", 6, shardInfoMap.get("default").getValue().
346                 getShardElectionTimeoutFactor());
347         assertEquals("getShardElectionTimeoutFactor", 7, shardInfoMap.get("topology").getValue().
348                 getShardElectionTimeoutFactor());
349
350         DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
351                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
352         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
353                 shardElectionTimeoutFactor(66).build()).when(newMockFactory).getShardDatastoreContext("default");
354
355         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
356                 shardElectionTimeoutFactor(77).build()).when(newMockFactory).getShardDatastoreContext("topology");
357
358         shardManager.tell(newMockFactory, kit.getRef());
359
360         DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor, DatastoreContext.class);
361         assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
362
363         newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
364         assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
365
366         LOG.info("testPerShardDatastoreContext ending");
367     }
368
369     @Test
370     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
371         new JavaTestKit(getSystem()) {{
372             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
373
374             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
375
376             shardManager.tell(new FindPrimary("non-existent", false), getRef());
377
378             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
379         }};
380     }
381
382     @Test
383     public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
384         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting");
385         new JavaTestKit(getSystem()) {{
386             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
387
388             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
389
390             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
391             shardManager.tell(new ActorInitialized(), mockShardActor);
392
393             DataTree mockDataTree = mock(DataTree.class);
394             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
395                     DataStoreVersions.CURRENT_VERSION), getRef());
396
397             MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
398             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
399                     RaftState.Leader.name())), mockShardActor);
400
401             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
402
403             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
404             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
405                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
406             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
407         }};
408
409         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending");
410     }
411
412     @Test
413     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
414         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp starting");
415         new JavaTestKit(getSystem()) {{
416             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
417
418             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
419             shardManager.tell(new ActorInitialized(), mockShardActor);
420
421             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
422             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
423             shardManager.tell(new RoleChangeNotification(memberId1,
424                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
425             shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION), mockShardActor);
426
427             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
428
429             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
430         }};
431
432         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending");
433     }
434
435     @Test
436     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
437         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard starting");
438         new JavaTestKit(getSystem()) {{
439             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
440
441             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
442             shardManager.tell(new ActorInitialized(), mockShardActor);
443
444             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
445             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
446
447             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
448             shardManager.tell(new RoleChangeNotification(memberId1,
449                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
450             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
451             shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent(),
452                     leaderVersion), mockShardActor);
453
454             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
455
456             RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
457             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
458                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
459             assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
460         }};
461
462         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard ending");
463     }
464
465     @Test
466     public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
467         new JavaTestKit(getSystem()) {{
468             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
469
470             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
471
472             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
473         }};
474     }
475
476     @Test
477     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
478         new JavaTestKit(getSystem()) {{
479             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
480
481             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
482             shardManager.tell(new ActorInitialized(), mockShardActor);
483
484             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
485
486             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
487         }};
488     }
489
490     @Test
491     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
492         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
493         new JavaTestKit(getSystem()) {{
494             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
495
496             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
497             shardManager.tell(new ActorInitialized(), mockShardActor);
498
499             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
500             shardManager.tell(new RoleChangeNotification(memberId,
501                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
502
503             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
504
505             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
506
507             DataTree mockDataTree = mock(DataTree.class);
508             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
509                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
510
511             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
512
513             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
514             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
515                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
516             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
517         }};
518
519         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
520     }
521
522     @Test
523     public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
524         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting");
525         new JavaTestKit(getSystem()) {{
526             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
527
528             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
529
530             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
531             // delayed until we send ActorInitialized and RoleChangeNotification.
532             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
533
534             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
535
536             shardManager.tell(new ActorInitialized(), mockShardActor);
537
538             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
539
540             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
541             shardManager.tell(new RoleChangeNotification(memberId,
542                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
543
544             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
545
546             DataTree mockDataTree = mock(DataTree.class);
547             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
548                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
549
550             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
551             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
552                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
553             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
554
555             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
556         }};
557
558         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending");
559     }
560
561     @Test
562     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
563         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard starting");
564         new JavaTestKit(getSystem()) {{
565             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
566
567             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
568
569             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
570
571             expectMsgClass(duration("2 seconds"), NotInitializedException.class);
572
573             shardManager.tell(new ActorInitialized(), mockShardActor);
574
575             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
576         }};
577
578         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending");
579     }
580
581     @Test
582     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
583         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard starting");
584         new JavaTestKit(getSystem()) {{
585             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
586
587             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
588             shardManager.tell(new ActorInitialized(), mockShardActor);
589             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
590                     null, RaftState.Candidate.name()), mockShardActor);
591
592             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
593
594             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
595         }};
596
597         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending");
598     }
599
600     @Test
601     public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception {
602         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard starting");
603         new JavaTestKit(getSystem()) {{
604             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
605
606             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
607             shardManager.tell(new ActorInitialized(), mockShardActor);
608             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
609                     null, RaftState.IsolatedLeader.name()), mockShardActor);
610
611             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
612
613             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
614         }};
615
616         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending");
617     }
618
619     @Test
620     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
621         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard starting");
622         new JavaTestKit(getSystem()) {{
623             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
624
625             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
626             shardManager.tell(new ActorInitialized(), mockShardActor);
627
628             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
629
630             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
631         }};
632
633         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending");
634     }
635
636     @Test
637     public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
638         LOG.info("testOnReceiveFindPrimaryForRemoteShard starting");
639         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
640
641         // Create an ActorSystem ShardManager actor for member-1.
642
643         final ActorSystem system1 = newActorSystem("Member1");
644         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
645
646         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
647                 newTestShardMgrBuilderWithMockShardActor().cluster(
648                         new ClusterWrapperImpl(system1)).props(), shardManagerID);
649
650         // Create an ActorSystem ShardManager actor for member-2.
651
652         final ActorSystem system2 = newActorSystem("Member2");
653
654         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
655
656         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
657
658         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
659                 put("default", Arrays.asList("member-1", "member-2")).
660                 put("astronauts", Arrays.asList("member-2")).build());
661
662         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
663                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
664                         new ClusterWrapperImpl(system2)).props(), shardManagerID);
665
666         new JavaTestKit(system1) {{
667
668             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
669             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
670
671             shardManager2.tell(new ActorInitialized(), mockShardActor2);
672
673             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
674             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
675             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
676                     Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor2);
677             shardManager2.tell(new RoleChangeNotification(memberId2,
678                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
679
680             shardManager1.underlyingActor().waitForMemberUp();
681
682             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
683
684             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
685             String path = found.getPrimaryPath();
686             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
687             assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
688
689             shardManager2.underlyingActor().verifyFindPrimary();
690
691             Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
692
693             shardManager1.underlyingActor().waitForMemberRemoved();
694
695             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
696
697             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
698         }};
699
700         LOG.info("testOnReceiveFindPrimaryForRemoteShard ending");
701     }
702
703     @Test
704     public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
705         LOG.info("testShardAvailabilityOnChangeOfMemberReachability starting");
706         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
707
708         // Create an ActorSystem ShardManager actor for member-1.
709
710         final ActorSystem system1 = newActorSystem("Member1");
711         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
712
713         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
714
715         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
716                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
717                         new ClusterWrapperImpl(system1)).props(), shardManagerID);
718
719         // Create an ActorSystem ShardManager actor for member-2.
720
721         final ActorSystem system2 = newActorSystem("Member2");
722
723         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
724
725         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
726
727         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
728             put("default", Arrays.asList("member-1", "member-2")).build());
729
730         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
731                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
732                         new ClusterWrapperImpl(system2)).props(), shardManagerID);
733
734         new JavaTestKit(system1) {{
735
736             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
737             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
738             shardManager1.tell(new ActorInitialized(), mockShardActor1);
739             shardManager2.tell(new ActorInitialized(), mockShardActor2);
740
741             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
742             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
743             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
744                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
745             shardManager1.tell(new RoleChangeNotification(memberId1,
746                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
747             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
748                     DataStoreVersions.CURRENT_VERSION),
749                 mockShardActor2);
750             shardManager2.tell(new RoleChangeNotification(memberId2,
751                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
752             shardManager1.underlyingActor().waitForMemberUp();
753
754             shardManager1.tell(new FindPrimary("default", true), getRef());
755
756             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
757             String path = found.getPrimaryPath();
758             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
759
760             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
761                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
762
763             shardManager1.underlyingActor().waitForUnreachableMember();
764
765             PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
766             assertEquals("getMemberName", "member-2", peerDown.getMemberName());
767             MessageCollectorActor.clearMessages(mockShardActor1);
768
769             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
770                     createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
771
772             MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
773
774             shardManager1.tell(new FindPrimary("default", true), getRef());
775
776             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
777
778             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
779                 createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
780
781             shardManager1.underlyingActor().waitForReachableMember();
782
783             PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
784             assertEquals("getMemberName", "member-2", peerUp.getMemberName());
785             MessageCollectorActor.clearMessages(mockShardActor1);
786
787             shardManager1.tell(new FindPrimary("default", true), getRef());
788
789             RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
790             String path1 = found1.getPrimaryPath();
791             assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
792
793             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
794                     createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
795
796             MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
797
798             // Test FindPrimary wait succeeds after reachable member event.
799
800             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
801                     createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
802             shardManager1.underlyingActor().waitForUnreachableMember();
803
804             shardManager1.tell(new FindPrimary("default", true), getRef());
805
806             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
807                     createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
808
809             RemotePrimaryShardFound found2 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
810             String path2 = found2.getPrimaryPath();
811             assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config"));
812         }};
813
814         LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending");
815     }
816
817     @Test
818     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
819         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange starting");
820         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
821
822         // Create an ActorSystem ShardManager actor for member-1.
823
824         final ActorSystem system1 = newActorSystem("Member1");
825         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
826
827         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
828
829         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
830         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
831                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
832                         new ClusterWrapperImpl(system1)).primaryShardInfoCache(primaryShardInfoCache).props(),
833                 shardManagerID);
834
835         // Create an ActorSystem ShardManager actor for member-2.
836
837         final ActorSystem system2 = newActorSystem("Member2");
838
839         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
840
841         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
842
843         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
844             put("default", Arrays.asList("member-1", "member-2")).build());
845
846         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
847                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
848                         new ClusterWrapperImpl(system2)).props(), shardManagerID);
849
850         new JavaTestKit(system1) {{
851             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
852             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
853             shardManager1.tell(new ActorInitialized(), mockShardActor1);
854             shardManager2.tell(new ActorInitialized(), mockShardActor2);
855
856             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
857             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
858             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
859                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
860             shardManager1.tell(new RoleChangeNotification(memberId1,
861                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
862             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
863                     DataStoreVersions.CURRENT_VERSION),
864                 mockShardActor2);
865             shardManager2.tell(new RoleChangeNotification(memberId2,
866                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
867             shardManager1.underlyingActor().waitForMemberUp();
868
869             shardManager1.tell(new FindPrimary("default", true), getRef());
870
871             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
872             String path = found.getPrimaryPath();
873             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
874
875             primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
876                     mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.<DataTree>absent()));
877
878             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
879                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
880
881             shardManager1.underlyingActor().waitForUnreachableMember();
882
883             shardManager1.tell(new FindPrimary("default", true), getRef());
884
885             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
886
887             assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default"));
888
889             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class)),
890                     DataStoreVersions.CURRENT_VERSION), mockShardActor1);
891             shardManager1.tell(new RoleChangeNotification(memberId1,
892                 RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
893
894             shardManager1.tell(new FindPrimary("default", true), getRef());
895
896             LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
897             String path1 = found1.getPrimaryPath();
898             assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
899
900         }};
901
902         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange ending");
903     }
904
905
906     @Test
907     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
908         new JavaTestKit(getSystem()) {{
909             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
910
911             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
912
913             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
914
915             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
916
917             assertEquals("getShardName", "non-existent", notFound.getShardName());
918         }};
919     }
920
921     @Test
922     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
923         new JavaTestKit(getSystem()) {{
924             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
925
926             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
927             shardManager.tell(new ActorInitialized(), mockShardActor);
928
929             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
930
931             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
932
933             assertTrue("Found path contains " + found.getPath().path().toString(),
934                     found.getPath().path().toString().contains("member-1-shard-default-config"));
935         }};
936     }
937
938     @Test
939     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
940         new JavaTestKit(getSystem()) {{
941             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
942
943             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
944
945             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
946         }};
947     }
948
949     @Test
950     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
951         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
952         new JavaTestKit(getSystem()) {{
953             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
954
955             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
956
957             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
958             // delayed until we send ActorInitialized.
959             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
960                     new Timeout(5, TimeUnit.SECONDS));
961
962             shardManager.tell(new ActorInitialized(), mockShardActor);
963
964             Object resp = Await.result(future, duration("5 seconds"));
965             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
966         }};
967
968         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
969     }
970
971     @Test
972     public void testOnRecoveryJournalIsCleaned() {
973         String persistenceID = "shard-manager-" + shardMrgIDSuffix;
974         InMemoryJournal.addEntry(persistenceID, 1L, new ShardManager.SchemaContextModules(
975                 ImmutableSet.of("foo")));
976         InMemoryJournal.addEntry(persistenceID, 2L, new ShardManager.SchemaContextModules(
977                 ImmutableSet.of("bar")));
978         InMemoryJournal.addDeleteMessagesCompleteLatch(persistenceID);
979
980         TestShardManager shardManager = newTestShardManager();
981
982         InMemoryJournal.waitForDeleteMessagesComplete(persistenceID);
983
984         // Journal entries up to the last one should've been deleted
985         Map<Long, Object> journal = InMemoryJournal.get(persistenceID);
986         synchronized (journal) {
987             assertEquals("Journal size", 0, journal.size());
988         }
989     }
990
991     @Test
992     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
993         TestShardManager shardManager = newTestShardManager();
994
995         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
996         shardManager.onReceiveCommand(new RoleChangeNotification(
997                 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
998
999         verify(ready, never()).countDown();
1000
1001         shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
1002                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION));
1003
1004         verify(ready, times(1)).countDown();
1005     }
1006
1007     @Test
1008     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
1009         new JavaTestKit(getSystem()) {{
1010             TestShardManager shardManager = newTestShardManager();
1011
1012             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1013             shardManager.onReceiveCommand(new RoleChangeNotification(
1014                     memberId, null, RaftState.Follower.name()));
1015
1016             verify(ready, never()).countDown();
1017
1018             shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
1019
1020             shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
1021                     "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
1022                     DataStoreVersions.CURRENT_VERSION));
1023
1024             verify(ready, times(1)).countDown();
1025         }};
1026     }
1027
1028     @Test
1029     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
1030         new JavaTestKit(getSystem()) {{
1031             TestShardManager shardManager = newTestShardManager();
1032
1033             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1034             shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1035
1036             verify(ready, never()).countDown();
1037
1038             shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
1039                     "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
1040                     DataStoreVersions.CURRENT_VERSION));
1041
1042             shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
1043
1044             verify(ready, times(1)).countDown();
1045         }};
1046     }
1047
1048     @Test
1049     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
1050         TestShardManager shardManager = newTestShardManager();
1051
1052         shardManager.onReceiveCommand(new RoleChangeNotification(
1053                 "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
1054
1055         verify(ready, never()).countDown();
1056     }
1057
1058     @Test
1059     public void testByDefaultSyncStatusIsFalse() throws Exception{
1060         TestShardManager shardManager = newTestShardManager();
1061
1062         assertEquals(false, shardManager.getMBean().getSyncStatus());
1063     }
1064
1065     @Test
1066     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
1067         TestShardManager shardManager = newTestShardManager();
1068
1069         shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
1070                 RaftState.Follower.name(), RaftState.Leader.name()));
1071
1072         assertEquals(true, shardManager.getMBean().getSyncStatus());
1073     }
1074
1075     @Test
1076     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
1077         TestShardManager shardManager = newTestShardManager();
1078
1079         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1080         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1081                 RaftState.Follower.name(), RaftState.Candidate.name()));
1082
1083         assertEquals(false, shardManager.getMBean().getSyncStatus());
1084
1085         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
1086         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(
1087                 true, shardId));
1088
1089         assertEquals(false, shardManager.getMBean().getSyncStatus());
1090     }
1091
1092     @Test
1093     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
1094         TestShardManager shardManager = newTestShardManager();
1095
1096         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1097         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1098                 RaftState.Candidate.name(), RaftState.Follower.name()));
1099
1100         // Initially will be false
1101         assertEquals(false, shardManager.getMBean().getSyncStatus());
1102
1103         // Send status true will make sync status true
1104         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
1105
1106         assertEquals(true, shardManager.getMBean().getSyncStatus());
1107
1108         // Send status false will make sync status false
1109         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
1110
1111         assertEquals(false, shardManager.getMBean().getSyncStatus());
1112
1113     }
1114
1115     @Test
1116     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
1117         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
1118         TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
1119             @Override
1120             public List<String> getMemberShardNames(String memberName) {
1121                 return Arrays.asList("default", "astronauts");
1122             }
1123         }));
1124
1125         // Initially will be false
1126         assertEquals(false, shardManager.getMBean().getSyncStatus());
1127
1128         // Make default shard leader
1129         String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
1130         shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId,
1131                 RaftState.Follower.name(), RaftState.Leader.name()));
1132
1133         // default = Leader, astronauts is unknown so sync status remains false
1134         assertEquals(false, shardManager.getMBean().getSyncStatus());
1135
1136         // Make astronauts shard leader as well
1137         String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1138         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1139                 RaftState.Follower.name(), RaftState.Leader.name()));
1140
1141         // Now sync status should be true
1142         assertEquals(true, shardManager.getMBean().getSyncStatus());
1143
1144         // Make astronauts a Follower
1145         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1146                 RaftState.Leader.name(), RaftState.Follower.name()));
1147
1148         // Sync status is not true
1149         assertEquals(false, shardManager.getMBean().getSyncStatus());
1150
1151         // Make the astronauts follower sync status true
1152         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
1153
1154         // Sync status is now true
1155         assertEquals(true, shardManager.getMBean().getSyncStatus());
1156
1157         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending");
1158     }
1159
1160     @Test
1161     public void testOnReceiveSwitchShardBehavior() throws Exception {
1162         new JavaTestKit(getSystem()) {{
1163             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1164
1165             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1166             shardManager.tell(new ActorInitialized(), mockShardActor);
1167
1168             shardManager.tell(new SwitchShardBehavior(mockShardName, "Leader", 1000), getRef());
1169
1170             SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor, SwitchBehavior.class);
1171
1172             assertEquals(RaftState.Leader, switchBehavior.getNewState());
1173             assertEquals(1000, switchBehavior.getNewTerm());
1174         }};
1175     }
1176
1177     @Test
1178     public void testOnCreateShard() {
1179         LOG.info("testOnCreateShard starting");
1180         new JavaTestKit(getSystem()) {{
1181             datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1182
1183             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1184                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1185
1186             SchemaContext schemaContext = TestModel.createTestContext();
1187             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1188
1189             DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100).
1190                     persistent(false).build();
1191             Shard.Builder shardBuilder = Shard.builder();
1192
1193             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1194                     "foo", null, Arrays.asList("member-1", "member-5", "member-6"));
1195             shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef());
1196
1197             expectMsgClass(duration("5 seconds"), Success.class);
1198
1199             shardManager.tell(new FindLocalShard("foo", true), getRef());
1200
1201             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1202
1203             assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent());
1204             assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig().
1205                     getPeerAddressResolver() instanceof ShardPeerAddressResolver);
1206             assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(),
1207                     new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()),
1208                     shardBuilder.getPeerAddresses().keySet());
1209             assertEquals("ShardIdentifier", new ShardIdentifier("foo", "member-1", shardMrgIDSuffix),
1210                     shardBuilder.getId());
1211             assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1212
1213             // Send CreateShard with same name - should return Success with a message.
1214
1215             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1216
1217             Success success = expectMsgClass(duration("5 seconds"), Success.class);
1218             assertNotNull("Success status is null", success.status());
1219         }};
1220
1221         LOG.info("testOnCreateShard ending");
1222     }
1223
1224     @Test
1225     public void testOnCreateShardWithLocalMemberNotInShardConfig() {
1226         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting");
1227         new JavaTestKit(getSystem()) {{
1228             datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1229
1230             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1231                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1232
1233             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1234
1235             Shard.Builder shardBuilder = Shard.builder();
1236             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1237                     "foo", null, Arrays.asList("member-5", "member-6"));
1238
1239             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1240             expectMsgClass(duration("5 seconds"), Success.class);
1241
1242             shardManager.tell(new FindLocalShard("foo", true), getRef());
1243             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1244
1245             assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
1246             assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(),
1247                     shardBuilder.getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
1248         }};
1249
1250         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending");
1251     }
1252
1253     @Test
1254     public void testOnCreateShardWithNoInitialSchemaContext() {
1255         LOG.info("testOnCreateShardWithNoInitialSchemaContext starting");
1256         new JavaTestKit(getSystem()) {{
1257             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1258                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1259
1260             Shard.Builder shardBuilder = Shard.builder();
1261
1262             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1263                     "foo", null, Arrays.asList("member-1"));
1264             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1265
1266             expectMsgClass(duration("5 seconds"), Success.class);
1267
1268             SchemaContext schemaContext = TestModel.createTestContext();
1269             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1270
1271             shardManager.tell(new FindLocalShard("foo", true), getRef());
1272
1273             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1274
1275             assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1276             assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
1277         }};
1278
1279         LOG.info("testOnCreateShardWithNoInitialSchemaContext ending");
1280     }
1281
1282     @Test
1283     public void testGetSnapshot() throws Throwable {
1284         LOG.info("testGetSnapshot starting");
1285         JavaTestKit kit = new JavaTestKit(getSystem());
1286
1287         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1288                    put("shard1", Arrays.asList("member-1")).
1289                    put("shard2", Arrays.asList("member-1")).
1290                    put("astronauts", Collections.<String>emptyList()).build());
1291
1292         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig).
1293                 withDispatcher(Dispatchers.DefaultDispatcherId()));
1294
1295         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1296         Failure failure = kit.expectMsgClass(Failure.class);
1297         assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
1298
1299         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1300
1301         waitForShardInitialized(shardManager, "shard1", kit);
1302         waitForShardInitialized(shardManager, "shard2", kit);
1303
1304         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1305
1306         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1307
1308         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1309         assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1310
1311         Function<ShardSnapshot, String> shardNameTransformer = new Function<ShardSnapshot, String>() {
1312             @Override
1313             public String apply(ShardSnapshot s) {
1314                 return s.getName();
1315             }
1316         };
1317
1318         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
1319                 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
1320
1321         // Add a new replica
1322
1323         JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1324
1325         TestShardManager shardManagerInstance = shardManager.underlyingActor();
1326         shardManagerInstance.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1327
1328         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1329         mockShardLeaderKit.expectMsgClass(AddServer.class);
1330         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.OK, ""));
1331         kit.expectMsgClass(Status.Success.class);
1332         waitForShardInitialized(shardManager, "astronauts", kit);
1333
1334         // Send another GetSnapshot and verify
1335
1336         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1337         datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1338
1339         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
1340                 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
1341
1342         byte[] snapshotBytes = datastoreSnapshot.getShardManagerSnapshot();
1343         assertNotNull("Expected ShardManagerSnapshot", snapshotBytes);
1344         ShardManagerSnapshot snapshot = SerializationUtils.deserialize(snapshotBytes);
1345         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1346                 Sets.newHashSet(snapshot.getShardList()));
1347
1348         LOG.info("testGetSnapshot ending");
1349     }
1350
1351     @Test
1352     public void testRestoreFromSnapshot() throws Throwable {
1353         LOG.info("testRestoreFromSnapshot starting");
1354
1355         JavaTestKit kit = new JavaTestKit(getSystem());
1356
1357         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1358                    put("shard1", Collections.<String>emptyList()).
1359                    put("shard2", Collections.<String>emptyList()).
1360                    put("astronauts", Collections.<String>emptyList()).build());
1361
1362
1363         ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"));
1364         DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix,
1365                 SerializationUtils.serialize(snapshot), Collections.<ShardSnapshot>emptyList());
1366         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig).
1367                 restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
1368
1369         shardManager.underlyingActor().waitForRecoveryComplete();
1370
1371         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1372
1373         waitForShardInitialized(shardManager, "shard1", kit);
1374         waitForShardInitialized(shardManager, "shard2", kit);
1375         waitForShardInitialized(shardManager, "astronauts", kit);
1376
1377         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1378
1379         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1380
1381         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1382
1383         byte[] snapshotBytes = datastoreSnapshot.getShardManagerSnapshot();
1384         assertNotNull("Expected ShardManagerSnapshot", snapshotBytes);
1385         snapshot = SerializationUtils.deserialize(snapshotBytes);
1386         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1387                 Sets.newHashSet(snapshot.getShardList()));
1388
1389         LOG.info("testRestoreFromSnapshot ending");
1390     }
1391
1392     @Test
1393     public void testAddShardReplicaForNonExistentShardConfig() throws Exception {
1394         new JavaTestKit(getSystem()) {{
1395             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1396                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1397
1398             shardManager.tell(new AddShardReplica("model-inventory"), getRef());
1399             Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
1400
1401             assertEquals("Failure obtained", true,
1402                           (resp.cause() instanceof IllegalArgumentException));
1403         }};
1404     }
1405
1406     @Test
1407     public void testAddShardReplica() throws Exception {
1408         MockConfiguration mockConfig =
1409                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1410                    put("default", Arrays.asList("member-1", "member-2")).
1411                    put("astronauts", Arrays.asList("member-2")).build());
1412
1413         final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1414         datastoreContextBuilder.shardManagerPersistenceId(shardManagerID);
1415
1416         // Create an ActorSystem ShardManager actor for member-1.
1417         final ActorSystem system1 = newActorSystem("Member1");
1418         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1419         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1420         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1421                 newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor).cluster(
1422                         new ClusterWrapperImpl(system1)).props(), shardManagerID);
1423
1424         // Create an ActorSystem ShardManager actor for member-2.
1425         final ActorSystem system2 = newActorSystem("Member2");
1426         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1427
1428         String name = new ShardIdentifier("astronauts", "member-2", "config").toString();
1429         final TestActorRef<MockRespondActor> mockShardLeaderActor =
1430                 TestActorRef.create(system2, Props.create(MockRespondActor.class), name);
1431         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1432                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor).cluster(
1433                         new ClusterWrapperImpl(system2)).props(), shardManagerID);
1434
1435         new JavaTestKit(system1) {{
1436
1437             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1438             leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1439
1440             leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1441
1442             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
1443             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1444             leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
1445                     Optional.of(mock(DataTree.class)), leaderVersion), mockShardLeaderActor);
1446             leaderShardManager.tell(new RoleChangeNotification(memberId2,
1447                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor);
1448
1449             newReplicaShardManager.underlyingActor().waitForMemberUp();
1450             leaderShardManager.underlyingActor().waitForMemberUp();
1451
1452             //Have a dummy snapshot to be overwritten by the new data persisted.
1453             String[] restoredShards = {"default", "people"};
1454             ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
1455             InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
1456             Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
1457
1458             InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID);
1459             InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
1460
1461             //construct a mock response message
1462             AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2);
1463             mockShardLeaderActor.underlyingActor().updateResponse(response);
1464             newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1465             AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1466                 AddServer.class);
1467             String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1468             assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1469             expectMsgClass(duration("5 seconds"), Status.Success.class);
1470
1471             InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
1472             InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
1473             List<ShardManagerSnapshot> persistedSnapshots =
1474                 InMemorySnapshotStore.getSnapshots(shardManagerID, ShardManagerSnapshot.class);
1475             assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
1476             ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
1477             assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
1478                     Sets.newHashSet(shardManagerSnapshot.getShardList()));
1479         }};
1480
1481         LOG.info("testAddShardReplica ending");
1482     }
1483
1484     @Test
1485     public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() throws Exception {
1486         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting");
1487         new JavaTestKit(getSystem()) {{
1488             TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1489                     newPropsShardMgrWithMockShardActor(), shardMgrID);
1490
1491             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1492             shardManager.tell(new ActorInitialized(), mockShardActor);
1493
1494             String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
1495             AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
1496             ActorRef leaderShardActor = shardManager.underlyingActor().getContext().actorOf(
1497                     Props.create(MockRespondActor.class, addServerReply), leaderId);
1498
1499             MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
1500
1501             String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
1502             shardManager.tell(new RoleChangeNotification(newReplicaId,
1503                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
1504             shardManager.tell(new ShardLeaderStateChanged(newReplicaId, leaderId, Optional.<DataTree>absent(),
1505                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
1506
1507             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1508
1509             MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
1510
1511             Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1512             assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1513
1514             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1515             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1516
1517             // Send message again to verify previous in progress state is cleared
1518
1519             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1520             resp = expectMsgClass(duration("5 seconds"), Failure.class);
1521             assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1522
1523             // Send message again with an AddServer timeout to verify the pre-existing shard actor isn't terminated.
1524
1525             shardManager.tell(newDatastoreContextFactory(datastoreContextBuilder.
1526                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), getRef());
1527             leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
1528             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1529             expectMsgClass(duration("5 seconds"), Failure.class);
1530
1531             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1532             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1533         }};
1534
1535         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
1536     }
1537
1538     @Test
1539     public void testAddShardReplicaWithPreExistingLocalReplicaLeader() throws Exception {
1540         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting");
1541         new JavaTestKit(getSystem()) {{
1542             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1543             ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1544
1545             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1546             shardManager.tell(new ActorInitialized(), mockShardActor);
1547             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mock(DataTree.class)),
1548                     DataStoreVersions.CURRENT_VERSION), getRef());
1549             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
1550                     RaftState.Leader.name())), mockShardActor);
1551
1552             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1553             Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1554             assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1555
1556             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1557             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1558         }};
1559
1560         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
1561     }
1562
1563     @Test
1564     public void testAddShardReplicaWithAddServerReplyFailure() throws Exception {
1565         LOG.info("testAddShardReplicaWithAddServerReplyFailure starting");
1566         new JavaTestKit(getSystem()) {{
1567             JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1568
1569             MockConfiguration mockConfig =
1570                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1571                        put("astronauts", Arrays.asList("member-2")).build());
1572
1573             ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
1574             final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1575                     newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props(), shardMgrID);
1576             shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1577
1578             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1579
1580             JavaTestKit terminateWatcher = new JavaTestKit(getSystem());
1581             terminateWatcher.watch(mockNewReplicaShardActor);
1582
1583             shardManager.tell(new AddShardReplica("astronauts"), getRef());
1584
1585             AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
1586             assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
1587                     addServerMsg.getNewServerId());
1588             mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
1589
1590             Failure failure = expectMsgClass(duration("5 seconds"), Failure.class);
1591             assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
1592
1593             shardManager.tell(new FindLocalShard("astronauts", false), getRef());
1594             expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1595
1596             terminateWatcher.expectTerminated(mockNewReplicaShardActor);
1597
1598             shardManager.tell(new AddShardReplica("astronauts"), getRef());
1599             mockShardLeaderKit.expectMsgClass(AddServer.class);
1600             mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
1601             failure = expectMsgClass(duration("5 seconds"), Failure.class);
1602             assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
1603         }};
1604
1605         LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
1606     }
1607
1608     @Test
1609     public void testAddShardReplicaWithAlreadyInProgress() throws Exception {
1610         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1611                 AddServer.class, new AddShardReplica("astronauts"));
1612     }
1613
1614     @Test
1615     public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception {
1616         LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting");
1617         new JavaTestKit(getSystem()) {{
1618             MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1619                        put("astronauts", Arrays.asList("member-2")).build());
1620
1621             final ActorRef newReplicaShardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig).
1622                     shardActor(mockShardActor).props(), shardMgrID);
1623
1624             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1625             MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", getRef().path().toString());
1626
1627             newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1628             Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
1629             assertEquals("Failure obtained", true,
1630                           (resp.cause() instanceof RuntimeException));
1631         }};
1632
1633         LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending");
1634     }
1635
1636     @Test
1637     public void testRemoveShardReplicaForNonExistentShard() throws Exception {
1638         new JavaTestKit(getSystem()) {{
1639             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1640                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1641
1642             shardManager.tell(new RemoveShardReplica("model-inventory", "member-1"), getRef());
1643             Status.Failure resp = expectMsgClass(duration("10 seconds"), Status.Failure.class);
1644             assertEquals("Failure obtained", true,
1645                          (resp.cause() instanceof PrimaryNotFoundException));
1646         }};
1647     }
1648
1649     @Test
1650     /**
1651      * Primary is Local
1652      */
1653     public void testRemoveShardReplicaLocal() throws Exception {
1654         new JavaTestKit(getSystem()) {{
1655             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1656
1657             final TestActorRef<MockRespondActor> respondActor =
1658                     TestActorRef.create(getSystem(), Props.create(MockRespondActor.class), memberId);
1659
1660             ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
1661
1662             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1663             shardManager.tell(new ActorInitialized(), respondActor);
1664             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mock(DataTree.class)),
1665                     DataStoreVersions.CURRENT_VERSION), getRef());
1666             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
1667                     RaftState.Leader.name())), respondActor);
1668
1669             respondActor.underlyingActor().updateResponse(new RemoveServerReply(ServerChangeStatus.OK, null));
1670             shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, "member-1"), getRef());
1671             final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor, RemoveServer.class);
1672             assertEquals(new ShardIdentifier("default", "member-1", shardMrgIDSuffix).toString(),
1673                     removeServer.getServerId());
1674             expectMsgClass(duration("5 seconds"), Success.class);
1675         }};
1676     }
1677
1678     @Test
1679     public void testRemoveShardReplicaRemote() throws Exception {
1680         MockConfiguration mockConfig =
1681                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1682                         put("default", Arrays.asList("member-1", "member-2")).
1683                         put("astronauts", Arrays.asList("member-1")).build());
1684
1685         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1686
1687         // Create an ActorSystem ShardManager actor for member-1.
1688         final ActorSystem system1 = newActorSystem("Member1");
1689         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1690         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1691
1692         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1693                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster(
1694                         new ClusterWrapperImpl(system1)).props(),
1695                 shardManagerID);
1696
1697         // Create an ActorSystem ShardManager actor for member-2.
1698         final ActorSystem system2 = newActorSystem("Member2");
1699         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1700
1701         String name = new ShardIdentifier("default", "member-2", shardMrgIDSuffix).toString();
1702         final TestActorRef<MockRespondActor> mockShardLeaderActor =
1703                 TestActorRef.create(system2, Props.create(MockRespondActor.class), name);
1704
1705         LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor);
1706
1707         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1708                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster(
1709                         new ClusterWrapperImpl(system2)).props(),
1710                 shardManagerID);
1711
1712         // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so,
1713         //    akka.tcp://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1
1714         // However when a shard manager has a local shard which is a follower and a leader that is remote it will
1715         // try to compute an address for the remote shard leader using the ShardPeerAddressResolver. This address will
1716         // look like so,
1717         //    akka.tcp://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1
1718         // In this specific case if we did a FindPrimary for shard default from member-1 we would come up
1719         // with the address of an actor which does not exist, therefore any message sent to that actor would go to
1720         // dead letters.
1721         // To work around this problem we create a ForwardingActor with the right address and pass to it the
1722         // mockShardLeaderActor. The ForwardingActor simply forwards all messages to the mockShardLeaderActor and every
1723         // thing works as expected
1724         final ActorRef actorRef = leaderShardManager.underlyingActor().context()
1725                 .actorOf(Props.create(ForwardingActor.class, mockShardLeaderActor), "member-2-shard-default-" + shardMrgIDSuffix);
1726
1727         LOG.error("Forwarding actor : {}", actorRef);
1728
1729         new JavaTestKit(system1) {{
1730
1731             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1732             leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1733
1734             leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1735             newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1736
1737             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
1738             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1739             leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
1740                     Optional.of(mock(DataTree.class)), leaderVersion), mockShardLeaderActor);
1741             leaderShardManager.tell(new RoleChangeNotification(memberId2,
1742                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor);
1743
1744             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
1745             newReplicaShardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2,
1746                     Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor);
1747             newReplicaShardManager.tell(new RoleChangeNotification(memberId1,
1748                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
1749
1750             newReplicaShardManager.underlyingActor().waitForMemberUp();
1751             leaderShardManager.underlyingActor().waitForMemberUp();
1752
1753             //construct a mock response message
1754             RemoveServerReply response = new RemoveServerReply(ServerChangeStatus.OK, memberId2);
1755             mockShardLeaderActor.underlyingActor().updateResponse(response);
1756             newReplicaShardManager.tell(new RemoveShardReplica("default", "member-1"), getRef());
1757             RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1758                     RemoveServer.class);
1759             String removeServerId = new ShardIdentifier("default", "member-1", shardMrgIDSuffix).toString();
1760             assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
1761             expectMsgClass(duration("5 seconds"), Status.Success.class);
1762         }};
1763
1764     }
1765
1766     @Test
1767     public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() throws Exception {
1768         testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", "member-2"),
1769                 RemoveServer.class, new RemoveShardReplica("astronauts", "member-3"));
1770     }
1771
1772     @Test
1773     public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() throws Exception {
1774         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1775                 AddServer.class, new RemoveShardReplica("astronauts", "member-2"));
1776     }
1777
1778
1779     public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange,
1780                                                       final Class<?> firstForwardedServerChangeClass,
1781                                                       final Object secondServerChange) throws Exception {
1782         new JavaTestKit(getSystem()) {{
1783             JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1784             JavaTestKit secondRequestKit = new JavaTestKit(getSystem());
1785
1786             MockConfiguration mockConfig =
1787                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1788                             put(shardName, Arrays.asList("member-2")).build());
1789
1790             final TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
1791                     newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor).cluster(
1792                             new MockClusterWrapper()).props(),
1793                     shardMgrID);
1794
1795             shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1796
1797             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1798
1799             shardManager.tell(firstServerChange, getRef());
1800
1801             mockShardLeaderKit.expectMsgClass(firstForwardedServerChangeClass);
1802
1803             shardManager.tell(secondServerChange, secondRequestKit.getRef());
1804
1805             secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class);
1806         }};
1807     }
1808
1809     @Test
1810     public void testServerRemovedShardActorNotRunning() throws Exception {
1811         LOG.info("testServerRemovedShardActorNotRunning starting");
1812         new JavaTestKit(getSystem()) {{
1813             MockConfiguration mockConfig =
1814                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1815                             put("default", Arrays.asList("member-1", "member-2")).
1816                             put("astronauts", Arrays.asList("member-2")).
1817                             put("people", Arrays.asList("member-1", "member-2")).build());
1818
1819             TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig));
1820
1821             shardManager.underlyingActor().waitForRecoveryComplete();
1822             shardManager.tell(new FindLocalShard("people", false), getRef());
1823             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1824
1825             shardManager.tell(new FindLocalShard("default", false), getRef());
1826             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1827
1828             // Removed the default shard replica from member-1
1829             ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
1830             ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type(shardMrgIDSuffix).build();
1831             shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
1832
1833             shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1834         }};
1835
1836         LOG.info("testServerRemovedShardActorNotRunning ending");
1837     }
1838
1839     @Test
1840     public void testServerRemovedShardActorRunning() throws Exception {
1841         LOG.info("testServerRemovedShardActorRunning starting");
1842         new JavaTestKit(getSystem()) {{
1843             MockConfiguration mockConfig =
1844                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1845                             put("default", Arrays.asList("member-1", "member-2")).
1846                             put("astronauts", Arrays.asList("member-2")).
1847                             put("people", Arrays.asList("member-1", "member-2")).build());
1848
1849             String shardId = ShardIdentifier.builder().shardName("default").memberName("member-1").
1850                     type(shardMrgIDSuffix).build().toString();
1851             TestActorRef<MessageCollectorActor> shard = actorFactory.createTestActor(
1852                     MessageCollectorActor.props(), shardId);
1853
1854             TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1855                     newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props());
1856
1857             shardManager.underlyingActor().waitForRecoveryComplete();
1858
1859             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1860             shardManager.tell(new ActorInitialized(), shard);
1861
1862             waitForShardInitialized(shardManager, "people", this);
1863             waitForShardInitialized(shardManager, "default", this);
1864
1865             // Removed the default shard replica from member-1
1866             shardManager.tell(new ServerRemoved(shardId), getRef());
1867
1868             shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1869
1870             MessageCollectorActor.expectFirstMatching(shard, Shutdown.class);
1871         }};
1872
1873         LOG.info("testServerRemovedShardActorRunning ending");
1874     }
1875
1876
1877     @Test
1878     public void testShardPersistenceWithRestoredData() throws Exception {
1879         LOG.info("testShardPersistenceWithRestoredData starting");
1880         new JavaTestKit(getSystem()) {{
1881             MockConfiguration mockConfig =
1882                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1883                    put("default", Arrays.asList("member-1", "member-2")).
1884                    put("astronauts", Arrays.asList("member-2")).
1885                    put("people", Arrays.asList("member-1", "member-2")).build());
1886             String[] restoredShards = {"default", "astronauts"};
1887             ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
1888             InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
1889
1890             //create shardManager to come up with restored data
1891             TestActorRef<TestShardManager> newRestoredShardManager = actorFactory.createTestActor(
1892                     newShardMgrProps(mockConfig));
1893
1894             newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
1895
1896             newRestoredShardManager.tell(new FindLocalShard("people", false), getRef());
1897             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1898             assertEquals("for uninitialized shard", "people", notFound.getShardName());
1899
1900             //Verify a local shard is created for the restored shards,
1901             //although we expect a NotInitializedException for the shards as the actor initialization
1902             //message is not sent for them
1903             newRestoredShardManager.tell(new FindLocalShard("default", false), getRef());
1904             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1905
1906             newRestoredShardManager.tell(new FindLocalShard("astronauts", false), getRef());
1907             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1908         }};
1909
1910         LOG.info("testShardPersistenceWithRestoredData ending");
1911     }
1912
1913     @Test
1914     public void testShutDown() throws Exception {
1915         LOG.info("testShutDown starting");
1916         new JavaTestKit(getSystem()) {{
1917             MockConfiguration mockConfig =
1918                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1919                             put("shard1", Arrays.asList("member-1")).
1920                             put("shard2", Arrays.asList("member-1")).build());
1921
1922             String shardId1 = ShardIdentifier.builder().shardName("shard1").memberName("member-1").
1923                     type(shardMrgIDSuffix).build().toString();
1924             TestActorRef<MessageCollectorActor> shard1 = actorFactory.createTestActor(
1925                     MessageCollectorActor.props(), shardId1);
1926
1927             String shardId2 = ShardIdentifier.builder().shardName("shard2").memberName("member-1").
1928                     type(shardMrgIDSuffix).build().toString();
1929             TestActorRef<MessageCollectorActor> shard2 = actorFactory.createTestActor(
1930                     MessageCollectorActor.props(), shardId2);
1931
1932             TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(
1933                     mockConfig).addShardActor("shard1", shard1).addShardActor("shard2", shard2).props());
1934
1935             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1936             shardManager.tell(new ActorInitialized(), shard1);
1937             shardManager.tell(new ActorInitialized(), shard2);
1938
1939             FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
1940             Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, new Shutdown());
1941
1942             MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class);
1943             MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class);
1944
1945             try {
1946                 Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS));
1947                 fail("ShardManager actor stopped without waiting for the Shards to be stopped");
1948             } catch(TimeoutException e) {
1949                 // expected
1950             }
1951
1952             actorFactory.killActor(shard1, this);
1953             actorFactory.killActor(shard2, this);
1954
1955             Boolean stopped = Await.result(stopFuture, duration);
1956             assertEquals("Stopped", Boolean.TRUE, stopped);
1957         }};
1958
1959         LOG.info("testShutDown ending");
1960     }
1961
1962     private static class TestShardManager extends ShardManager {
1963         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
1964         private final CountDownLatch snapshotPersist = new CountDownLatch(1);
1965         private ShardManagerSnapshot snapshot;
1966         private final Map<String, ActorRef> shardActors;
1967         private final ActorRef shardActor;
1968         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
1969         private CountDownLatch memberUpReceived = new CountDownLatch(1);
1970         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
1971         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
1972         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
1973         private volatile MessageInterceptor messageInterceptor;
1974
1975         private TestShardManager(Builder builder) {
1976             super(builder);
1977             shardActor = builder.shardActor;
1978             shardActors = builder.shardActors;
1979         }
1980
1981         @Override
1982         public void handleRecover(Object message) throws Exception {
1983             try {
1984                 super.handleRecover(message);
1985             } finally {
1986                 if(message instanceof RecoveryCompleted) {
1987                     recoveryComplete.countDown();
1988                 }
1989             }
1990         }
1991
1992         @Override
1993         public void handleCommand(Object message) throws Exception {
1994             try{
1995                 if(messageInterceptor != null && messageInterceptor.canIntercept(message)) {
1996                     getSender().tell(messageInterceptor.apply(message), getSelf());
1997                 } else {
1998                     super.handleCommand(message);
1999                 }
2000             } finally {
2001                 if(message instanceof FindPrimary) {
2002                     findPrimaryMessageReceived.countDown();
2003                 } else if(message instanceof ClusterEvent.MemberUp) {
2004                     String role = ((ClusterEvent.MemberUp)message).member().roles().iterator().next();
2005                     if(!getCluster().getCurrentMemberName().equals(role)) {
2006                         memberUpReceived.countDown();
2007                     }
2008                 } else if(message instanceof ClusterEvent.MemberRemoved) {
2009                     String role = ((ClusterEvent.MemberRemoved)message).member().roles().iterator().next();
2010                     if(!getCluster().getCurrentMemberName().equals(role)) {
2011                         memberRemovedReceived.countDown();
2012                     }
2013                 } else if(message instanceof ClusterEvent.UnreachableMember) {
2014                     String role = ((ClusterEvent.UnreachableMember)message).member().roles().iterator().next();
2015                     if(!getCluster().getCurrentMemberName().equals(role)) {
2016                         memberUnreachableReceived.countDown();
2017                     }
2018                 } else if(message instanceof ClusterEvent.ReachableMember) {
2019                     String role = ((ClusterEvent.ReachableMember)message).member().roles().iterator().next();
2020                     if(!getCluster().getCurrentMemberName().equals(role)) {
2021                         memberReachableReceived.countDown();
2022                     }
2023                 }
2024             }
2025         }
2026
2027         void setMessageInterceptor(MessageInterceptor messageInterceptor) {
2028             this.messageInterceptor = messageInterceptor;
2029         }
2030
2031         void waitForRecoveryComplete() {
2032             assertEquals("Recovery complete", true,
2033                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
2034         }
2035
2036         void waitForMemberUp() {
2037             assertEquals("MemberUp received", true,
2038                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
2039             memberUpReceived = new CountDownLatch(1);
2040         }
2041
2042         void waitForMemberRemoved() {
2043             assertEquals("MemberRemoved received", true,
2044                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
2045             memberRemovedReceived = new CountDownLatch(1);
2046         }
2047
2048         void waitForUnreachableMember() {
2049             assertEquals("UnreachableMember received", true,
2050                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
2051                 ));
2052             memberUnreachableReceived = new CountDownLatch(1);
2053         }
2054
2055         void waitForReachableMember() {
2056             assertEquals("ReachableMember received", true,
2057                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
2058             memberReachableReceived = new CountDownLatch(1);
2059         }
2060
2061         void verifyFindPrimary() {
2062             assertEquals("FindPrimary received", true,
2063                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
2064             findPrimaryMessageReceived = new CountDownLatch(1);
2065         }
2066
2067         public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) {
2068             return new Builder(datastoreContextBuilder);
2069         }
2070
2071         private static class Builder extends AbstractGenericBuilder<Builder, TestShardManager> {
2072             private ActorRef shardActor;
2073             private final Map<String, ActorRef> shardActors = new HashMap<>();
2074
2075             Builder(DatastoreContext.Builder datastoreContextBuilder) {
2076                 super(TestShardManager.class);
2077                 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
2078             }
2079
2080             Builder shardActor(ActorRef shardActor) {
2081                 this.shardActor = shardActor;
2082                 return this;
2083             }
2084
2085             Builder addShardActor(String shardName, ActorRef actorRef){
2086                 shardActors.put(shardName, actorRef);
2087                 return this;
2088             }
2089         }
2090
2091         @Override
2092         public void saveSnapshot(Object obj) {
2093             snapshot = (ShardManagerSnapshot) obj;
2094             snapshotPersist.countDown();
2095             super.saveSnapshot(obj);
2096         }
2097
2098         void verifySnapshotPersisted(Set<String> shardList) {
2099             assertEquals("saveSnapshot invoked", true,
2100                     Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
2101             assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
2102         }
2103
2104         @Override
2105         protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
2106             if(shardActors.get(info.getShardName()) != null){
2107                 return shardActors.get(info.getShardName());
2108             }
2109
2110             if(shardActor != null) {
2111                 return shardActor;
2112             }
2113
2114             return super.newShardActor(schemaContext, info);
2115         }
2116     }
2117
2118     private static abstract class AbstractGenericBuilder<T extends AbstractGenericBuilder<T, ?>, C extends ShardManager>
2119                                                      extends ShardManager.AbstractBuilder<T> {
2120         private final Class<C> shardManagerClass;
2121
2122         AbstractGenericBuilder(Class<C> shardManagerClass) {
2123             this.shardManagerClass = shardManagerClass;
2124             cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).
2125                     waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache());
2126         }
2127
2128         @Override
2129         public Props props() {
2130             verify();
2131             return Props.create(shardManagerClass, this);
2132         }
2133     }
2134
2135     private static class GenericBuilder<C extends ShardManager> extends AbstractGenericBuilder<GenericBuilder<C>, C> {
2136         GenericBuilder(Class<C> shardManagerClass) {
2137             super(shardManagerClass);
2138         }
2139     }
2140
2141     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
2142         private static final long serialVersionUID = 1L;
2143         private final Creator<ShardManager> delegate;
2144
2145         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
2146             this.delegate = delegate;
2147         }
2148
2149         @Override
2150         public ShardManager create() throws Exception {
2151             return delegate.create();
2152         }
2153     }
2154
2155     interface MessageInterceptor extends Function<Object, Object> {
2156         boolean canIntercept(Object message);
2157     }
2158
2159     private MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
2160         return new MessageInterceptor(){
2161             @Override
2162             public Object apply(Object message) {
2163                 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
2164             }
2165
2166             @Override
2167             public boolean canIntercept(Object message) {
2168                 return message instanceof FindPrimary;
2169             }
2170         };
2171     }
2172
2173     private static class MockRespondActor extends MessageCollectorActor {
2174         static final String CLEAR_RESPONSE = "clear-response";
2175         static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MockRespondActor.class);
2176
2177         private volatile Object responseMsg;
2178
2179         @SuppressWarnings("unused")
2180         public MockRespondActor() {
2181         }
2182
2183         @SuppressWarnings("unused")
2184         public MockRespondActor(Object responseMsg) {
2185             this.responseMsg = responseMsg;
2186         }
2187
2188         public void updateResponse(Object response) {
2189             responseMsg = response;
2190         }
2191
2192         @Override
2193         public void onReceive(Object message) throws Exception {
2194             if(!"get-all-messages".equals(message)) {
2195                 LOG.debug("Received message : {}", message);
2196             }
2197             super.onReceive(message);
2198             if (message instanceof AddServer && responseMsg != null) {
2199                 getSender().tell(responseMsg, getSelf());
2200             } else if(message instanceof RemoveServer && responseMsg != null){
2201                 getSender().tell(responseMsg, getSelf());
2202             } else if(message.equals(CLEAR_RESPONSE)) {
2203                 responseMsg = null;
2204             }
2205         }
2206     }
2207 }