Use java.util.Optional in ShardLeaderStateChanged and PrimaryShardInfo
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManagerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.times;
20 import static org.mockito.Mockito.verify;
21 import akka.actor.ActorRef;
22 import akka.actor.ActorSystem;
23 import akka.actor.AddressFromURIString;
24 import akka.actor.Props;
25 import akka.actor.Status;
26 import akka.actor.Status.Failure;
27 import akka.actor.Status.Success;
28 import akka.cluster.Cluster;
29 import akka.cluster.ClusterEvent;
30 import akka.dispatch.Dispatchers;
31 import akka.japi.Creator;
32 import akka.pattern.Patterns;
33 import akka.persistence.RecoveryCompleted;
34 import akka.serialization.Serialization;
35 import akka.testkit.JavaTestKit;
36 import akka.testkit.TestActorRef;
37 import akka.util.Timeout;
38 import com.google.common.base.Function;
39 import com.google.common.base.Stopwatch;
40 import com.google.common.collect.ImmutableMap;
41 import com.google.common.collect.ImmutableSet;
42 import com.google.common.collect.Lists;
43 import com.google.common.collect.Sets;
44 import com.google.common.util.concurrent.Uninterruptibles;
45 import com.typesafe.config.ConfigFactory;
46 import java.net.URI;
47 import java.util.AbstractMap;
48 import java.util.ArrayList;
49 import java.util.Arrays;
50 import java.util.Collection;
51 import java.util.Collections;
52 import java.util.HashMap;
53 import java.util.List;
54 import java.util.Map;
55 import java.util.Map.Entry;
56 import java.util.Set;
57 import java.util.concurrent.CountDownLatch;
58 import java.util.concurrent.TimeUnit;
59 import java.util.concurrent.TimeoutException;
60 import org.apache.commons.lang3.SerializationUtils;
61 import org.junit.After;
62 import org.junit.Before;
63 import org.junit.Test;
64 import org.mockito.Mock;
65 import org.mockito.Mockito;
66 import org.mockito.MockitoAnnotations;
67 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
68 import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
69 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
70 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
71 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
72 import org.opendaylight.controller.cluster.datastore.Shard;
73 import org.opendaylight.controller.cluster.datastore.ShardManager.SchemaContextModules;
74 import org.opendaylight.controller.cluster.datastore.config.Configuration;
75 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
76 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
77 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
78 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
79 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
80 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
81 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
82 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
83 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
84 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
85 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
86 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
87 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
88 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
89 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
90 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
91 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
92 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
93 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
94 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
95 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
96 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
97 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
98 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
99 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
100 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
101 import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor;
102 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
103 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
104 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
105 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
106 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
107 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
108 import org.opendaylight.controller.cluster.raft.RaftState;
109 import org.opendaylight.controller.cluster.raft.TestActorFactory;
110 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
111 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
112 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
113 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
114 import org.opendaylight.controller.cluster.raft.messages.AddServer;
115 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
116 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
117 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
118 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
119 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
120 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
121 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
122 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
123 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
124 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
125 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
126 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
127 import org.slf4j.Logger;
128 import org.slf4j.LoggerFactory;
129 import scala.concurrent.Await;
130 import scala.concurrent.Future;
131 import scala.concurrent.duration.FiniteDuration;
132
133 public class ShardManagerTest extends AbstractActorTest {
134     private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
135
136     private static int ID_COUNTER = 1;
137
138     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
139     private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
140
141     @Mock
142     private static CountDownLatch ready;
143
144     private static TestActorRef<MessageCollectorActor> mockShardActor;
145
146     private static ShardIdentifier mockShardName;
147
148     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
149             dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
150                    .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
151
152     private final Collection<ActorSystem> actorSystems = new ArrayList<>();
153
154     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
155
156     @Before
157     public void setUp() {
158         MockitoAnnotations.initMocks(this);
159
160         InMemoryJournal.clear();
161         InMemorySnapshotStore.clear();
162
163         if(mockShardActor == null) {
164             mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config");
165             mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class),
166                     mockShardName.toString());
167         }
168
169         mockShardActor.underlyingActor().clear();
170     }
171
172     @After
173     public void tearDown() {
174         InMemoryJournal.clear();
175         InMemorySnapshotStore.clear();
176
177         for(ActorSystem system: actorSystems) {
178             JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
179         }
180
181         actorFactory.close();
182     }
183
184     private ActorSystem newActorSystem(String config) {
185         ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config));
186         actorSystems.add(system);
187         return system;
188     }
189
190     private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
191         String name = new ShardIdentifier(shardName, memberName,"config").toString();
192         if(system == getSystem()) {
193             return actorFactory.createTestActor(Props.create(MessageCollectorActor.class), name);
194         }
195
196         return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
197     }
198
199     private Props newShardMgrProps() {
200         return newShardMgrProps(new MockConfiguration());
201     }
202
203     private static DatastoreContextFactory newDatastoreContextFactory(DatastoreContext datastoreContext) {
204         DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
205         Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
206         Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString());
207         return mockFactory;
208     }
209
210     private TestShardManager.Builder newTestShardMgrBuilder() {
211         return TestShardManager.builder(datastoreContextBuilder);
212     }
213
214     private TestShardManager.Builder newTestShardMgrBuilder(Configuration config) {
215         return TestShardManager.builder(datastoreContextBuilder).configuration(config);
216     }
217
218     private Props newShardMgrProps(Configuration config) {
219         return newTestShardMgrBuilder(config).props();
220     }
221
222     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() {
223         return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
224     }
225
226     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(ActorRef shardActor) {
227         return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor);
228     }
229
230
231     private Props newPropsShardMgrWithMockShardActor() {
232         return newTestShardMgrBuilderWithMockShardActor().props();
233     }
234
235     private Props newPropsShardMgrWithMockShardActor(ActorRef shardActor) {
236         return newTestShardMgrBuilderWithMockShardActor(shardActor).props();
237     }
238
239
240     private TestShardManager newTestShardManager() {
241         return newTestShardManager(newShardMgrProps());
242     }
243
244     private TestShardManager newTestShardManager(Props props) {
245         TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
246         TestShardManager shardManager = shardManagerActor.underlyingActor();
247         shardManager.waitForRecoveryComplete();
248         return shardManager;
249     }
250
251     private static void waitForShardInitialized(ActorRef shardManager, String shardName, JavaTestKit kit) {
252         AssertionError last = null;
253         Stopwatch sw = Stopwatch.createStarted();
254         while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
255             try {
256                 shardManager.tell(new FindLocalShard(shardName, true), kit.getRef());
257                 kit.expectMsgClass(LocalShardFound.class);
258                 return;
259             } catch(AssertionError e) {
260                 last = e;
261             }
262
263             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
264         }
265
266         throw last;
267     }
268
269     private static <T> T expectMsgClassOrFailure(Class<T> msgClass, JavaTestKit kit, String msg) {
270         Object reply = kit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), msgClass, Failure.class);
271         if(reply instanceof Failure) {
272             throw new AssertionError(msg + " failed", ((Failure)reply).cause());
273         }
274
275         return (T)reply;
276     }
277
278     @Test
279     public void testPerShardDatastoreContext() throws Exception {
280         LOG.info("testPerShardDatastoreContext starting");
281         final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
282                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
283
284         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
285                 shardElectionTimeoutFactor(6).build()).when(mockFactory).getShardDatastoreContext("default");
286
287         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
288                 shardElectionTimeoutFactor(7).build()).when(mockFactory).getShardDatastoreContext("topology");
289
290         final MockConfiguration mockConfig = new MockConfiguration() {
291             @Override
292             public Collection<String> getMemberShardNames(String memberName) {
293                 return Arrays.asList("default", "topology");
294             }
295
296             @Override
297             public Collection<String> getMembersFromShardName(String shardName) {
298                 return Arrays.asList("member-1");
299             }
300         };
301
302         final TestActorRef<MessageCollectorActor> defaultShardActor = actorFactory.createTestActor(
303                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("default"));
304         final TestActorRef<MessageCollectorActor> topologyShardActor = actorFactory.createTestActor(
305                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("topology"));
306
307         final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
308                 new HashMap<String, Entry<ActorRef, DatastoreContext>>());
309         shardInfoMap.put("default", new AbstractMap.SimpleEntry<ActorRef, DatastoreContext>(defaultShardActor, null));
310         shardInfoMap.put("topology", new AbstractMap.SimpleEntry<ActorRef, DatastoreContext>(topologyShardActor, null));
311
312         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
313         final CountDownLatch newShardActorLatch = new CountDownLatch(2);
314         class LocalShardManager extends ShardManager {
315             public LocalShardManager(AbstractShardManagerCreator<?> creator) {
316                 super(creator);
317             }
318
319             @Override
320             protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
321                 Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
322                 ActorRef ref = null;
323                 if(entry != null) {
324                     ref = entry.getKey();
325                     entry.setValue(info.getDatastoreContext());
326                 }
327
328                 newShardActorLatch.countDown();
329                 return ref;
330             }
331         }
332
333         final Creator<ShardManager> creator = new Creator<ShardManager>() {
334             private static final long serialVersionUID = 1L;
335             @Override
336             public ShardManager create() throws Exception {
337                 return new LocalShardManager(new GenericCreator<LocalShardManager>(LocalShardManager.class).
338                         datastoreContextFactory(mockFactory).primaryShardInfoCache(primaryShardInfoCache).
339                         configuration(mockConfig));
340             }
341         };
342
343         JavaTestKit kit = new JavaTestKit(getSystem());
344
345         final ActorRef shardManager = actorFactory.createActor(Props.create(
346                 new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()));
347
348         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), kit.getRef());
349
350         assertEquals("Shard actors created", true, newShardActorLatch.await(5, TimeUnit.SECONDS));
351         assertEquals("getShardElectionTimeoutFactor", 6, shardInfoMap.get("default").getValue().
352                 getShardElectionTimeoutFactor());
353         assertEquals("getShardElectionTimeoutFactor", 7, shardInfoMap.get("topology").getValue().
354                 getShardElectionTimeoutFactor());
355
356         DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
357                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
358         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
359                 shardElectionTimeoutFactor(66).build()).when(newMockFactory).getShardDatastoreContext("default");
360
361         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
362                 shardElectionTimeoutFactor(77).build()).when(newMockFactory).getShardDatastoreContext("topology");
363
364         shardManager.tell(newMockFactory, kit.getRef());
365
366         DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor, DatastoreContext.class);
367         assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
368
369         newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
370         assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
371
372         LOG.info("testPerShardDatastoreContext ending");
373     }
374
375     @Test
376     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
377         new JavaTestKit(getSystem()) {{
378             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
379
380             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
381
382             shardManager.tell(new FindPrimary("non-existent", false), getRef());
383
384             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
385         }};
386     }
387
388     @Test
389     public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
390         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting");
391         new JavaTestKit(getSystem()) {{
392             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
393
394             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
395
396             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
397             shardManager.tell(new ActorInitialized(), mockShardActor);
398
399             DataTree mockDataTree = mock(DataTree.class);
400             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
401                     DataStoreVersions.CURRENT_VERSION), getRef());
402
403             MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
404             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
405                     RaftState.Leader.name())), mockShardActor);
406
407             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
408
409             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
410             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
411                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
412             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
413         }};
414
415         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending");
416     }
417
418     @Test
419     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
420         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp starting");
421         new JavaTestKit(getSystem()) {{
422             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
423
424             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
425             shardManager.tell(new ActorInitialized(), mockShardActor);
426
427             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
428             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
429             shardManager.tell(new RoleChangeNotification(memberId1,
430                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
431             shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION), mockShardActor);
432
433             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
434
435             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
436         }};
437
438         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending");
439     }
440
441     @Test
442     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
443         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard starting");
444         new JavaTestKit(getSystem()) {{
445             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
446
447             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
448             shardManager.tell(new ActorInitialized(), mockShardActor);
449
450             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
451             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
452
453             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
454             shardManager.tell(new RoleChangeNotification(memberId1,
455                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
456             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
457             shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, leaderVersion), mockShardActor);
458
459             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
460
461             RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
462             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
463                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
464             assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
465         }};
466
467         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard ending");
468     }
469
470     @Test
471     public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
472         new JavaTestKit(getSystem()) {{
473             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
474
475             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
476
477             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
478         }};
479     }
480
481     @Test
482     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
483         new JavaTestKit(getSystem()) {{
484             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
485
486             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
487             shardManager.tell(new ActorInitialized(), mockShardActor);
488
489             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
490
491             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
492         }};
493     }
494
495     @Test
496     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
497         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
498         new JavaTestKit(getSystem()) {{
499             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
500
501             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
502             shardManager.tell(new ActorInitialized(), mockShardActor);
503
504             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
505             shardManager.tell(new RoleChangeNotification(memberId,
506                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
507
508             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
509
510             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
511
512             DataTree mockDataTree = mock(DataTree.class);
513             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
514                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
515
516             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
517
518             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
519             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
520                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
521             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
522         }};
523
524         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
525     }
526
527     @Test
528     public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
529         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting");
530         datastoreContextBuilder.shardInitializationTimeout(10, TimeUnit.SECONDS);
531         new JavaTestKit(getSystem()) {{
532             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
533
534             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
535
536             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
537             // delayed until we send ActorInitialized and RoleChangeNotification.
538             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
539
540             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
541
542             shardManager.tell(new ActorInitialized(), mockShardActor);
543
544             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
545
546             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
547             shardManager.tell(new RoleChangeNotification(memberId,
548                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
549
550             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
551
552             DataTree mockDataTree = mock(DataTree.class);
553             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
554                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
555
556             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
557             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
558                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
559             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
560
561             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
562         }};
563
564         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending");
565     }
566
567     @Test
568     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
569         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard starting");
570         new JavaTestKit(getSystem()) {{
571             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
572
573             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
574
575             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
576
577             expectMsgClass(duration("2 seconds"), NotInitializedException.class);
578
579             shardManager.tell(new ActorInitialized(), mockShardActor);
580
581             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
582         }};
583
584         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending");
585     }
586
587     @Test
588     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
589         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard starting");
590         new JavaTestKit(getSystem()) {{
591             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
592
593             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
594             shardManager.tell(new ActorInitialized(), mockShardActor);
595             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
596                     null, RaftState.Candidate.name()), mockShardActor);
597
598             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
599
600             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
601         }};
602
603         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending");
604     }
605
606     @Test
607     public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception {
608         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard starting");
609         new JavaTestKit(getSystem()) {{
610             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
611
612             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
613             shardManager.tell(new ActorInitialized(), mockShardActor);
614             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
615                     null, RaftState.IsolatedLeader.name()), mockShardActor);
616
617             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
618
619             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
620         }};
621
622         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending");
623     }
624
625     @Test
626     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
627         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard starting");
628         new JavaTestKit(getSystem()) {{
629             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
630
631             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
632             shardManager.tell(new ActorInitialized(), mockShardActor);
633
634             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
635
636             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
637         }};
638
639         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending");
640     }
641
642     @Test
643     public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
644         LOG.info("testOnReceiveFindPrimaryForRemoteShard starting");
645         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
646
647         // Create an ActorSystem ShardManager actor for member-1.
648
649         final ActorSystem system1 = newActorSystem("Member1");
650         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
651
652         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
653                 newTestShardMgrBuilderWithMockShardActor().cluster(
654                         new ClusterWrapperImpl(system1)).props().withDispatcher(
655                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
656
657         // Create an ActorSystem ShardManager actor for member-2.
658
659         final ActorSystem system2 = newActorSystem("Member2");
660
661         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
662
663         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
664
665         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
666                 put("default", Arrays.asList("member-1", "member-2")).
667                 put("astronauts", Arrays.asList("member-2")).build());
668
669         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
670                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
671                         new ClusterWrapperImpl(system2)).props().withDispatcher(
672                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
673
674         new JavaTestKit(system1) {{
675
676             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
677             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
678
679             shardManager2.tell(new ActorInitialized(), mockShardActor2);
680
681             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
682             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
683             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
684                     mock(DataTree.class), leaderVersion), mockShardActor2);
685             shardManager2.tell(new RoleChangeNotification(memberId2,
686                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
687
688             shardManager1.underlyingActor().waitForMemberUp();
689
690             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
691
692             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
693             String path = found.getPrimaryPath();
694             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
695             assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
696
697             shardManager2.underlyingActor().verifyFindPrimary();
698
699             Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
700
701             shardManager1.underlyingActor().waitForMemberRemoved();
702
703             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
704
705             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
706         }};
707
708         LOG.info("testOnReceiveFindPrimaryForRemoteShard ending");
709     }
710
711     @Test
712     public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
713         LOG.info("testShardAvailabilityOnChangeOfMemberReachability starting");
714         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
715
716         // Create an ActorSystem ShardManager actor for member-1.
717
718         final ActorSystem system1 = newActorSystem("Member1");
719         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
720
721         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
722
723         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
724                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
725                         new ClusterWrapperImpl(system1)).props().withDispatcher(
726                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
727
728         // Create an ActorSystem ShardManager actor for member-2.
729
730         final ActorSystem system2 = newActorSystem("Member2");
731
732         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
733
734         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
735
736         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
737             put("default", Arrays.asList("member-1", "member-2")).build());
738
739         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
740                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
741                         new ClusterWrapperImpl(system2)).props().withDispatcher(
742                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
743
744         new JavaTestKit(system1) {{
745
746             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
747             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
748             shardManager1.tell(new ActorInitialized(), mockShardActor1);
749             shardManager2.tell(new ActorInitialized(), mockShardActor2);
750
751             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
752             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
753             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
754                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
755             shardManager1.tell(new RoleChangeNotification(memberId1,
756                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
757             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
758                     DataStoreVersions.CURRENT_VERSION),
759                 mockShardActor2);
760             shardManager2.tell(new RoleChangeNotification(memberId2,
761                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
762             shardManager1.underlyingActor().waitForMemberUp();
763
764             shardManager1.tell(new FindPrimary("default", true), getRef());
765
766             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
767             String path = found.getPrimaryPath();
768             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
769
770             shardManager1.tell(MockClusterWrapper.
771                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
772
773             shardManager1.underlyingActor().waitForUnreachableMember();
774
775             PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
776             assertEquals("getMemberName", "member-2", peerDown.getMemberName());
777             MessageCollectorActor.clearMessages(mockShardActor1);
778
779             shardManager1.tell(MockClusterWrapper.
780                     createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
781
782             MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
783
784             shardManager1.tell(new FindPrimary("default", true), getRef());
785
786             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
787
788             shardManager1.tell(MockClusterWrapper.
789                 createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
790
791             shardManager1.underlyingActor().waitForReachableMember();
792
793             PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
794             assertEquals("getMemberName", "member-2", peerUp.getMemberName());
795             MessageCollectorActor.clearMessages(mockShardActor1);
796
797             shardManager1.tell(new FindPrimary("default", true), getRef());
798
799             RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
800             String path1 = found1.getPrimaryPath();
801             assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
802
803             shardManager1.tell(MockClusterWrapper.
804                     createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
805
806             MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
807
808             // Test FindPrimary wait succeeds after reachable member event.
809
810             shardManager1.tell(MockClusterWrapper.
811                     createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
812             shardManager1.underlyingActor().waitForUnreachableMember();
813
814             shardManager1.tell(new FindPrimary("default", true), getRef());
815
816             shardManager1.tell(MockClusterWrapper.
817                     createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
818
819             RemotePrimaryShardFound found2 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
820             String path2 = found2.getPrimaryPath();
821             assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config"));
822         }};
823
824         LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending");
825     }
826
827     @Test
828     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
829         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange starting");
830         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
831
832         // Create an ActorSystem ShardManager actor for member-1.
833
834         final ActorSystem system1 = newActorSystem("Member1");
835         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
836
837         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
838
839         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
840         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
841                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
842                         new ClusterWrapperImpl(system1)).primaryShardInfoCache(primaryShardInfoCache).props().
843                             withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID);
844
845         // Create an ActorSystem ShardManager actor for member-2.
846
847         final ActorSystem system2 = newActorSystem("Member2");
848
849         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
850
851         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
852
853         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
854             put("default", Arrays.asList("member-1", "member-2")).build());
855
856         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
857                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
858                         new ClusterWrapperImpl(system2)).props().withDispatcher(
859                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
860
861         new JavaTestKit(system1) {{
862             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
863             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
864             shardManager1.tell(new ActorInitialized(), mockShardActor1);
865             shardManager2.tell(new ActorInitialized(), mockShardActor2);
866
867             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
868             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
869             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
870                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
871             shardManager1.tell(new RoleChangeNotification(memberId1,
872                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
873             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
874                     DataStoreVersions.CURRENT_VERSION),
875                 mockShardActor2);
876             shardManager2.tell(new RoleChangeNotification(memberId2,
877                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
878             shardManager1.underlyingActor().waitForMemberUp();
879
880             shardManager1.tell(new FindPrimary("default", true), getRef());
881
882             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
883             String path = found.getPrimaryPath();
884             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
885
886             primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
887                     mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION));
888
889             shardManager1.tell(MockClusterWrapper.
890                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
891
892             shardManager1.underlyingActor().waitForUnreachableMember();
893
894             shardManager1.tell(new FindPrimary("default", true), getRef());
895
896             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
897
898             assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default"));
899
900             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, mock(DataTree.class),
901                     DataStoreVersions.CURRENT_VERSION), mockShardActor1);
902             shardManager1.tell(new RoleChangeNotification(memberId1,
903                 RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
904
905             shardManager1.tell(new FindPrimary("default", true), getRef());
906
907             LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
908             String path1 = found1.getPrimaryPath();
909             assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
910
911         }};
912
913         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange ending");
914     }
915
916
917     @Test
918     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
919         new JavaTestKit(getSystem()) {{
920             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
921
922             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
923
924             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
925
926             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
927
928             assertEquals("getShardName", "non-existent", notFound.getShardName());
929         }};
930     }
931
932     @Test
933     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
934         new JavaTestKit(getSystem()) {{
935             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
936
937             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
938             shardManager.tell(new ActorInitialized(), mockShardActor);
939
940             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
941
942             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
943
944             assertTrue("Found path contains " + found.getPath().path().toString(),
945                     found.getPath().path().toString().contains("member-1-shard-default-config"));
946         }};
947     }
948
949     @Test
950     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
951         new JavaTestKit(getSystem()) {{
952             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
953
954             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
955
956             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
957         }};
958     }
959
960     @Test
961     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
962         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
963         new JavaTestKit(getSystem()) {{
964             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
965
966             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
967
968             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
969             // delayed until we send ActorInitialized.
970             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
971                     new Timeout(5, TimeUnit.SECONDS));
972
973             shardManager.tell(new ActorInitialized(), mockShardActor);
974
975             Object resp = Await.result(future, duration("5 seconds"));
976             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
977         }};
978
979         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
980     }
981
982     @Test
983     public void testOnRecoveryJournalIsCleaned() {
984         String persistenceID = "shard-manager-" + shardMrgIDSuffix;
985         InMemoryJournal.addEntry(persistenceID, 1L, new SchemaContextModules(ImmutableSet.of("foo")));
986         InMemoryJournal.addEntry(persistenceID, 2L, new SchemaContextModules(ImmutableSet.of("bar")));
987         InMemoryJournal.addDeleteMessagesCompleteLatch(persistenceID);
988
989         newTestShardManager();
990
991         InMemoryJournal.waitForDeleteMessagesComplete(persistenceID);
992
993         // Journal entries up to the last one should've been deleted
994         Map<Long, Object> journal = InMemoryJournal.get(persistenceID);
995         synchronized (journal) {
996             assertEquals("Journal size", 0, journal.size());
997         }
998     }
999
1000     @Test
1001     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
1002         TestShardManager shardManager = newTestShardManager();
1003
1004         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1005         shardManager.onReceiveCommand(new RoleChangeNotification(
1006                 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
1007
1008         verify(ready, never()).countDown();
1009
1010         shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
1011                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1012
1013         verify(ready, times(1)).countDown();
1014     }
1015
1016     @Test
1017     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
1018         new JavaTestKit(getSystem()) {{
1019             TestShardManager shardManager = newTestShardManager();
1020
1021             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1022             shardManager.onReceiveCommand(new RoleChangeNotification(
1023                     memberId, null, RaftState.Follower.name()));
1024
1025             verify(ready, never()).countDown();
1026
1027             shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
1028
1029             shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
1030                     "member-2-shard-default-" + shardMrgIDSuffix, mock(DataTree.class),
1031                     DataStoreVersions.CURRENT_VERSION));
1032
1033             verify(ready, times(1)).countDown();
1034         }};
1035     }
1036
1037     @Test
1038     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
1039         new JavaTestKit(getSystem()) {{
1040             TestShardManager shardManager = newTestShardManager();
1041
1042             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1043             shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1044
1045             verify(ready, never()).countDown();
1046
1047             shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
1048                     "member-2-shard-default-" + shardMrgIDSuffix, mock(DataTree.class),
1049                     DataStoreVersions.CURRENT_VERSION));
1050
1051             shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
1052
1053             verify(ready, times(1)).countDown();
1054         }};
1055     }
1056
1057     @Test
1058     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
1059         TestShardManager shardManager = newTestShardManager();
1060
1061         shardManager.onReceiveCommand(new RoleChangeNotification(
1062                 "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
1063
1064         verify(ready, never()).countDown();
1065     }
1066
1067     @Test
1068     public void testByDefaultSyncStatusIsFalse() throws Exception{
1069         TestShardManager shardManager = newTestShardManager();
1070
1071         assertEquals(false, shardManager.getMBean().getSyncStatus());
1072     }
1073
1074     @Test
1075     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
1076         TestShardManager shardManager = newTestShardManager();
1077
1078         shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
1079                 RaftState.Follower.name(), RaftState.Leader.name()));
1080
1081         assertEquals(true, shardManager.getMBean().getSyncStatus());
1082     }
1083
1084     @Test
1085     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
1086         TestShardManager shardManager = newTestShardManager();
1087
1088         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1089         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1090                 RaftState.Follower.name(), RaftState.Candidate.name()));
1091
1092         assertEquals(false, shardManager.getMBean().getSyncStatus());
1093
1094         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
1095         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(
1096                 true, shardId));
1097
1098         assertEquals(false, shardManager.getMBean().getSyncStatus());
1099     }
1100
1101     @Test
1102     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
1103         TestShardManager shardManager = newTestShardManager();
1104
1105         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1106         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1107                 RaftState.Candidate.name(), RaftState.Follower.name()));
1108
1109         // Initially will be false
1110         assertEquals(false, shardManager.getMBean().getSyncStatus());
1111
1112         // Send status true will make sync status true
1113         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
1114
1115         assertEquals(true, shardManager.getMBean().getSyncStatus());
1116
1117         // Send status false will make sync status false
1118         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
1119
1120         assertEquals(false, shardManager.getMBean().getSyncStatus());
1121
1122     }
1123
1124     @Test
1125     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
1126         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
1127         TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
1128             @Override
1129             public List<String> getMemberShardNames(String memberName) {
1130                 return Arrays.asList("default", "astronauts");
1131             }
1132         }));
1133
1134         // Initially will be false
1135         assertEquals(false, shardManager.getMBean().getSyncStatus());
1136
1137         // Make default shard leader
1138         String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
1139         shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId,
1140                 RaftState.Follower.name(), RaftState.Leader.name()));
1141
1142         // default = Leader, astronauts is unknown so sync status remains false
1143         assertEquals(false, shardManager.getMBean().getSyncStatus());
1144
1145         // Make astronauts shard leader as well
1146         String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1147         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1148                 RaftState.Follower.name(), RaftState.Leader.name()));
1149
1150         // Now sync status should be true
1151         assertEquals(true, shardManager.getMBean().getSyncStatus());
1152
1153         // Make astronauts a Follower
1154         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1155                 RaftState.Leader.name(), RaftState.Follower.name()));
1156
1157         // Sync status is not true
1158         assertEquals(false, shardManager.getMBean().getSyncStatus());
1159
1160         // Make the astronauts follower sync status true
1161         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
1162
1163         // Sync status is now true
1164         assertEquals(true, shardManager.getMBean().getSyncStatus());
1165
1166         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending");
1167     }
1168
1169     @Test
1170     public void testOnReceiveSwitchShardBehavior() throws Exception {
1171         new JavaTestKit(getSystem()) {{
1172             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1173
1174             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1175             shardManager.tell(new ActorInitialized(), mockShardActor);
1176
1177             shardManager.tell(new SwitchShardBehavior(mockShardName, RaftState.Leader, 1000), getRef());
1178
1179             SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor, SwitchBehavior.class);
1180
1181             assertEquals(RaftState.Leader, switchBehavior.getNewState());
1182             assertEquals(1000, switchBehavior.getNewTerm());
1183         }};
1184     }
1185
1186     @Test
1187     public void testOnCreateShard() {
1188         LOG.info("testOnCreateShard starting");
1189         new JavaTestKit(getSystem()) {{
1190             datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1191
1192             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1193                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1194
1195             SchemaContext schemaContext = TestModel.createTestContext();
1196             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1197
1198             DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100).
1199                     persistent(false).build();
1200             Shard.Builder shardBuilder = Shard.builder();
1201
1202             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1203                     "foo", null, Arrays.asList("member-1", "member-5", "member-6"));
1204             shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef());
1205
1206             expectMsgClass(duration("5 seconds"), Success.class);
1207
1208             shardManager.tell(new FindLocalShard("foo", true), getRef());
1209
1210             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1211
1212             assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent());
1213             assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig().
1214                     getPeerAddressResolver() instanceof ShardPeerAddressResolver);
1215             assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(),
1216                     new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()),
1217                     shardBuilder.getPeerAddresses().keySet());
1218             assertEquals("ShardIdentifier", new ShardIdentifier("foo", "member-1", shardMrgIDSuffix),
1219                     shardBuilder.getId());
1220             assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1221
1222             // Send CreateShard with same name - should return Success with a message.
1223
1224             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1225
1226             Success success = expectMsgClass(duration("5 seconds"), Success.class);
1227             assertNotNull("Success status is null", success.status());
1228         }};
1229
1230         LOG.info("testOnCreateShard ending");
1231     }
1232
1233     @Test
1234     public void testOnCreateShardWithLocalMemberNotInShardConfig() {
1235         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting");
1236         new JavaTestKit(getSystem()) {{
1237             datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1238
1239             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1240                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1241
1242             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1243
1244             Shard.Builder shardBuilder = Shard.builder();
1245             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1246                     "foo", null, Arrays.asList("member-5", "member-6"));
1247
1248             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1249             expectMsgClass(duration("5 seconds"), Success.class);
1250
1251             shardManager.tell(new FindLocalShard("foo", true), getRef());
1252             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1253
1254             assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
1255             assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(),
1256                     shardBuilder.getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
1257         }};
1258
1259         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending");
1260     }
1261
1262     @Test
1263     public void testOnCreateShardWithNoInitialSchemaContext() {
1264         LOG.info("testOnCreateShardWithNoInitialSchemaContext starting");
1265         new JavaTestKit(getSystem()) {{
1266             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1267                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1268
1269             Shard.Builder shardBuilder = Shard.builder();
1270
1271             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1272                     "foo", null, Arrays.asList("member-1"));
1273             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1274
1275             expectMsgClass(duration("5 seconds"), Success.class);
1276
1277             SchemaContext schemaContext = TestModel.createTestContext();
1278             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1279
1280             shardManager.tell(new FindLocalShard("foo", true), getRef());
1281
1282             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1283
1284             assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1285             assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
1286         }};
1287
1288         LOG.info("testOnCreateShardWithNoInitialSchemaContext ending");
1289     }
1290
1291     @Test
1292     public void testGetSnapshot() throws Throwable {
1293         LOG.info("testGetSnapshot starting");
1294         JavaTestKit kit = new JavaTestKit(getSystem());
1295
1296         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1297                    put("shard1", Arrays.asList("member-1")).
1298                    put("shard2", Arrays.asList("member-1")).
1299                    put("astronauts", Collections.<String>emptyList()).build());
1300
1301         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig).
1302                 withDispatcher(Dispatchers.DefaultDispatcherId()));
1303
1304         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1305         Failure failure = kit.expectMsgClass(Failure.class);
1306         assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
1307
1308         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1309
1310         waitForShardInitialized(shardManager, "shard1", kit);
1311         waitForShardInitialized(shardManager, "shard2", kit);
1312
1313         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1314
1315         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1316
1317         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1318         assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1319
1320         Function<ShardSnapshot, String> shardNameTransformer = new Function<ShardSnapshot, String>() {
1321             @Override
1322             public String apply(ShardSnapshot s) {
1323                 return s.getName();
1324             }
1325         };
1326
1327         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
1328                 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
1329
1330         // Add a new replica
1331
1332         JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1333
1334         TestShardManager shardManagerInstance = shardManager.underlyingActor();
1335         shardManagerInstance.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1336
1337         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1338         mockShardLeaderKit.expectMsgClass(AddServer.class);
1339         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.OK, ""));
1340         kit.expectMsgClass(Status.Success.class);
1341         waitForShardInitialized(shardManager, "astronauts", kit);
1342
1343         // Send another GetSnapshot and verify
1344
1345         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1346         datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1347
1348         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
1349                 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
1350
1351         byte[] snapshotBytes = datastoreSnapshot.getShardManagerSnapshot();
1352         assertNotNull("Expected ShardManagerSnapshot", snapshotBytes);
1353         ShardManagerSnapshot snapshot = SerializationUtils.deserialize(snapshotBytes);
1354         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1355                 Sets.newHashSet(snapshot.getShardList()));
1356
1357         LOG.info("testGetSnapshot ending");
1358     }
1359
1360     @Test
1361     public void testRestoreFromSnapshot() throws Throwable {
1362         LOG.info("testRestoreFromSnapshot starting");
1363
1364         datastoreContextBuilder.shardInitializationTimeout(3, TimeUnit.SECONDS);
1365
1366         JavaTestKit kit = new JavaTestKit(getSystem());
1367
1368         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1369                    put("shard1", Collections.<String>emptyList()).
1370                    put("shard2", Collections.<String>emptyList()).
1371                    put("astronauts", Collections.<String>emptyList()).build());
1372
1373
1374         ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"));
1375         DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix,
1376                 SerializationUtils.serialize(snapshot), Collections.<ShardSnapshot>emptyList());
1377         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig).
1378                 restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
1379
1380         shardManager.underlyingActor().waitForRecoveryComplete();
1381
1382         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1383
1384         waitForShardInitialized(shardManager, "shard1", kit);
1385         waitForShardInitialized(shardManager, "shard2", kit);
1386         waitForShardInitialized(shardManager, "astronauts", kit);
1387
1388         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1389
1390         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1391
1392         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1393
1394         byte[] snapshotBytes = datastoreSnapshot.getShardManagerSnapshot();
1395         assertNotNull("Expected ShardManagerSnapshot", snapshotBytes);
1396         snapshot = SerializationUtils.deserialize(snapshotBytes);
1397         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1398                 Sets.newHashSet(snapshot.getShardList()));
1399
1400         LOG.info("testRestoreFromSnapshot ending");
1401     }
1402
1403     @Test
1404     public void testAddShardReplicaForNonExistentShardConfig() throws Exception {
1405         new JavaTestKit(getSystem()) {{
1406             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1407                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1408
1409             shardManager.tell(new AddShardReplica("model-inventory"), getRef());
1410             Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
1411
1412             assertEquals("Failure obtained", true,
1413                           (resp.cause() instanceof IllegalArgumentException));
1414         }};
1415     }
1416
1417     @Test
1418     public void testAddShardReplica() throws Exception {
1419         LOG.info("testAddShardReplica starting");
1420         MockConfiguration mockConfig =
1421                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1422                    put("default", Arrays.asList("member-1", "member-2")).
1423                    put("astronauts", Arrays.asList("member-2")).build());
1424
1425         final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1426         datastoreContextBuilder.shardManagerPersistenceId(shardManagerID);
1427
1428         // Create an ActorSystem ShardManager actor for member-1.
1429         final ActorSystem system1 = newActorSystem("Member1");
1430         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1431         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1432         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1433                 newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor).cluster(
1434                         new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID);
1435
1436         // Create an ActorSystem ShardManager actor for member-2.
1437         final ActorSystem system2 = newActorSystem("Member2");
1438         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1439
1440         String name = new ShardIdentifier("astronauts", "member-2", "config").toString();
1441         final TestActorRef<MockRespondActor> mockShardLeaderActor =
1442                 TestActorRef.create(system2, Props.create(MockRespondActor.class).
1443                         withDispatcher(Dispatchers.DefaultDispatcherId()), name);
1444         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1445                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor).cluster(
1446                         new ClusterWrapperImpl(system2)).props().
1447                             withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID);
1448
1449         new JavaTestKit(system1) {{
1450
1451             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1452             leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1453
1454             leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1455
1456             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
1457             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1458             leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
1459                     mock(DataTree.class), leaderVersion), mockShardLeaderActor);
1460             leaderShardManager.tell(new RoleChangeNotification(memberId2,
1461                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor);
1462
1463             newReplicaShardManager.underlyingActor().waitForMemberUp();
1464             leaderShardManager.underlyingActor().waitForMemberUp();
1465
1466             //Have a dummy snapshot to be overwritten by the new data persisted.
1467             String[] restoredShards = {"default", "people"};
1468             ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
1469             InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
1470             Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
1471
1472             InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID);
1473             InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
1474
1475             //construct a mock response message
1476             AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2);
1477             mockShardLeaderActor.underlyingActor().updateResponse(response);
1478             newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1479             AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1480                 AddServer.class);
1481             String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1482             assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1483             expectMsgClass(duration("5 seconds"), Status.Success.class);
1484
1485             InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
1486             InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
1487             List<ShardManagerSnapshot> persistedSnapshots =
1488                 InMemorySnapshotStore.getSnapshots(shardManagerID, ShardManagerSnapshot.class);
1489             assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
1490             ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
1491             assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
1492                     Sets.newHashSet(shardManagerSnapshot.getShardList()));
1493         }};
1494         LOG.info("testAddShardReplica ending");
1495     }
1496
1497     @Test
1498     public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() throws Exception {
1499         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting");
1500         new JavaTestKit(getSystem()) {{
1501             TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1502                     newPropsShardMgrWithMockShardActor(), shardMgrID);
1503
1504             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1505             shardManager.tell(new ActorInitialized(), mockShardActor);
1506
1507             String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
1508             AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
1509             ActorRef leaderShardActor = shardManager.underlyingActor().getContext().actorOf(
1510                     Props.create(MockRespondActor.class, addServerReply), leaderId);
1511
1512             MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
1513
1514             String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
1515             shardManager.tell(new RoleChangeNotification(newReplicaId,
1516                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
1517             shardManager.tell(new ShardLeaderStateChanged(newReplicaId, leaderId,
1518                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
1519
1520             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1521
1522             MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
1523
1524             Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1525             assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1526
1527             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1528             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1529
1530             // Send message again to verify previous in progress state is cleared
1531
1532             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1533             resp = expectMsgClass(duration("5 seconds"), Failure.class);
1534             assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1535
1536             // Send message again with an AddServer timeout to verify the pre-existing shard actor isn't terminated.
1537
1538             shardManager.tell(newDatastoreContextFactory(datastoreContextBuilder.
1539                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), getRef());
1540             leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
1541             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1542             expectMsgClass(duration("5 seconds"), Failure.class);
1543
1544             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1545             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1546         }};
1547
1548         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
1549     }
1550
1551     @Test
1552     public void testAddShardReplicaWithPreExistingLocalReplicaLeader() throws Exception {
1553         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting");
1554         new JavaTestKit(getSystem()) {{
1555             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1556             ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1557
1558             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1559             shardManager.tell(new ActorInitialized(), mockShardActor);
1560             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1561                     DataStoreVersions.CURRENT_VERSION), getRef());
1562             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
1563                     RaftState.Leader.name())), mockShardActor);
1564
1565             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1566             Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1567             assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1568
1569             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1570             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1571         }};
1572
1573         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
1574     }
1575
1576     @Test
1577     public void testAddShardReplicaWithAddServerReplyFailure() throws Exception {
1578         LOG.info("testAddShardReplicaWithAddServerReplyFailure starting");
1579         new JavaTestKit(getSystem()) {{
1580             JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1581
1582             MockConfiguration mockConfig =
1583                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1584                        put("astronauts", Arrays.asList("member-2")).build());
1585
1586             ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
1587             final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1588                     newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props(), shardMgrID);
1589             shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1590
1591             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1592
1593             JavaTestKit terminateWatcher = new JavaTestKit(getSystem());
1594             terminateWatcher.watch(mockNewReplicaShardActor);
1595
1596             shardManager.tell(new AddShardReplica("astronauts"), getRef());
1597
1598             AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
1599             assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
1600                     addServerMsg.getNewServerId());
1601             mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
1602
1603             Failure failure = expectMsgClass(duration("5 seconds"), Failure.class);
1604             assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
1605
1606             shardManager.tell(new FindLocalShard("astronauts", false), getRef());
1607             expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1608
1609             terminateWatcher.expectTerminated(mockNewReplicaShardActor);
1610
1611             shardManager.tell(new AddShardReplica("astronauts"), getRef());
1612             mockShardLeaderKit.expectMsgClass(AddServer.class);
1613             mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
1614             failure = expectMsgClass(duration("5 seconds"), Failure.class);
1615             assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
1616         }};
1617
1618         LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
1619     }
1620
1621     @Test
1622     public void testAddShardReplicaWithAlreadyInProgress() throws Exception {
1623         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1624                 AddServer.class, new AddShardReplica("astronauts"));
1625     }
1626
1627     @Test
1628     public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception {
1629         LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting");
1630         datastoreContextBuilder.shardInitializationTimeout(100, TimeUnit.MILLISECONDS);
1631         new JavaTestKit(getSystem()) {{
1632             MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1633                        put("astronauts", Arrays.asList("member-2")).build());
1634
1635             final ActorRef newReplicaShardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig).
1636                     shardActor(mockShardActor).props(), shardMgrID);
1637
1638             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1639             MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2",
1640                     AddressFromURIString.parse("akka.tcp://non-existent@127.0.0.1:5").toString());
1641
1642             newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1643             Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
1644             assertEquals("Failure obtained", true,
1645                           (resp.cause() instanceof RuntimeException));
1646         }};
1647
1648         LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending");
1649     }
1650
1651     @Test
1652     public void testRemoveShardReplicaForNonExistentShard() throws Exception {
1653         new JavaTestKit(getSystem()) {{
1654             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1655                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1656
1657             shardManager.tell(new RemoveShardReplica("model-inventory", "member-1"), getRef());
1658             Status.Failure resp = expectMsgClass(duration("10 seconds"), Status.Failure.class);
1659             assertEquals("Failure obtained", true,
1660                          (resp.cause() instanceof PrimaryNotFoundException));
1661         }};
1662     }
1663
1664     @Test
1665     /**
1666      * Primary is Local
1667      */
1668     public void testRemoveShardReplicaLocal() throws Exception {
1669         new JavaTestKit(getSystem()) {{
1670             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1671
1672             final TestActorRef<MockRespondActor> respondActor =
1673                     TestActorRef.create(getSystem(), Props.create(MockRespondActor.class), memberId);
1674
1675             ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
1676
1677             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1678             shardManager.tell(new ActorInitialized(), respondActor);
1679             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1680                     DataStoreVersions.CURRENT_VERSION), getRef());
1681             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
1682                     RaftState.Leader.name())), respondActor);
1683
1684             respondActor.underlyingActor().updateResponse(new RemoveServerReply(ServerChangeStatus.OK, null));
1685             shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, "member-1"), getRef());
1686             final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor, RemoveServer.class);
1687             assertEquals(new ShardIdentifier("default", "member-1", shardMrgIDSuffix).toString(),
1688                     removeServer.getServerId());
1689             expectMsgClass(duration("5 seconds"), Success.class);
1690         }};
1691     }
1692
1693     @Test
1694     public void testRemoveShardReplicaRemote() throws Exception {
1695         MockConfiguration mockConfig =
1696                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1697                         put("default", Arrays.asList("member-1", "member-2")).
1698                         put("astronauts", Arrays.asList("member-1")).build());
1699
1700         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1701
1702         // Create an ActorSystem ShardManager actor for member-1.
1703         final ActorSystem system1 = newActorSystem("Member1");
1704         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1705         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1706
1707         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1708                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster(
1709                         new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1710                 shardManagerID);
1711
1712         // Create an ActorSystem ShardManager actor for member-2.
1713         final ActorSystem system2 = newActorSystem("Member2");
1714         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1715
1716         String name = new ShardIdentifier("default", "member-2", shardMrgIDSuffix).toString();
1717         final TestActorRef<MockRespondActor> mockShardLeaderActor =
1718                 TestActorRef.create(system2, Props.create(MockRespondActor.class), name);
1719
1720         LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor);
1721
1722         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1723                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster(
1724                         new ClusterWrapperImpl(system2)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1725                 shardManagerID);
1726
1727         // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so,
1728         //    akka.tcp://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1
1729         // However when a shard manager has a local shard which is a follower and a leader that is remote it will
1730         // try to compute an address for the remote shard leader using the ShardPeerAddressResolver. This address will
1731         // look like so,
1732         //    akka.tcp://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1
1733         // In this specific case if we did a FindPrimary for shard default from member-1 we would come up
1734         // with the address of an actor which does not exist, therefore any message sent to that actor would go to
1735         // dead letters.
1736         // To work around this problem we create a ForwardingActor with the right address and pass to it the
1737         // mockShardLeaderActor. The ForwardingActor simply forwards all messages to the mockShardLeaderActor and every
1738         // thing works as expected
1739         final ActorRef actorRef = leaderShardManager.underlyingActor().context()
1740                 .actorOf(Props.create(ForwardingActor.class, mockShardLeaderActor), "member-2-shard-default-" + shardMrgIDSuffix);
1741
1742         LOG.error("Forwarding actor : {}", actorRef);
1743
1744         new JavaTestKit(system1) {{
1745
1746             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1747             leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1748
1749             leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1750             newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1751
1752             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
1753             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1754             leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
1755                     mock(DataTree.class), leaderVersion), mockShardLeaderActor);
1756             leaderShardManager.tell(new RoleChangeNotification(memberId2,
1757                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor);
1758
1759             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
1760             newReplicaShardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2,
1761                     mock(DataTree.class), leaderVersion), mockShardActor);
1762             newReplicaShardManager.tell(new RoleChangeNotification(memberId1,
1763                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
1764
1765             newReplicaShardManager.underlyingActor().waitForMemberUp();
1766             leaderShardManager.underlyingActor().waitForMemberUp();
1767
1768             //construct a mock response message
1769             RemoveServerReply response = new RemoveServerReply(ServerChangeStatus.OK, memberId2);
1770             mockShardLeaderActor.underlyingActor().updateResponse(response);
1771             newReplicaShardManager.tell(new RemoveShardReplica("default", "member-1"), getRef());
1772             RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1773                     RemoveServer.class);
1774             String removeServerId = new ShardIdentifier("default", "member-1", shardMrgIDSuffix).toString();
1775             assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
1776             expectMsgClass(duration("5 seconds"), Status.Success.class);
1777         }};
1778
1779     }
1780
1781     @Test
1782     public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() throws Exception {
1783         testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", "member-2"),
1784                 RemoveServer.class, new RemoveShardReplica("astronauts", "member-3"));
1785     }
1786
1787     @Test
1788     public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() throws Exception {
1789         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1790                 AddServer.class, new RemoveShardReplica("astronauts", "member-2"));
1791     }
1792
1793
1794     public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange,
1795                                                       final Class<?> firstForwardedServerChangeClass,
1796                                                       final Object secondServerChange) throws Exception {
1797         new JavaTestKit(getSystem()) {{
1798             JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1799             JavaTestKit secondRequestKit = new JavaTestKit(getSystem());
1800
1801             MockConfiguration mockConfig =
1802                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1803                             put(shardName, Arrays.asList("member-2")).build());
1804
1805             final TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
1806                     newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor).cluster(
1807                             new MockClusterWrapper()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1808                     shardMgrID);
1809
1810             shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1811
1812             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1813
1814             shardManager.tell(firstServerChange, getRef());
1815
1816             mockShardLeaderKit.expectMsgClass(firstForwardedServerChangeClass);
1817
1818             shardManager.tell(secondServerChange, secondRequestKit.getRef());
1819
1820             secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class);
1821         }};
1822     }
1823
1824     @Test
1825     public void testServerRemovedShardActorNotRunning() throws Exception {
1826         LOG.info("testServerRemovedShardActorNotRunning starting");
1827         new JavaTestKit(getSystem()) {{
1828             MockConfiguration mockConfig =
1829                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1830                             put("default", Arrays.asList("member-1", "member-2")).
1831                             put("astronauts", Arrays.asList("member-2")).
1832                             put("people", Arrays.asList("member-1", "member-2")).build());
1833
1834             TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig));
1835
1836             shardManager.underlyingActor().waitForRecoveryComplete();
1837             shardManager.tell(new FindLocalShard("people", false), getRef());
1838             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1839
1840             shardManager.tell(new FindLocalShard("default", false), getRef());
1841             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1842
1843             // Removed the default shard replica from member-1
1844             ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
1845             ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type(shardMrgIDSuffix).build();
1846             shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
1847
1848             shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1849         }};
1850
1851         LOG.info("testServerRemovedShardActorNotRunning ending");
1852     }
1853
1854     @Test
1855     public void testServerRemovedShardActorRunning() throws Exception {
1856         LOG.info("testServerRemovedShardActorRunning starting");
1857         new JavaTestKit(getSystem()) {{
1858             MockConfiguration mockConfig =
1859                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1860                             put("default", Arrays.asList("member-1", "member-2")).
1861                             put("astronauts", Arrays.asList("member-2")).
1862                             put("people", Arrays.asList("member-1", "member-2")).build());
1863
1864             String shardId = ShardIdentifier.builder().shardName("default").memberName("member-1").
1865                     type(shardMrgIDSuffix).build().toString();
1866             TestActorRef<MessageCollectorActor> shard = actorFactory.createTestActor(
1867                     MessageCollectorActor.props(), shardId);
1868
1869             TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1870                     newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props());
1871
1872             shardManager.underlyingActor().waitForRecoveryComplete();
1873
1874             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1875             shardManager.tell(new ActorInitialized(), shard);
1876
1877             waitForShardInitialized(shardManager, "people", this);
1878             waitForShardInitialized(shardManager, "default", this);
1879
1880             // Removed the default shard replica from member-1
1881             shardManager.tell(new ServerRemoved(shardId), getRef());
1882
1883             shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1884
1885             MessageCollectorActor.expectFirstMatching(shard, Shutdown.class);
1886         }};
1887
1888         LOG.info("testServerRemovedShardActorRunning ending");
1889     }
1890
1891
1892     @Test
1893     public void testShardPersistenceWithRestoredData() throws Exception {
1894         LOG.info("testShardPersistenceWithRestoredData starting");
1895         new JavaTestKit(getSystem()) {{
1896             MockConfiguration mockConfig =
1897                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1898                    put("default", Arrays.asList("member-1", "member-2")).
1899                    put("astronauts", Arrays.asList("member-2")).
1900                    put("people", Arrays.asList("member-1", "member-2")).build());
1901             String[] restoredShards = {"default", "astronauts"};
1902             ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
1903             InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
1904
1905             //create shardManager to come up with restored data
1906             TestActorRef<TestShardManager> newRestoredShardManager = actorFactory.createTestActor(
1907                     newShardMgrProps(mockConfig));
1908
1909             newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
1910
1911             newRestoredShardManager.tell(new FindLocalShard("people", false), getRef());
1912             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1913             assertEquals("for uninitialized shard", "people", notFound.getShardName());
1914
1915             //Verify a local shard is created for the restored shards,
1916             //although we expect a NotInitializedException for the shards as the actor initialization
1917             //message is not sent for them
1918             newRestoredShardManager.tell(new FindLocalShard("default", false), getRef());
1919             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1920
1921             newRestoredShardManager.tell(new FindLocalShard("astronauts", false), getRef());
1922             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1923         }};
1924
1925         LOG.info("testShardPersistenceWithRestoredData ending");
1926     }
1927
1928     @Test
1929     public void testShutDown() throws Exception {
1930         LOG.info("testShutDown starting");
1931         new JavaTestKit(getSystem()) {{
1932             MockConfiguration mockConfig =
1933                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1934                             put("shard1", Arrays.asList("member-1")).
1935                             put("shard2", Arrays.asList("member-1")).build());
1936
1937             String shardId1 = ShardIdentifier.builder().shardName("shard1").memberName("member-1").
1938                     type(shardMrgIDSuffix).build().toString();
1939             TestActorRef<MessageCollectorActor> shard1 = actorFactory.createTestActor(
1940                     MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId1);
1941
1942             String shardId2 = ShardIdentifier.builder().shardName("shard2").memberName("member-1").
1943                     type(shardMrgIDSuffix).build().toString();
1944             TestActorRef<MessageCollectorActor> shard2 = actorFactory.createTestActor(
1945                     MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId2);
1946
1947             TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(
1948                     mockConfig).addShardActor("shard1", shard1).addShardActor("shard2", shard2).props().
1949                         withDispatcher(Dispatchers.DefaultDispatcherId()));
1950
1951             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1952             shardManager.tell(new ActorInitialized(), shard1);
1953             shardManager.tell(new ActorInitialized(), shard2);
1954
1955             FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
1956             Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE);
1957
1958             MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class);
1959             MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class);
1960
1961             try {
1962                 Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS));
1963                 fail("ShardManager actor stopped without waiting for the Shards to be stopped");
1964             } catch(TimeoutException e) {
1965                 // expected
1966             }
1967
1968             actorFactory.killActor(shard1, this);
1969             actorFactory.killActor(shard2, this);
1970
1971             Boolean stopped = Await.result(stopFuture, duration);
1972             assertEquals("Stopped", Boolean.TRUE, stopped);
1973         }};
1974
1975         LOG.info("testShutDown ending");
1976     }
1977
1978     private static class TestShardManager extends ShardManager {
1979         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
1980         private final CountDownLatch snapshotPersist = new CountDownLatch(1);
1981         private ShardManagerSnapshot snapshot;
1982         private final Map<String, ActorRef> shardActors;
1983         private final ActorRef shardActor;
1984         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
1985         private CountDownLatch memberUpReceived = new CountDownLatch(1);
1986         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
1987         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
1988         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
1989         private volatile MessageInterceptor messageInterceptor;
1990
1991         private TestShardManager(Builder builder) {
1992             super(builder);
1993             shardActor = builder.shardActor;
1994             shardActors = builder.shardActors;
1995         }
1996
1997         @Override
1998         protected void handleRecover(Object message) throws Exception {
1999             try {
2000                 super.handleRecover(message);
2001             } finally {
2002                 if(message instanceof RecoveryCompleted) {
2003                     recoveryComplete.countDown();
2004                 }
2005             }
2006         }
2007
2008         @Override
2009         public void handleCommand(Object message) throws Exception {
2010             try{
2011                 if(messageInterceptor != null && messageInterceptor.canIntercept(message)) {
2012                     getSender().tell(messageInterceptor.apply(message), getSelf());
2013                 } else {
2014                     super.handleCommand(message);
2015                 }
2016             } finally {
2017                 if(message instanceof FindPrimary) {
2018                     findPrimaryMessageReceived.countDown();
2019                 } else if(message instanceof ClusterEvent.MemberUp) {
2020                     String role = ((ClusterEvent.MemberUp)message).member().roles().iterator().next();
2021                     if(!getCluster().getCurrentMemberName().equals(role)) {
2022                         memberUpReceived.countDown();
2023                     }
2024                 } else if(message instanceof ClusterEvent.MemberRemoved) {
2025                     String role = ((ClusterEvent.MemberRemoved)message).member().roles().iterator().next();
2026                     if(!getCluster().getCurrentMemberName().equals(role)) {
2027                         memberRemovedReceived.countDown();
2028                     }
2029                 } else if(message instanceof ClusterEvent.UnreachableMember) {
2030                     String role = ((ClusterEvent.UnreachableMember)message).member().roles().iterator().next();
2031                     if(!getCluster().getCurrentMemberName().equals(role)) {
2032                         memberUnreachableReceived.countDown();
2033                     }
2034                 } else if(message instanceof ClusterEvent.ReachableMember) {
2035                     String role = ((ClusterEvent.ReachableMember)message).member().roles().iterator().next();
2036                     if(!getCluster().getCurrentMemberName().equals(role)) {
2037                         memberReachableReceived.countDown();
2038                     }
2039                 }
2040             }
2041         }
2042
2043         void setMessageInterceptor(MessageInterceptor messageInterceptor) {
2044             this.messageInterceptor = messageInterceptor;
2045         }
2046
2047         void waitForRecoveryComplete() {
2048             assertEquals("Recovery complete", true,
2049                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
2050         }
2051
2052         void waitForMemberUp() {
2053             assertEquals("MemberUp received", true,
2054                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
2055             memberUpReceived = new CountDownLatch(1);
2056         }
2057
2058         void waitForMemberRemoved() {
2059             assertEquals("MemberRemoved received", true,
2060                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
2061             memberRemovedReceived = new CountDownLatch(1);
2062         }
2063
2064         void waitForUnreachableMember() {
2065             assertEquals("UnreachableMember received", true,
2066                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
2067                 ));
2068             memberUnreachableReceived = new CountDownLatch(1);
2069         }
2070
2071         void waitForReachableMember() {
2072             assertEquals("ReachableMember received", true,
2073                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
2074             memberReachableReceived = new CountDownLatch(1);
2075         }
2076
2077         void verifyFindPrimary() {
2078             assertEquals("FindPrimary received", true,
2079                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
2080             findPrimaryMessageReceived = new CountDownLatch(1);
2081         }
2082
2083         public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) {
2084             return new Builder(datastoreContextBuilder);
2085         }
2086
2087         private static class Builder extends AbstractGenericCreator<Builder, TestShardManager> {
2088             private ActorRef shardActor;
2089             private final Map<String, ActorRef> shardActors = new HashMap<>();
2090
2091             Builder(DatastoreContext.Builder datastoreContextBuilder) {
2092                 super(TestShardManager.class);
2093                 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
2094             }
2095
2096             Builder shardActor(ActorRef shardActor) {
2097                 this.shardActor = shardActor;
2098                 return this;
2099             }
2100
2101             Builder addShardActor(String shardName, ActorRef actorRef){
2102                 shardActors.put(shardName, actorRef);
2103                 return this;
2104             }
2105         }
2106
2107         @Override
2108         public void saveSnapshot(Object obj) {
2109             snapshot = (ShardManagerSnapshot) obj;
2110             snapshotPersist.countDown();
2111             super.saveSnapshot(obj);
2112         }
2113
2114         void verifySnapshotPersisted(Set<String> shardList) {
2115             assertEquals("saveSnapshot invoked", true,
2116                     Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
2117             assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
2118         }
2119
2120         @Override
2121         protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
2122             if(shardActors.get(info.getShardName()) != null){
2123                 return shardActors.get(info.getShardName());
2124             }
2125
2126             if(shardActor != null) {
2127                 return shardActor;
2128             }
2129
2130             return super.newShardActor(schemaContext, info);
2131         }
2132     }
2133
2134     private static abstract class AbstractGenericCreator<T extends AbstractGenericCreator<T, ?>, C extends ShardManager>
2135                                                      extends AbstractShardManagerCreator<T> {
2136         private final Class<C> shardManagerClass;
2137
2138         AbstractGenericCreator(Class<C> shardManagerClass) {
2139             this.shardManagerClass = shardManagerClass;
2140             cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).
2141                     waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache());
2142         }
2143
2144         @Override
2145         public Props props() {
2146             verify();
2147             return Props.create(shardManagerClass, this);
2148         }
2149     }
2150
2151     private static class GenericCreator<C extends ShardManager> extends AbstractGenericCreator<GenericCreator<C>, C> {
2152         GenericCreator(Class<C> shardManagerClass) {
2153             super(shardManagerClass);
2154         }
2155     }
2156
2157     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
2158         private static final long serialVersionUID = 1L;
2159         private final Creator<ShardManager> delegate;
2160
2161         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
2162             this.delegate = delegate;
2163         }
2164
2165         @Override
2166         public ShardManager create() throws Exception {
2167             return delegate.create();
2168         }
2169     }
2170
2171     interface MessageInterceptor extends Function<Object, Object> {
2172         boolean canIntercept(Object message);
2173     }
2174
2175     private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
2176         return new MessageInterceptor(){
2177             @Override
2178             public Object apply(Object message) {
2179                 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
2180             }
2181
2182             @Override
2183             public boolean canIntercept(Object message) {
2184                 return message instanceof FindPrimary;
2185             }
2186         };
2187     }
2188
2189     private static class MockRespondActor extends MessageCollectorActor {
2190         static final String CLEAR_RESPONSE = "clear-response";
2191         static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MockRespondActor.class);
2192
2193         private volatile Object responseMsg;
2194
2195         @SuppressWarnings("unused")
2196         public MockRespondActor() {
2197         }
2198
2199         @SuppressWarnings("unused")
2200         public MockRespondActor(Object responseMsg) {
2201             this.responseMsg = responseMsg;
2202         }
2203
2204         public void updateResponse(Object response) {
2205             responseMsg = response;
2206         }
2207
2208         @Override
2209         public void onReceive(Object message) throws Exception {
2210             if(!"get-all-messages".equals(message)) {
2211                 LOG.debug("Received message : {}", message);
2212             }
2213             super.onReceive(message);
2214             if (message instanceof AddServer && responseMsg != null) {
2215                 getSender().tell(responseMsg, getSelf());
2216             } else if(message instanceof RemoveServer && responseMsg != null){
2217                 getSender().tell(responseMsg, getSelf());
2218             } else if(message.equals(CLEAR_RESPONSE)) {
2219                 responseMsg = null;
2220             }
2221         }
2222     }
2223 }