BUG-5280: use MemberName instead of String
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManagerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.times;
20 import static org.mockito.Mockito.verify;
21 import akka.actor.ActorRef;
22 import akka.actor.ActorSystem;
23 import akka.actor.AddressFromURIString;
24 import akka.actor.Props;
25 import akka.actor.Status;
26 import akka.actor.Status.Failure;
27 import akka.actor.Status.Success;
28 import akka.cluster.Cluster;
29 import akka.cluster.ClusterEvent;
30 import akka.cluster.Member;
31 import akka.dispatch.Dispatchers;
32 import akka.japi.Creator;
33 import akka.pattern.Patterns;
34 import akka.persistence.RecoveryCompleted;
35 import akka.serialization.Serialization;
36 import akka.testkit.JavaTestKit;
37 import akka.testkit.TestActorRef;
38 import akka.util.Timeout;
39 import com.google.common.base.Function;
40 import com.google.common.base.Stopwatch;
41 import com.google.common.collect.ImmutableMap;
42 import com.google.common.collect.ImmutableSet;
43 import com.google.common.collect.Lists;
44 import com.google.common.collect.Sets;
45 import com.google.common.util.concurrent.Uninterruptibles;
46 import com.typesafe.config.ConfigFactory;
47 import java.net.URI;
48 import java.util.AbstractMap;
49 import java.util.ArrayList;
50 import java.util.Arrays;
51 import java.util.Collection;
52 import java.util.Collections;
53 import java.util.HashMap;
54 import java.util.List;
55 import java.util.Map;
56 import java.util.Map.Entry;
57 import java.util.Set;
58 import java.util.concurrent.CountDownLatch;
59 import java.util.concurrent.TimeUnit;
60 import java.util.concurrent.TimeoutException;
61 import java.util.stream.Collectors;
62 import org.apache.commons.lang3.SerializationUtils;
63 import org.junit.After;
64 import org.junit.Before;
65 import org.junit.Test;
66 import org.mockito.Mock;
67 import org.mockito.Mockito;
68 import org.mockito.MockitoAnnotations;
69 import org.opendaylight.controller.cluster.access.concepts.MemberName;
70 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
71 import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
72 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
73 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
74 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
75 import org.opendaylight.controller.cluster.datastore.Shard;
76 import org.opendaylight.controller.cluster.datastore.ShardManager.SchemaContextModules;
77 import org.opendaylight.controller.cluster.datastore.config.Configuration;
78 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
79 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
80 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
81 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
82 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
83 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
84 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
85 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
86 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
87 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
88 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
89 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
90 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
91 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
92 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
93 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
94 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
95 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
96 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
97 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
98 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
99 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
100 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
101 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
102 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
103 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
104 import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor;
105 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
106 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
107 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
108 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
109 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
110 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
111 import org.opendaylight.controller.cluster.raft.RaftState;
112 import org.opendaylight.controller.cluster.raft.TestActorFactory;
113 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
114 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
115 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
116 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
117 import org.opendaylight.controller.cluster.raft.messages.AddServer;
118 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
119 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
120 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
121 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
122 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
123 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
124 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
125 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
126 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
127 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
128 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
129 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
130 import org.slf4j.Logger;
131 import org.slf4j.LoggerFactory;
132 import scala.concurrent.Await;
133 import scala.concurrent.Future;
134 import scala.concurrent.duration.FiniteDuration;
135
136 public class ShardManagerTest extends AbstractActorTest {
137     private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
138     private static final MemberName MEMBER_1 = MemberName.forName("member-1");
139     private static final MemberName MEMBER_2 = MemberName.forName("member-2");
140     private static final MemberName MEMBER_3 = MemberName.forName("member-3");
141
142     private static int ID_COUNTER = 1;
143
144     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
145     private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
146
147     @Mock
148     private static CountDownLatch ready;
149
150     private static TestActorRef<MessageCollectorActor> mockShardActor;
151
152     private static ShardIdentifier mockShardName;
153
154     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
155             dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
156                    .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
157
158     private final Collection<ActorSystem> actorSystems = new ArrayList<>();
159
160     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
161
162     @Before
163     public void setUp() {
164         MockitoAnnotations.initMocks(this);
165
166         InMemoryJournal.clear();
167         InMemorySnapshotStore.clear();
168
169         if(mockShardActor == null) {
170             mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, MEMBER_1, "config");
171             mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class),
172                     mockShardName.toString());
173         }
174
175         mockShardActor.underlyingActor().clear();
176     }
177
178     @After
179     public void tearDown() {
180         InMemoryJournal.clear();
181         InMemorySnapshotStore.clear();
182
183         for(ActorSystem system: actorSystems) {
184             JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
185         }
186
187         actorFactory.close();
188     }
189
190     private ActorSystem newActorSystem(String config) {
191         ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config));
192         actorSystems.add(system);
193         return system;
194     }
195
196     private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
197         String name = new ShardIdentifier(shardName, MemberName.forName(memberName), "config").toString();
198         if(system == getSystem()) {
199             return actorFactory.createTestActor(Props.create(MessageCollectorActor.class), name);
200         }
201
202         return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
203     }
204
205     private Props newShardMgrProps() {
206         return newShardMgrProps(new MockConfiguration());
207     }
208
209     private static DatastoreContextFactory newDatastoreContextFactory(DatastoreContext datastoreContext) {
210         DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
211         Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
212         Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString());
213         return mockFactory;
214     }
215
216     private TestShardManager.Builder newTestShardMgrBuilder() {
217         return TestShardManager.builder(datastoreContextBuilder);
218     }
219
220     private TestShardManager.Builder newTestShardMgrBuilder(Configuration config) {
221         return TestShardManager.builder(datastoreContextBuilder).configuration(config);
222     }
223
224     private Props newShardMgrProps(Configuration config) {
225         return newTestShardMgrBuilder(config).props();
226     }
227
228     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() {
229         return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
230     }
231
232     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(ActorRef shardActor) {
233         return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor);
234     }
235
236
237     private Props newPropsShardMgrWithMockShardActor() {
238         return newTestShardMgrBuilderWithMockShardActor().props();
239     }
240
241     private Props newPropsShardMgrWithMockShardActor(ActorRef shardActor) {
242         return newTestShardMgrBuilderWithMockShardActor(shardActor).props();
243     }
244
245
246     private TestShardManager newTestShardManager() {
247         return newTestShardManager(newShardMgrProps());
248     }
249
250     private TestShardManager newTestShardManager(Props props) {
251         TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
252         TestShardManager shardManager = shardManagerActor.underlyingActor();
253         shardManager.waitForRecoveryComplete();
254         return shardManager;
255     }
256
257     private static void waitForShardInitialized(ActorRef shardManager, String shardName, JavaTestKit kit) {
258         AssertionError last = null;
259         Stopwatch sw = Stopwatch.createStarted();
260         while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
261             try {
262                 shardManager.tell(new FindLocalShard(shardName, true), kit.getRef());
263                 kit.expectMsgClass(LocalShardFound.class);
264                 return;
265             } catch(AssertionError e) {
266                 last = e;
267             }
268
269             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
270         }
271
272         throw last;
273     }
274
275     private static <T> T expectMsgClassOrFailure(Class<T> msgClass, JavaTestKit kit, String msg) {
276         Object reply = kit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), msgClass, Failure.class);
277         if(reply instanceof Failure) {
278             throw new AssertionError(msg + " failed", ((Failure)reply).cause());
279         }
280
281         return (T)reply;
282     }
283
284     @Test
285     public void testPerShardDatastoreContext() throws Exception {
286         LOG.info("testPerShardDatastoreContext starting");
287         final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
288                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
289
290         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
291                 shardElectionTimeoutFactor(6).build()).when(mockFactory).getShardDatastoreContext("default");
292
293         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
294                 shardElectionTimeoutFactor(7).build()).when(mockFactory).getShardDatastoreContext("topology");
295
296         final MockConfiguration mockConfig = new MockConfiguration() {
297             @Override
298             public Collection<String> getMemberShardNames(MemberName memberName) {
299                 return Arrays.asList("default", "topology");
300             }
301
302             @Override
303             public Collection<MemberName> getMembersFromShardName(String shardName) {
304                 return members("member-1");
305             }
306         };
307
308         final TestActorRef<MessageCollectorActor> defaultShardActor = actorFactory.createTestActor(
309                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("default"));
310         final TestActorRef<MessageCollectorActor> topologyShardActor = actorFactory.createTestActor(
311                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("topology"));
312
313         final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
314                 new HashMap<String, Entry<ActorRef, DatastoreContext>>());
315         shardInfoMap.put("default", new AbstractMap.SimpleEntry<ActorRef, DatastoreContext>(defaultShardActor, null));
316         shardInfoMap.put("topology", new AbstractMap.SimpleEntry<ActorRef, DatastoreContext>(topologyShardActor, null));
317
318         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
319         final CountDownLatch newShardActorLatch = new CountDownLatch(2);
320         class LocalShardManager extends ShardManager {
321             public LocalShardManager(AbstractShardManagerCreator<?> creator) {
322                 super(creator);
323             }
324
325             @Override
326             protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
327                 Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
328                 ActorRef ref = null;
329                 if(entry != null) {
330                     ref = entry.getKey();
331                     entry.setValue(info.getDatastoreContext());
332                 }
333
334                 newShardActorLatch.countDown();
335                 return ref;
336             }
337         }
338
339         final Creator<ShardManager> creator = new Creator<ShardManager>() {
340             private static final long serialVersionUID = 1L;
341             @Override
342             public ShardManager create() throws Exception {
343                 return new LocalShardManager(new GenericCreator<LocalShardManager>(LocalShardManager.class).
344                         datastoreContextFactory(mockFactory).primaryShardInfoCache(primaryShardInfoCache).
345                         configuration(mockConfig));
346             }
347         };
348
349         JavaTestKit kit = new JavaTestKit(getSystem());
350
351         final ActorRef shardManager = actorFactory.createActor(Props.create(
352                 new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()));
353
354         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), kit.getRef());
355
356         assertEquals("Shard actors created", true, newShardActorLatch.await(5, TimeUnit.SECONDS));
357         assertEquals("getShardElectionTimeoutFactor", 6, shardInfoMap.get("default").getValue().
358                 getShardElectionTimeoutFactor());
359         assertEquals("getShardElectionTimeoutFactor", 7, shardInfoMap.get("topology").getValue().
360                 getShardElectionTimeoutFactor());
361
362         DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
363                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
364         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
365                 shardElectionTimeoutFactor(66).build()).when(newMockFactory).getShardDatastoreContext("default");
366
367         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
368                 shardElectionTimeoutFactor(77).build()).when(newMockFactory).getShardDatastoreContext("topology");
369
370         shardManager.tell(newMockFactory, kit.getRef());
371
372         DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor, DatastoreContext.class);
373         assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
374
375         newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
376         assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
377
378         LOG.info("testPerShardDatastoreContext ending");
379     }
380
381     @Test
382     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
383         new JavaTestKit(getSystem()) {{
384             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
385
386             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
387
388             shardManager.tell(new FindPrimary("non-existent", false), getRef());
389
390             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
391         }};
392     }
393
394     @Test
395     public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
396         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting");
397         new JavaTestKit(getSystem()) {{
398             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
399
400             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
401
402             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
403             shardManager.tell(new ActorInitialized(), mockShardActor);
404
405             DataTree mockDataTree = mock(DataTree.class);
406             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
407                     DataStoreVersions.CURRENT_VERSION), getRef());
408
409             MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
410             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
411                     RaftState.Leader.name())), mockShardActor);
412
413             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
414
415             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
416             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
417                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
418             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
419         }};
420
421         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending");
422     }
423
424     @Test
425     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
426         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp starting");
427         new JavaTestKit(getSystem()) {{
428             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
429
430             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
431             shardManager.tell(new ActorInitialized(), mockShardActor);
432
433             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
434             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
435             shardManager.tell(new RoleChangeNotification(memberId1,
436                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
437             shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION), mockShardActor);
438
439             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
440
441             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
442         }};
443
444         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending");
445     }
446
447     @Test
448     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
449         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard starting");
450         new JavaTestKit(getSystem()) {{
451             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
452
453             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
454             shardManager.tell(new ActorInitialized(), mockShardActor);
455
456             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
457             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
458
459             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
460             shardManager.tell(new RoleChangeNotification(memberId1,
461                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
462             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
463             shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, leaderVersion), mockShardActor);
464
465             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
466
467             RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
468             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
469                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
470             assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
471         }};
472
473         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard ending");
474     }
475
476     @Test
477     public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
478         new JavaTestKit(getSystem()) {{
479             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
480
481             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
482
483             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
484         }};
485     }
486
487     @Test
488     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
489         new JavaTestKit(getSystem()) {{
490             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
491
492             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
493             shardManager.tell(new ActorInitialized(), mockShardActor);
494
495             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
496
497             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
498         }};
499     }
500
501     @Test
502     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
503         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
504         new JavaTestKit(getSystem()) {{
505             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
506
507             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
508             shardManager.tell(new ActorInitialized(), mockShardActor);
509
510             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
511             shardManager.tell(new RoleChangeNotification(memberId,
512                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
513
514             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
515
516             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
517
518             DataTree mockDataTree = mock(DataTree.class);
519             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
520                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
521
522             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
523
524             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
525             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
526                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
527             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
528         }};
529
530         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
531     }
532
533     @Test
534     public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
535         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting");
536         datastoreContextBuilder.shardInitializationTimeout(10, TimeUnit.SECONDS);
537         new JavaTestKit(getSystem()) {{
538             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
539
540             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
541
542             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
543             // delayed until we send ActorInitialized and RoleChangeNotification.
544             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
545
546             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
547
548             shardManager.tell(new ActorInitialized(), mockShardActor);
549
550             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
551
552             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
553             shardManager.tell(new RoleChangeNotification(memberId,
554                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
555
556             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
557
558             DataTree mockDataTree = mock(DataTree.class);
559             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
560                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
561
562             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
563             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
564                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
565             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
566
567             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
568         }};
569
570         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending");
571     }
572
573     @Test
574     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
575         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard starting");
576         new JavaTestKit(getSystem()) {{
577             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
578
579             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
580
581             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
582
583             expectMsgClass(duration("2 seconds"), NotInitializedException.class);
584
585             shardManager.tell(new ActorInitialized(), mockShardActor);
586
587             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
588         }};
589
590         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending");
591     }
592
593     @Test
594     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
595         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard starting");
596         new JavaTestKit(getSystem()) {{
597             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
598
599             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
600             shardManager.tell(new ActorInitialized(), mockShardActor);
601             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
602                     null, RaftState.Candidate.name()), mockShardActor);
603
604             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
605
606             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
607         }};
608
609         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending");
610     }
611
612     @Test
613     public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception {
614         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard starting");
615         new JavaTestKit(getSystem()) {{
616             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
617
618             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
619             shardManager.tell(new ActorInitialized(), mockShardActor);
620             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
621                     null, RaftState.IsolatedLeader.name()), mockShardActor);
622
623             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
624
625             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
626         }};
627
628         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending");
629     }
630
631     @Test
632     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
633         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard starting");
634         new JavaTestKit(getSystem()) {{
635             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
636
637             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
638             shardManager.tell(new ActorInitialized(), mockShardActor);
639
640             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
641
642             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
643         }};
644
645         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending");
646     }
647
648     @Test
649     public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
650         LOG.info("testOnReceiveFindPrimaryForRemoteShard starting");
651         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
652
653         // Create an ActorSystem ShardManager actor for member-1.
654
655         final ActorSystem system1 = newActorSystem("Member1");
656         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
657
658         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
659                 newTestShardMgrBuilderWithMockShardActor().cluster(
660                         new ClusterWrapperImpl(system1)).props().withDispatcher(
661                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
662
663         // Create an ActorSystem ShardManager actor for member-2.
664
665         final ActorSystem system2 = newActorSystem("Member2");
666
667         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
668
669         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
670
671         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
672                 put("default", Arrays.asList("member-1", "member-2")).
673                 put("astronauts", Arrays.asList("member-2")).build());
674
675         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
676                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
677                         new ClusterWrapperImpl(system2)).props().withDispatcher(
678                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
679
680         new JavaTestKit(system1) {{
681
682             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
683             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
684
685             shardManager2.tell(new ActorInitialized(), mockShardActor2);
686
687             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
688             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
689             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
690                     mock(DataTree.class), leaderVersion), mockShardActor2);
691             shardManager2.tell(new RoleChangeNotification(memberId2,
692                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
693
694             shardManager1.underlyingActor().waitForMemberUp();
695             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
696
697             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
698             String path = found.getPrimaryPath();
699             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
700             assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
701
702             shardManager2.underlyingActor().verifyFindPrimary();
703
704             Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
705
706             shardManager1.underlyingActor().waitForMemberRemoved();
707
708             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
709
710             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
711         }};
712
713         LOG.info("testOnReceiveFindPrimaryForRemoteShard ending");
714     }
715
716     @Test
717     public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
718         LOG.info("testShardAvailabilityOnChangeOfMemberReachability starting");
719         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
720
721         // Create an ActorSystem ShardManager actor for member-1.
722
723         final ActorSystem system1 = newActorSystem("Member1");
724         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
725
726         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
727
728         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
729                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
730                         new ClusterWrapperImpl(system1)).props().withDispatcher(
731                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
732
733         // Create an ActorSystem ShardManager actor for member-2.
734
735         final ActorSystem system2 = newActorSystem("Member2");
736
737         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
738
739         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
740
741         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
742             put("default", Arrays.asList("member-1", "member-2")).build());
743
744         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
745                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
746                         new ClusterWrapperImpl(system2)).props().withDispatcher(
747                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
748
749         new JavaTestKit(system1) {{
750
751             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
752             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
753             shardManager1.tell(new ActorInitialized(), mockShardActor1);
754             shardManager2.tell(new ActorInitialized(), mockShardActor2);
755
756             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
757             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
758             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
759                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
760             shardManager1.tell(new RoleChangeNotification(memberId1,
761                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
762             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
763                     DataStoreVersions.CURRENT_VERSION),
764                 mockShardActor2);
765             shardManager2.tell(new RoleChangeNotification(memberId2,
766                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
767             shardManager1.underlyingActor().waitForMemberUp();
768
769             shardManager1.tell(new FindPrimary("default", true), getRef());
770
771             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
772             String path = found.getPrimaryPath();
773             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
774
775             shardManager1.tell(MockClusterWrapper.
776                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
777
778             shardManager1.underlyingActor().waitForUnreachableMember();
779
780             PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
781             assertEquals("getMemberName", MEMBER_2, peerDown.getMemberName());
782             MessageCollectorActor.clearMessages(mockShardActor1);
783
784             shardManager1.tell(MockClusterWrapper.
785                     createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
786
787             MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
788
789             shardManager1.tell(new FindPrimary("default", true), getRef());
790
791             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
792
793             shardManager1.tell(MockClusterWrapper.
794                 createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
795
796             shardManager1.underlyingActor().waitForReachableMember();
797
798             PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
799             assertEquals("getMemberName", MEMBER_2, peerUp.getMemberName());
800             MessageCollectorActor.clearMessages(mockShardActor1);
801
802             shardManager1.tell(new FindPrimary("default", true), getRef());
803
804             RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
805             String path1 = found1.getPrimaryPath();
806             assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
807
808             shardManager1.tell(MockClusterWrapper.
809                     createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
810
811             MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
812
813             // Test FindPrimary wait succeeds after reachable member event.
814
815             shardManager1.tell(MockClusterWrapper.
816                     createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
817             shardManager1.underlyingActor().waitForUnreachableMember();
818
819             shardManager1.tell(new FindPrimary("default", true), getRef());
820
821             shardManager1.tell(MockClusterWrapper.
822                     createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
823
824             RemotePrimaryShardFound found2 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
825             String path2 = found2.getPrimaryPath();
826             assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config"));
827         }};
828
829         LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending");
830     }
831
832     @Test
833     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
834         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange starting");
835         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
836
837         // Create an ActorSystem ShardManager actor for member-1.
838
839         final ActorSystem system1 = newActorSystem("Member1");
840         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
841
842         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
843
844         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
845         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
846                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
847                         new ClusterWrapperImpl(system1)).primaryShardInfoCache(primaryShardInfoCache).props().
848                             withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID);
849
850         // Create an ActorSystem ShardManager actor for member-2.
851
852         final ActorSystem system2 = newActorSystem("Member2");
853
854         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
855
856         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
857
858         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
859             put("default", Arrays.asList("member-1", "member-2")).build());
860
861         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
862                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
863                         new ClusterWrapperImpl(system2)).props().withDispatcher(
864                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
865
866         new JavaTestKit(system1) {{
867             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
868             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
869             shardManager1.tell(new ActorInitialized(), mockShardActor1);
870             shardManager2.tell(new ActorInitialized(), mockShardActor2);
871
872             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
873             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
874             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
875                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
876             shardManager1.tell(new RoleChangeNotification(memberId1,
877                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
878             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
879                     DataStoreVersions.CURRENT_VERSION),
880                 mockShardActor2);
881             shardManager2.tell(new RoleChangeNotification(memberId2,
882                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
883             shardManager1.underlyingActor().waitForMemberUp();
884
885             shardManager1.tell(new FindPrimary("default", true), getRef());
886
887             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
888             String path = found.getPrimaryPath();
889             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
890
891             primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
892                     mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION));
893
894             shardManager1.tell(MockClusterWrapper.
895                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
896
897             shardManager1.underlyingActor().waitForUnreachableMember();
898
899             shardManager1.tell(new FindPrimary("default", true), getRef());
900
901             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
902
903             assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default"));
904
905             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, mock(DataTree.class),
906                     DataStoreVersions.CURRENT_VERSION), mockShardActor1);
907             shardManager1.tell(new RoleChangeNotification(memberId1,
908                 RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
909
910             shardManager1.tell(new FindPrimary("default", true), getRef());
911
912             LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
913             String path1 = found1.getPrimaryPath();
914             assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
915
916         }};
917
918         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange ending");
919     }
920
921
922     @Test
923     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
924         new JavaTestKit(getSystem()) {{
925             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
926
927             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
928
929             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
930
931             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
932
933             assertEquals("getShardName", "non-existent", notFound.getShardName());
934         }};
935     }
936
937     @Test
938     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
939         new JavaTestKit(getSystem()) {{
940             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
941
942             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
943             shardManager.tell(new ActorInitialized(), mockShardActor);
944
945             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
946
947             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
948
949             assertTrue("Found path contains " + found.getPath().path().toString(),
950                     found.getPath().path().toString().contains("member-1-shard-default-config"));
951         }};
952     }
953
954     @Test
955     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
956         new JavaTestKit(getSystem()) {{
957             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
958
959             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
960
961             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
962         }};
963     }
964
965     @Test
966     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
967         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
968         new JavaTestKit(getSystem()) {{
969             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
970
971             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
972
973             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
974             // delayed until we send ActorInitialized.
975             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
976                     new Timeout(5, TimeUnit.SECONDS));
977
978             shardManager.tell(new ActorInitialized(), mockShardActor);
979
980             Object resp = Await.result(future, duration("5 seconds"));
981             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
982         }};
983
984         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
985     }
986
987     @Test
988     public void testOnRecoveryJournalIsCleaned() {
989         String persistenceID = "shard-manager-" + shardMrgIDSuffix;
990         InMemoryJournal.addEntry(persistenceID, 1L, new SchemaContextModules(ImmutableSet.of("foo")));
991         InMemoryJournal.addEntry(persistenceID, 2L, new SchemaContextModules(ImmutableSet.of("bar")));
992         InMemoryJournal.addDeleteMessagesCompleteLatch(persistenceID);
993
994         newTestShardManager();
995
996         InMemoryJournal.waitForDeleteMessagesComplete(persistenceID);
997
998         // Journal entries up to the last one should've been deleted
999         Map<Long, Object> journal = InMemoryJournal.get(persistenceID);
1000         synchronized (journal) {
1001             assertEquals("Journal size", 0, journal.size());
1002         }
1003     }
1004
1005     @Test
1006     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
1007         TestShardManager shardManager = newTestShardManager();
1008
1009         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1010         shardManager.onReceiveCommand(new RoleChangeNotification(
1011                 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
1012
1013         verify(ready, never()).countDown();
1014
1015         shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
1016                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1017
1018         verify(ready, times(1)).countDown();
1019     }
1020
1021     @Test
1022     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
1023         new JavaTestKit(getSystem()) {{
1024             TestShardManager shardManager = newTestShardManager();
1025
1026             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1027             shardManager.onReceiveCommand(new RoleChangeNotification(
1028                     memberId, null, RaftState.Follower.name()));
1029
1030             verify(ready, never()).countDown();
1031
1032             shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
1033
1034             shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
1035                     "member-2-shard-default-" + shardMrgIDSuffix, mock(DataTree.class),
1036                     DataStoreVersions.CURRENT_VERSION));
1037
1038             verify(ready, times(1)).countDown();
1039         }};
1040     }
1041
1042     @Test
1043     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
1044         new JavaTestKit(getSystem()) {{
1045             TestShardManager shardManager = newTestShardManager();
1046
1047             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1048             shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1049
1050             verify(ready, never()).countDown();
1051
1052             shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
1053                     "member-2-shard-default-" + shardMrgIDSuffix, mock(DataTree.class),
1054                     DataStoreVersions.CURRENT_VERSION));
1055
1056             shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
1057
1058             verify(ready, times(1)).countDown();
1059         }};
1060     }
1061
1062     @Test
1063     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
1064         TestShardManager shardManager = newTestShardManager();
1065
1066         shardManager.onReceiveCommand(new RoleChangeNotification(
1067                 "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
1068
1069         verify(ready, never()).countDown();
1070     }
1071
1072     @Test
1073     public void testByDefaultSyncStatusIsFalse() throws Exception{
1074         TestShardManager shardManager = newTestShardManager();
1075
1076         assertEquals(false, shardManager.getMBean().getSyncStatus());
1077     }
1078
1079     @Test
1080     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
1081         TestShardManager shardManager = newTestShardManager();
1082
1083         shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
1084                 RaftState.Follower.name(), RaftState.Leader.name()));
1085
1086         assertEquals(true, shardManager.getMBean().getSyncStatus());
1087     }
1088
1089     @Test
1090     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
1091         TestShardManager shardManager = newTestShardManager();
1092
1093         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1094         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1095                 RaftState.Follower.name(), RaftState.Candidate.name()));
1096
1097         assertEquals(false, shardManager.getMBean().getSyncStatus());
1098
1099         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
1100         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(
1101                 true, shardId));
1102
1103         assertEquals(false, shardManager.getMBean().getSyncStatus());
1104     }
1105
1106     @Test
1107     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
1108         TestShardManager shardManager = newTestShardManager();
1109
1110         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1111         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1112                 RaftState.Candidate.name(), RaftState.Follower.name()));
1113
1114         // Initially will be false
1115         assertEquals(false, shardManager.getMBean().getSyncStatus());
1116
1117         // Send status true will make sync status true
1118         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
1119
1120         assertEquals(true, shardManager.getMBean().getSyncStatus());
1121
1122         // Send status false will make sync status false
1123         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
1124
1125         assertEquals(false, shardManager.getMBean().getSyncStatus());
1126
1127     }
1128
1129     @Test
1130     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
1131         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
1132         TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
1133             @Override
1134             public List<String> getMemberShardNames(MemberName memberName) {
1135                 return Arrays.asList("default", "astronauts");
1136             }
1137         }));
1138
1139         // Initially will be false
1140         assertEquals(false, shardManager.getMBean().getSyncStatus());
1141
1142         // Make default shard leader
1143         String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
1144         shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId,
1145                 RaftState.Follower.name(), RaftState.Leader.name()));
1146
1147         // default = Leader, astronauts is unknown so sync status remains false
1148         assertEquals(false, shardManager.getMBean().getSyncStatus());
1149
1150         // Make astronauts shard leader as well
1151         String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1152         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1153                 RaftState.Follower.name(), RaftState.Leader.name()));
1154
1155         // Now sync status should be true
1156         assertEquals(true, shardManager.getMBean().getSyncStatus());
1157
1158         // Make astronauts a Follower
1159         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1160                 RaftState.Leader.name(), RaftState.Follower.name()));
1161
1162         // Sync status is not true
1163         assertEquals(false, shardManager.getMBean().getSyncStatus());
1164
1165         // Make the astronauts follower sync status true
1166         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
1167
1168         // Sync status is now true
1169         assertEquals(true, shardManager.getMBean().getSyncStatus());
1170
1171         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending");
1172     }
1173
1174     @Test
1175     public void testOnReceiveSwitchShardBehavior() throws Exception {
1176         new JavaTestKit(getSystem()) {{
1177             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1178
1179             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1180             shardManager.tell(new ActorInitialized(), mockShardActor);
1181
1182             shardManager.tell(new SwitchShardBehavior(mockShardName, RaftState.Leader, 1000), getRef());
1183
1184             SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor, SwitchBehavior.class);
1185
1186             assertEquals(RaftState.Leader, switchBehavior.getNewState());
1187             assertEquals(1000, switchBehavior.getNewTerm());
1188         }};
1189     }
1190
1191     private static List<MemberName> members(String... names) {
1192         return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList());
1193     }
1194
1195     @Test
1196     public void testOnCreateShard() {
1197         LOG.info("testOnCreateShard starting");
1198         new JavaTestKit(getSystem()) {{
1199             datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1200
1201             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1202                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1203
1204             SchemaContext schemaContext = TestModel.createTestContext();
1205             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1206
1207             DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100).
1208                     persistent(false).build();
1209             Shard.Builder shardBuilder = Shard.builder();
1210
1211             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1212                     "foo", null, members("member-1", "member-5", "member-6"));
1213             shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef());
1214
1215             expectMsgClass(duration("5 seconds"), Success.class);
1216
1217             shardManager.tell(new FindLocalShard("foo", true), getRef());
1218
1219             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1220
1221             assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent());
1222             assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig().
1223                     getPeerAddressResolver() instanceof ShardPeerAddressResolver);
1224             assertEquals("peerMembers", Sets.newHashSet(
1225                 new ShardIdentifier("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(),
1226                 new ShardIdentifier("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()),
1227                 shardBuilder.getPeerAddresses().keySet());
1228             assertEquals("ShardIdentifier", new ShardIdentifier("foo", MEMBER_1, shardMrgIDSuffix),
1229                     shardBuilder.getId());
1230             assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1231
1232             // Send CreateShard with same name - should return Success with a message.
1233
1234             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1235
1236             Success success = expectMsgClass(duration("5 seconds"), Success.class);
1237             assertNotNull("Success status is null", success.status());
1238         }};
1239
1240         LOG.info("testOnCreateShard ending");
1241     }
1242
1243     @Test
1244     public void testOnCreateShardWithLocalMemberNotInShardConfig() {
1245         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting");
1246         new JavaTestKit(getSystem()) {{
1247             datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1248
1249             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1250                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1251
1252             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1253
1254             Shard.Builder shardBuilder = Shard.builder();
1255             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1256                     "foo", null, members("member-5", "member-6"));
1257
1258             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1259             expectMsgClass(duration("5 seconds"), Success.class);
1260
1261             shardManager.tell(new FindLocalShard("foo", true), getRef());
1262             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1263
1264             assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
1265             assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(),
1266                     shardBuilder.getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
1267         }};
1268
1269         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending");
1270     }
1271
1272     @Test
1273     public void testOnCreateShardWithNoInitialSchemaContext() {
1274         LOG.info("testOnCreateShardWithNoInitialSchemaContext starting");
1275         new JavaTestKit(getSystem()) {{
1276             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1277                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1278
1279             Shard.Builder shardBuilder = Shard.builder();
1280
1281             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1282                     "foo", null, members("member-1"));
1283             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1284
1285             expectMsgClass(duration("5 seconds"), Success.class);
1286
1287             SchemaContext schemaContext = TestModel.createTestContext();
1288             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1289
1290             shardManager.tell(new FindLocalShard("foo", true), getRef());
1291
1292             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1293
1294             assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1295             assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
1296         }};
1297
1298         LOG.info("testOnCreateShardWithNoInitialSchemaContext ending");
1299     }
1300
1301     @Test
1302     public void testGetSnapshot() throws Throwable {
1303         LOG.info("testGetSnapshot starting");
1304         JavaTestKit kit = new JavaTestKit(getSystem());
1305
1306         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1307                    put("shard1", Arrays.asList("member-1")).
1308                    put("shard2", Arrays.asList("member-1")).
1309                    put("astronauts", Collections.<String>emptyList()).build());
1310
1311         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig).
1312                 withDispatcher(Dispatchers.DefaultDispatcherId()));
1313
1314         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1315         Failure failure = kit.expectMsgClass(Failure.class);
1316         assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
1317
1318         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1319
1320         waitForShardInitialized(shardManager, "shard1", kit);
1321         waitForShardInitialized(shardManager, "shard2", kit);
1322
1323         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1324
1325         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1326
1327         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1328         assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1329
1330         Function<ShardSnapshot, String> shardNameTransformer = s -> s.getName();
1331
1332         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
1333                 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
1334
1335         // Add a new replica
1336
1337         JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1338
1339         TestShardManager shardManagerInstance = shardManager.underlyingActor();
1340         shardManagerInstance.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1341
1342         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1343         mockShardLeaderKit.expectMsgClass(AddServer.class);
1344         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.OK, ""));
1345         kit.expectMsgClass(Status.Success.class);
1346         waitForShardInitialized(shardManager, "astronauts", kit);
1347
1348         // Send another GetSnapshot and verify
1349
1350         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1351         datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1352
1353         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
1354                 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
1355
1356         byte[] snapshotBytes = datastoreSnapshot.getShardManagerSnapshot();
1357         assertNotNull("Expected ShardManagerSnapshot", snapshotBytes);
1358         ShardManagerSnapshot snapshot = SerializationUtils.deserialize(snapshotBytes);
1359         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1360                 Sets.newHashSet(snapshot.getShardList()));
1361
1362         LOG.info("testGetSnapshot ending");
1363     }
1364
1365     @Test
1366     public void testRestoreFromSnapshot() throws Throwable {
1367         LOG.info("testRestoreFromSnapshot starting");
1368
1369         datastoreContextBuilder.shardInitializationTimeout(3, TimeUnit.SECONDS);
1370
1371         JavaTestKit kit = new JavaTestKit(getSystem());
1372
1373         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1374                    put("shard1", Collections.<String>emptyList()).
1375                    put("shard2", Collections.<String>emptyList()).
1376                    put("astronauts", Collections.<String>emptyList()).build());
1377
1378
1379         ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"));
1380         DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix,
1381                 SerializationUtils.serialize(snapshot), Collections.<ShardSnapshot>emptyList());
1382         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig).
1383                 restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
1384
1385         shardManager.underlyingActor().waitForRecoveryComplete();
1386
1387         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1388
1389         waitForShardInitialized(shardManager, "shard1", kit);
1390         waitForShardInitialized(shardManager, "shard2", kit);
1391         waitForShardInitialized(shardManager, "astronauts", kit);
1392
1393         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1394
1395         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1396
1397         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1398
1399         byte[] snapshotBytes = datastoreSnapshot.getShardManagerSnapshot();
1400         assertNotNull("Expected ShardManagerSnapshot", snapshotBytes);
1401         snapshot = SerializationUtils.deserialize(snapshotBytes);
1402         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1403                 Sets.newHashSet(snapshot.getShardList()));
1404
1405         LOG.info("testRestoreFromSnapshot ending");
1406     }
1407
1408     @Test
1409     public void testAddShardReplicaForNonExistentShardConfig() throws Exception {
1410         new JavaTestKit(getSystem()) {{
1411             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1412                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1413
1414             shardManager.tell(new AddShardReplica("model-inventory"), getRef());
1415             Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
1416
1417             assertEquals("Failure obtained", true,
1418                           (resp.cause() instanceof IllegalArgumentException));
1419         }};
1420     }
1421
1422     @Test
1423     public void testAddShardReplica() throws Exception {
1424         LOG.info("testAddShardReplica starting");
1425         MockConfiguration mockConfig =
1426                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1427                    put("default", Arrays.asList("member-1", "member-2")).
1428                    put("astronauts", Arrays.asList("member-2")).build());
1429
1430         final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1431         datastoreContextBuilder.shardManagerPersistenceId(shardManagerID);
1432
1433         // Create an ActorSystem ShardManager actor for member-1.
1434         final ActorSystem system1 = newActorSystem("Member1");
1435         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1436         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1437         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1438                 newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor).cluster(
1439                         new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID);
1440
1441         // Create an ActorSystem ShardManager actor for member-2.
1442         final ActorSystem system2 = newActorSystem("Member2");
1443         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1444
1445         String name = new ShardIdentifier("astronauts", MEMBER_2, "config").toString();
1446         final TestActorRef<MockRespondActor> mockShardLeaderActor =
1447                 TestActorRef.create(system2, Props.create(MockRespondActor.class).
1448                         withDispatcher(Dispatchers.DefaultDispatcherId()), name);
1449         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1450                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor).cluster(
1451                         new ClusterWrapperImpl(system2)).props().
1452                             withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID);
1453
1454         new JavaTestKit(system1) {{
1455
1456             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1457             leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1458
1459             leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1460
1461             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
1462             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1463             leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
1464                     mock(DataTree.class), leaderVersion), mockShardLeaderActor);
1465             leaderShardManager.tell(new RoleChangeNotification(memberId2,
1466                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor);
1467
1468             newReplicaShardManager.underlyingActor().waitForMemberUp();
1469             leaderShardManager.underlyingActor().waitForMemberUp();
1470
1471             //Have a dummy snapshot to be overwritten by the new data persisted.
1472             String[] restoredShards = {"default", "people"};
1473             ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
1474             InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
1475             Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
1476
1477             InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID);
1478             InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
1479
1480             //construct a mock response message
1481             AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2);
1482             mockShardLeaderActor.underlyingActor().updateResponse(response);
1483             newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1484             AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1485                 AddServer.class);
1486             String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1487             assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1488             expectMsgClass(duration("5 seconds"), Status.Success.class);
1489
1490             InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
1491             InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
1492             List<ShardManagerSnapshot> persistedSnapshots =
1493                 InMemorySnapshotStore.getSnapshots(shardManagerID, ShardManagerSnapshot.class);
1494             assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
1495             ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
1496             assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
1497                     Sets.newHashSet(shardManagerSnapshot.getShardList()));
1498         }};
1499         LOG.info("testAddShardReplica ending");
1500     }
1501
1502     @Test
1503     public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() throws Exception {
1504         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting");
1505         new JavaTestKit(getSystem()) {{
1506             TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1507                     newPropsShardMgrWithMockShardActor(), shardMgrID);
1508
1509             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1510             shardManager.tell(new ActorInitialized(), mockShardActor);
1511
1512             String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
1513             AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
1514             ActorRef leaderShardActor = shardManager.underlyingActor().getContext().actorOf(
1515                     Props.create(MockRespondActor.class, addServerReply), leaderId);
1516
1517             MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
1518
1519             String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
1520             shardManager.tell(new RoleChangeNotification(newReplicaId,
1521                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
1522             shardManager.tell(new ShardLeaderStateChanged(newReplicaId, leaderId,
1523                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
1524
1525             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1526
1527             MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
1528
1529             Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1530             assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1531
1532             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1533             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1534
1535             // Send message again to verify previous in progress state is cleared
1536
1537             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1538             resp = expectMsgClass(duration("5 seconds"), Failure.class);
1539             assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1540
1541             // Send message again with an AddServer timeout to verify the pre-existing shard actor isn't terminated.
1542
1543             shardManager.tell(newDatastoreContextFactory(datastoreContextBuilder.
1544                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), getRef());
1545             leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
1546             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1547             expectMsgClass(duration("5 seconds"), Failure.class);
1548
1549             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1550             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1551         }};
1552
1553         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
1554     }
1555
1556     @Test
1557     public void testAddShardReplicaWithPreExistingLocalReplicaLeader() throws Exception {
1558         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting");
1559         new JavaTestKit(getSystem()) {{
1560             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1561             ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1562
1563             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1564             shardManager.tell(new ActorInitialized(), mockShardActor);
1565             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1566                     DataStoreVersions.CURRENT_VERSION), getRef());
1567             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
1568                     RaftState.Leader.name())), mockShardActor);
1569
1570             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1571             Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1572             assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1573
1574             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1575             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1576         }};
1577
1578         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
1579     }
1580
1581     @Test
1582     public void testAddShardReplicaWithAddServerReplyFailure() throws Exception {
1583         LOG.info("testAddShardReplicaWithAddServerReplyFailure starting");
1584         new JavaTestKit(getSystem()) {{
1585             JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1586
1587             MockConfiguration mockConfig =
1588                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1589                        put("astronauts", Arrays.asList("member-2")).build());
1590
1591             ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
1592             final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1593                     newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props(), shardMgrID);
1594             shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1595
1596             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1597
1598             JavaTestKit terminateWatcher = new JavaTestKit(getSystem());
1599             terminateWatcher.watch(mockNewReplicaShardActor);
1600
1601             shardManager.tell(new AddShardReplica("astronauts"), getRef());
1602
1603             AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
1604             assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
1605                     addServerMsg.getNewServerId());
1606             mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
1607
1608             Failure failure = expectMsgClass(duration("5 seconds"), Failure.class);
1609             assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
1610
1611             shardManager.tell(new FindLocalShard("astronauts", false), getRef());
1612             expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1613
1614             terminateWatcher.expectTerminated(mockNewReplicaShardActor);
1615
1616             shardManager.tell(new AddShardReplica("astronauts"), getRef());
1617             mockShardLeaderKit.expectMsgClass(AddServer.class);
1618             mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
1619             failure = expectMsgClass(duration("5 seconds"), Failure.class);
1620             assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
1621         }};
1622
1623         LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
1624     }
1625
1626     @Test
1627     public void testAddShardReplicaWithAlreadyInProgress() throws Exception {
1628         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1629                 AddServer.class, new AddShardReplica("astronauts"));
1630     }
1631
1632     @Test
1633     public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception {
1634         LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting");
1635         datastoreContextBuilder.shardInitializationTimeout(100, TimeUnit.MILLISECONDS);
1636         new JavaTestKit(getSystem()) {{
1637             MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1638                        put("astronauts", Arrays.asList("member-2")).build());
1639
1640             final ActorRef newReplicaShardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig).
1641                     shardActor(mockShardActor).props(), shardMgrID);
1642
1643             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1644             MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2",
1645                     AddressFromURIString.parse("akka.tcp://non-existent@127.0.0.1:5").toString());
1646
1647             newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1648             Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
1649             assertEquals("Failure obtained", true,
1650                           (resp.cause() instanceof RuntimeException));
1651         }};
1652
1653         LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending");
1654     }
1655
1656     @Test
1657     public void testRemoveShardReplicaForNonExistentShard() throws Exception {
1658         new JavaTestKit(getSystem()) {{
1659             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1660                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1661
1662             shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), getRef());
1663             Status.Failure resp = expectMsgClass(duration("10 seconds"), Status.Failure.class);
1664             assertEquals("Failure obtained", true,
1665                          (resp.cause() instanceof PrimaryNotFoundException));
1666         }};
1667     }
1668
1669     @Test
1670     /**
1671      * Primary is Local
1672      */
1673     public void testRemoveShardReplicaLocal() throws Exception {
1674         new JavaTestKit(getSystem()) {{
1675             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1676
1677             final TestActorRef<MockRespondActor> respondActor =
1678                     TestActorRef.create(getSystem(), Props.create(MockRespondActor.class), memberId);
1679
1680             ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
1681
1682             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1683             shardManager.tell(new ActorInitialized(), respondActor);
1684             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1685                     DataStoreVersions.CURRENT_VERSION), getRef());
1686             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
1687                     RaftState.Leader.name())), respondActor);
1688
1689             respondActor.underlyingActor().updateResponse(new RemoveServerReply(ServerChangeStatus.OK, null));
1690             shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), getRef());
1691             final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor, RemoveServer.class);
1692             assertEquals(new ShardIdentifier("default", MEMBER_1, shardMrgIDSuffix).toString(),
1693                     removeServer.getServerId());
1694             expectMsgClass(duration("5 seconds"), Success.class);
1695         }};
1696     }
1697
1698     @Test
1699     public void testRemoveShardReplicaRemote() throws Exception {
1700         MockConfiguration mockConfig =
1701                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1702                         put("default", Arrays.asList("member-1", "member-2")).
1703                         put("astronauts", Arrays.asList("member-1")).build());
1704
1705         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1706
1707         // Create an ActorSystem ShardManager actor for member-1.
1708         final ActorSystem system1 = newActorSystem("Member1");
1709         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1710         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1711
1712         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1713                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster(
1714                         new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1715                 shardManagerID);
1716
1717         // Create an ActorSystem ShardManager actor for member-2.
1718         final ActorSystem system2 = newActorSystem("Member2");
1719         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1720
1721         String name = new ShardIdentifier("default", MEMBER_2, shardMrgIDSuffix).toString();
1722         final TestActorRef<MockRespondActor> mockShardLeaderActor =
1723                 TestActorRef.create(system2, Props.create(MockRespondActor.class), name);
1724
1725         LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor);
1726
1727         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1728                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster(
1729                         new ClusterWrapperImpl(system2)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1730                 shardManagerID);
1731
1732         // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so,
1733         //    akka.tcp://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1
1734         // However when a shard manager has a local shard which is a follower and a leader that is remote it will
1735         // try to compute an address for the remote shard leader using the ShardPeerAddressResolver. This address will
1736         // look like so,
1737         //    akka.tcp://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1
1738         // In this specific case if we did a FindPrimary for shard default from member-1 we would come up
1739         // with the address of an actor which does not exist, therefore any message sent to that actor would go to
1740         // dead letters.
1741         // To work around this problem we create a ForwardingActor with the right address and pass to it the
1742         // mockShardLeaderActor. The ForwardingActor simply forwards all messages to the mockShardLeaderActor and every
1743         // thing works as expected
1744         final ActorRef actorRef = leaderShardManager.underlyingActor().context()
1745                 .actorOf(Props.create(ForwardingActor.class, mockShardLeaderActor), "member-2-shard-default-" + shardMrgIDSuffix);
1746
1747         LOG.error("Forwarding actor : {}", actorRef);
1748
1749         new JavaTestKit(system1) {{
1750
1751             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1752             leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1753
1754             leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1755             newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1756
1757             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
1758             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1759             leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
1760                     mock(DataTree.class), leaderVersion), mockShardLeaderActor);
1761             leaderShardManager.tell(new RoleChangeNotification(memberId2,
1762                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor);
1763
1764             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
1765             newReplicaShardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2,
1766                     mock(DataTree.class), leaderVersion), mockShardActor);
1767             newReplicaShardManager.tell(new RoleChangeNotification(memberId1,
1768                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
1769
1770             newReplicaShardManager.underlyingActor().waitForMemberUp();
1771             leaderShardManager.underlyingActor().waitForMemberUp();
1772
1773             //construct a mock response message
1774             RemoveServerReply response = new RemoveServerReply(ServerChangeStatus.OK, memberId2);
1775             mockShardLeaderActor.underlyingActor().updateResponse(response);
1776             newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), getRef());
1777             RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1778                     RemoveServer.class);
1779             String removeServerId = new ShardIdentifier("default", MEMBER_1, shardMrgIDSuffix).toString();
1780             assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
1781             expectMsgClass(duration("5 seconds"), Status.Success.class);
1782         }};
1783
1784     }
1785
1786     @Test
1787     public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() throws Exception {
1788         testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", MEMBER_2),
1789                 RemoveServer.class, new RemoveShardReplica("astronauts", MEMBER_3));
1790     }
1791
1792     @Test
1793     public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() throws Exception {
1794         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1795                 AddServer.class, new RemoveShardReplica("astronauts", MEMBER_2));
1796     }
1797
1798
1799     public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange,
1800                                                       final Class<?> firstForwardedServerChangeClass,
1801                                                       final Object secondServerChange) throws Exception {
1802         new JavaTestKit(getSystem()) {{
1803             JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1804             JavaTestKit secondRequestKit = new JavaTestKit(getSystem());
1805
1806             MockConfiguration mockConfig =
1807                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1808                             put(shardName, Arrays.asList("member-2")).build());
1809
1810             final TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
1811                     newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor).cluster(
1812                             new MockClusterWrapper()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1813                     shardMgrID);
1814
1815             shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1816
1817             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1818
1819             shardManager.tell(firstServerChange, getRef());
1820
1821             mockShardLeaderKit.expectMsgClass(firstForwardedServerChangeClass);
1822
1823             shardManager.tell(secondServerChange, secondRequestKit.getRef());
1824
1825             secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class);
1826         }};
1827     }
1828
1829     @Test
1830     public void testServerRemovedShardActorNotRunning() throws Exception {
1831         LOG.info("testServerRemovedShardActorNotRunning starting");
1832         new JavaTestKit(getSystem()) {{
1833             MockConfiguration mockConfig =
1834                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1835                             put("default", Arrays.asList("member-1", "member-2")).
1836                             put("astronauts", Arrays.asList("member-2")).
1837                             put("people", Arrays.asList("member-1", "member-2")).build());
1838
1839             TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig));
1840
1841             shardManager.underlyingActor().waitForRecoveryComplete();
1842             shardManager.tell(new FindLocalShard("people", false), getRef());
1843             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1844
1845             shardManager.tell(new FindLocalShard("default", false), getRef());
1846             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1847
1848             // Removed the default shard replica from member-1
1849             ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
1850             ShardIdentifier shardId = builder.shardName("default").memberName(MEMBER_1).type(shardMrgIDSuffix).build();
1851             shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
1852
1853             shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1854         }};
1855
1856         LOG.info("testServerRemovedShardActorNotRunning ending");
1857     }
1858
1859     @Test
1860     public void testServerRemovedShardActorRunning() throws Exception {
1861         LOG.info("testServerRemovedShardActorRunning starting");
1862         new JavaTestKit(getSystem()) {{
1863             MockConfiguration mockConfig =
1864                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1865                             put("default", Arrays.asList("member-1", "member-2")).
1866                             put("astronauts", Arrays.asList("member-2")).
1867                             put("people", Arrays.asList("member-1", "member-2")).build());
1868
1869             String shardId = ShardIdentifier.builder().shardName("default").memberName(MEMBER_1).
1870                     type(shardMrgIDSuffix).build().toString();
1871             TestActorRef<MessageCollectorActor> shard = actorFactory.createTestActor(
1872                     MessageCollectorActor.props(), shardId);
1873
1874             TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1875                     newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props());
1876
1877             shardManager.underlyingActor().waitForRecoveryComplete();
1878
1879             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1880             shardManager.tell(new ActorInitialized(), shard);
1881
1882             waitForShardInitialized(shardManager, "people", this);
1883             waitForShardInitialized(shardManager, "default", this);
1884
1885             // Removed the default shard replica from member-1
1886             shardManager.tell(new ServerRemoved(shardId), getRef());
1887
1888             shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1889
1890             MessageCollectorActor.expectFirstMatching(shard, Shutdown.class);
1891         }};
1892
1893         LOG.info("testServerRemovedShardActorRunning ending");
1894     }
1895
1896
1897     @Test
1898     public void testShardPersistenceWithRestoredData() throws Exception {
1899         LOG.info("testShardPersistenceWithRestoredData starting");
1900         new JavaTestKit(getSystem()) {{
1901             MockConfiguration mockConfig =
1902                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1903                    put("default", Arrays.asList("member-1", "member-2")).
1904                    put("astronauts", Arrays.asList("member-2")).
1905                    put("people", Arrays.asList("member-1", "member-2")).build());
1906             String[] restoredShards = {"default", "astronauts"};
1907             ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
1908             InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
1909
1910             //create shardManager to come up with restored data
1911             TestActorRef<TestShardManager> newRestoredShardManager = actorFactory.createTestActor(
1912                     newShardMgrProps(mockConfig));
1913
1914             newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
1915
1916             newRestoredShardManager.tell(new FindLocalShard("people", false), getRef());
1917             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1918             assertEquals("for uninitialized shard", "people", notFound.getShardName());
1919
1920             //Verify a local shard is created for the restored shards,
1921             //although we expect a NotInitializedException for the shards as the actor initialization
1922             //message is not sent for them
1923             newRestoredShardManager.tell(new FindLocalShard("default", false), getRef());
1924             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1925
1926             newRestoredShardManager.tell(new FindLocalShard("astronauts", false), getRef());
1927             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1928         }};
1929
1930         LOG.info("testShardPersistenceWithRestoredData ending");
1931     }
1932
1933     @Test
1934     public void testShutDown() throws Exception {
1935         LOG.info("testShutDown starting");
1936         new JavaTestKit(getSystem()) {{
1937             MockConfiguration mockConfig =
1938                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1939                             put("shard1", Arrays.asList("member-1")).
1940                             put("shard2", Arrays.asList("member-1")).build());
1941
1942             String shardId1 = ShardIdentifier.builder().shardName("shard1").memberName(MEMBER_1).
1943                     type(shardMrgIDSuffix).build().toString();
1944             TestActorRef<MessageCollectorActor> shard1 = actorFactory.createTestActor(
1945                     MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId1);
1946
1947             String shardId2 = ShardIdentifier.builder().shardName("shard2").memberName(MEMBER_1).
1948                     type(shardMrgIDSuffix).build().toString();
1949             TestActorRef<MessageCollectorActor> shard2 = actorFactory.createTestActor(
1950                     MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId2);
1951
1952             TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(
1953                     mockConfig).addShardActor("shard1", shard1).addShardActor("shard2", shard2).props().
1954                         withDispatcher(Dispatchers.DefaultDispatcherId()));
1955
1956             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1957             shardManager.tell(new ActorInitialized(), shard1);
1958             shardManager.tell(new ActorInitialized(), shard2);
1959
1960             FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
1961             Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE);
1962
1963             MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class);
1964             MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class);
1965
1966             try {
1967                 Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS));
1968                 fail("ShardManager actor stopped without waiting for the Shards to be stopped");
1969             } catch(TimeoutException e) {
1970                 // expected
1971             }
1972
1973             actorFactory.killActor(shard1, this);
1974             actorFactory.killActor(shard2, this);
1975
1976             Boolean stopped = Await.result(stopFuture, duration);
1977             assertEquals("Stopped", Boolean.TRUE, stopped);
1978         }};
1979
1980         LOG.info("testShutDown ending");
1981     }
1982
1983     private static class TestShardManager extends ShardManager {
1984         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
1985         private final CountDownLatch snapshotPersist = new CountDownLatch(1);
1986         private ShardManagerSnapshot snapshot;
1987         private final Map<String, ActorRef> shardActors;
1988         private final ActorRef shardActor;
1989         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
1990         private CountDownLatch memberUpReceived = new CountDownLatch(1);
1991         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
1992         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
1993         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
1994         private volatile MessageInterceptor messageInterceptor;
1995
1996         private TestShardManager(Builder builder) {
1997             super(builder);
1998             shardActor = builder.shardActor;
1999             shardActors = builder.shardActors;
2000         }
2001
2002         @Override
2003         protected void handleRecover(Object message) throws Exception {
2004             try {
2005                 super.handleRecover(message);
2006             } finally {
2007                 if(message instanceof RecoveryCompleted) {
2008                     recoveryComplete.countDown();
2009                 }
2010             }
2011         }
2012
2013         private void countDownIfOther(final Member member, CountDownLatch latch) {
2014             if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
2015                 latch.countDown();
2016             }
2017         }
2018
2019         @Override
2020         public void handleCommand(Object message) throws Exception {
2021             try{
2022                 if(messageInterceptor != null && messageInterceptor.canIntercept(message)) {
2023                     getSender().tell(messageInterceptor.apply(message), getSelf());
2024                 } else {
2025                     super.handleCommand(message);
2026                 }
2027             } finally {
2028                 if(message instanceof FindPrimary) {
2029                     findPrimaryMessageReceived.countDown();
2030                 } else if(message instanceof ClusterEvent.MemberUp) {
2031                     countDownIfOther(((ClusterEvent.MemberUp)message).member(), memberUpReceived);
2032                 } else if(message instanceof ClusterEvent.MemberRemoved) {
2033                     countDownIfOther(((ClusterEvent.MemberRemoved)message).member(), memberRemovedReceived);
2034                 } else if(message instanceof ClusterEvent.UnreachableMember) {
2035                     countDownIfOther(((ClusterEvent.UnreachableMember)message).member(), memberUnreachableReceived);
2036                 } else if(message instanceof ClusterEvent.ReachableMember) {
2037                     countDownIfOther(((ClusterEvent.ReachableMember)message).member(), memberReachableReceived);
2038                 }
2039             }
2040         }
2041
2042         void setMessageInterceptor(MessageInterceptor messageInterceptor) {
2043             this.messageInterceptor = messageInterceptor;
2044         }
2045
2046         void waitForRecoveryComplete() {
2047             assertEquals("Recovery complete", true,
2048                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
2049         }
2050
2051         void waitForMemberUp() {
2052             assertEquals("MemberUp received", true,
2053                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
2054             memberUpReceived = new CountDownLatch(1);
2055         }
2056
2057         void waitForMemberRemoved() {
2058             assertEquals("MemberRemoved received", true,
2059                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
2060             memberRemovedReceived = new CountDownLatch(1);
2061         }
2062
2063         void waitForUnreachableMember() {
2064             assertEquals("UnreachableMember received", true,
2065                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
2066                 ));
2067             memberUnreachableReceived = new CountDownLatch(1);
2068         }
2069
2070         void waitForReachableMember() {
2071             assertEquals("ReachableMember received", true,
2072                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
2073             memberReachableReceived = new CountDownLatch(1);
2074         }
2075
2076         void verifyFindPrimary() {
2077             assertEquals("FindPrimary received", true,
2078                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
2079             findPrimaryMessageReceived = new CountDownLatch(1);
2080         }
2081
2082         public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) {
2083             return new Builder(datastoreContextBuilder);
2084         }
2085
2086         private static class Builder extends AbstractGenericCreator<Builder, TestShardManager> {
2087             private ActorRef shardActor;
2088             private final Map<String, ActorRef> shardActors = new HashMap<>();
2089
2090             Builder(DatastoreContext.Builder datastoreContextBuilder) {
2091                 super(TestShardManager.class);
2092                 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
2093             }
2094
2095             Builder shardActor(ActorRef shardActor) {
2096                 this.shardActor = shardActor;
2097                 return this;
2098             }
2099
2100             Builder addShardActor(String shardName, ActorRef actorRef){
2101                 shardActors.put(shardName, actorRef);
2102                 return this;
2103             }
2104         }
2105
2106         @Override
2107         public void saveSnapshot(Object obj) {
2108             snapshot = (ShardManagerSnapshot) obj;
2109             snapshotPersist.countDown();
2110             super.saveSnapshot(obj);
2111         }
2112
2113         void verifySnapshotPersisted(Set<String> shardList) {
2114             assertEquals("saveSnapshot invoked", true,
2115                     Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
2116             assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
2117         }
2118
2119         @Override
2120         protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
2121             if(shardActors.get(info.getShardName()) != null){
2122                 return shardActors.get(info.getShardName());
2123             }
2124
2125             if(shardActor != null) {
2126                 return shardActor;
2127             }
2128
2129             return super.newShardActor(schemaContext, info);
2130         }
2131     }
2132
2133     private static abstract class AbstractGenericCreator<T extends AbstractGenericCreator<T, ?>, C extends ShardManager>
2134                                                      extends AbstractShardManagerCreator<T> {
2135         private final Class<C> shardManagerClass;
2136
2137         AbstractGenericCreator(Class<C> shardManagerClass) {
2138             this.shardManagerClass = shardManagerClass;
2139             cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).
2140                     waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache());
2141         }
2142
2143         @Override
2144         public Props props() {
2145             verify();
2146             return Props.create(shardManagerClass, this);
2147         }
2148     }
2149
2150     private static class GenericCreator<C extends ShardManager> extends AbstractGenericCreator<GenericCreator<C>, C> {
2151         GenericCreator(Class<C> shardManagerClass) {
2152             super(shardManagerClass);
2153         }
2154     }
2155
2156     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
2157         private static final long serialVersionUID = 1L;
2158         private final Creator<ShardManager> delegate;
2159
2160         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
2161             this.delegate = delegate;
2162         }
2163
2164         @Override
2165         public ShardManager create() throws Exception {
2166             return delegate.create();
2167         }
2168     }
2169
2170     interface MessageInterceptor extends Function<Object, Object> {
2171         boolean canIntercept(Object message);
2172     }
2173
2174     private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
2175         return new MessageInterceptor(){
2176             @Override
2177             public Object apply(Object message) {
2178                 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
2179             }
2180
2181             @Override
2182             public boolean canIntercept(Object message) {
2183                 return message instanceof FindPrimary;
2184             }
2185         };
2186     }
2187
2188     private static class MockRespondActor extends MessageCollectorActor {
2189         static final String CLEAR_RESPONSE = "clear-response";
2190         static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MockRespondActor.class);
2191
2192         private volatile Object responseMsg;
2193
2194         @SuppressWarnings("unused")
2195         public MockRespondActor() {
2196         }
2197
2198         @SuppressWarnings("unused")
2199         public MockRespondActor(Object responseMsg) {
2200             this.responseMsg = responseMsg;
2201         }
2202
2203         public void updateResponse(Object response) {
2204             responseMsg = response;
2205         }
2206
2207         @Override
2208         public void onReceive(Object message) throws Exception {
2209             if(!"get-all-messages".equals(message)) {
2210                 LOG.debug("Received message : {}", message);
2211             }
2212             super.onReceive(message);
2213             if (message instanceof AddServer && responseMsg != null) {
2214                 getSender().tell(responseMsg, getSelf());
2215             } else if(message instanceof RemoveServer && responseMsg != null){
2216                 getSender().tell(responseMsg, getSelf());
2217             } else if(message.equals(CLEAR_RESPONSE)) {
2218                 responseMsg = null;
2219             }
2220         }
2221     }
2222 }