Implement cluster admin RPCs to change member voting states
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManagerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.times;
20 import static org.mockito.Mockito.verify;
21 import akka.actor.ActorRef;
22 import akka.actor.ActorSystem;
23 import akka.actor.AddressFromURIString;
24 import akka.actor.Props;
25 import akka.actor.Status;
26 import akka.actor.Status.Failure;
27 import akka.actor.Status.Success;
28 import akka.cluster.Cluster;
29 import akka.cluster.ClusterEvent;
30 import akka.cluster.Member;
31 import akka.dispatch.Dispatchers;
32 import akka.japi.Creator;
33 import akka.pattern.Patterns;
34 import akka.persistence.RecoveryCompleted;
35 import akka.serialization.Serialization;
36 import akka.testkit.JavaTestKit;
37 import akka.testkit.TestActorRef;
38 import akka.util.Timeout;
39 import com.google.common.base.Function;
40 import com.google.common.base.Stopwatch;
41 import com.google.common.collect.ImmutableMap;
42 import com.google.common.collect.ImmutableSet;
43 import com.google.common.collect.Lists;
44 import com.google.common.collect.Sets;
45 import com.google.common.util.concurrent.Uninterruptibles;
46 import com.typesafe.config.ConfigFactory;
47 import java.net.URI;
48 import java.util.AbstractMap;
49 import java.util.ArrayList;
50 import java.util.Arrays;
51 import java.util.Collection;
52 import java.util.Collections;
53 import java.util.HashMap;
54 import java.util.List;
55 import java.util.Map;
56 import java.util.Map.Entry;
57 import java.util.Set;
58 import java.util.concurrent.CountDownLatch;
59 import java.util.concurrent.TimeUnit;
60 import java.util.concurrent.TimeoutException;
61 import java.util.stream.Collectors;
62 import org.apache.commons.lang3.SerializationUtils;
63 import org.junit.After;
64 import org.junit.Before;
65 import org.junit.Test;
66 import org.mockito.Mock;
67 import org.mockito.Mockito;
68 import org.mockito.MockitoAnnotations;
69 import org.opendaylight.controller.cluster.access.concepts.MemberName;
70 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
71 import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
72 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
73 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
74 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
75 import org.opendaylight.controller.cluster.datastore.Shard;
76 import org.opendaylight.controller.cluster.datastore.ShardManager.SchemaContextModules;
77 import org.opendaylight.controller.cluster.datastore.config.Configuration;
78 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
79 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
80 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
81 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
82 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
83 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
84 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
85 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
86 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
87 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
88 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
89 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
90 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
91 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
92 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
93 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
94 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
95 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
96 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
97 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
98 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
99 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
100 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
101 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
102 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
103 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
104 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
105 import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor;
106 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
107 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
108 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
109 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
110 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
111 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
112 import org.opendaylight.controller.cluster.raft.RaftState;
113 import org.opendaylight.controller.cluster.raft.TestActorFactory;
114 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
115 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
116 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
117 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
118 import org.opendaylight.controller.cluster.raft.messages.AddServer;
119 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
120 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
121 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
122 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
123 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
124 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
125 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
126 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
127 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
128 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
129 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
130 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
131 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
132 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
133 import org.slf4j.Logger;
134 import org.slf4j.LoggerFactory;
135 import scala.concurrent.Await;
136 import scala.concurrent.Future;
137 import scala.concurrent.duration.FiniteDuration;
138
139 public class ShardManagerTest extends AbstractActorTest {
140     private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
141     private static final MemberName MEMBER_1 = MemberName.forName("member-1");
142     private static final MemberName MEMBER_2 = MemberName.forName("member-2");
143     private static final MemberName MEMBER_3 = MemberName.forName("member-3");
144
145     private static int ID_COUNTER = 1;
146
147     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
148     private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
149
150     @Mock
151     private static CountDownLatch ready;
152
153     private static TestActorRef<MessageCollectorActor> mockShardActor;
154
155     private static ShardIdentifier mockShardName;
156
157     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
158             dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
159                    .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
160
161     private final Collection<ActorSystem> actorSystems = new ArrayList<>();
162
163     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
164
165     @Before
166     public void setUp() {
167         MockitoAnnotations.initMocks(this);
168
169         InMemoryJournal.clear();
170         InMemorySnapshotStore.clear();
171
172         if(mockShardActor == null) {
173             mockShardName = ShardIdentifier.create(Shard.DEFAULT_NAME, MEMBER_1, "config");
174             mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class),
175                     mockShardName.toString());
176         }
177
178         mockShardActor.underlyingActor().clear();
179     }
180
181     @After
182     public void tearDown() {
183         InMemoryJournal.clear();
184         InMemorySnapshotStore.clear();
185
186         for(ActorSystem system: actorSystems) {
187             JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
188         }
189
190         actorFactory.close();
191     }
192
193     private ActorSystem newActorSystem(String config) {
194         ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config));
195         actorSystems.add(system);
196         return system;
197     }
198
199     private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
200         String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
201         if(system == getSystem()) {
202             return actorFactory.createTestActor(Props.create(MessageCollectorActor.class), name);
203         }
204
205         return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
206     }
207
208     private Props newShardMgrProps() {
209         return newShardMgrProps(new MockConfiguration());
210     }
211
212     private static DatastoreContextFactory newDatastoreContextFactory(DatastoreContext datastoreContext) {
213         DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
214         Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
215         Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString());
216         return mockFactory;
217     }
218
219     private TestShardManager.Builder newTestShardMgrBuilder() {
220         return TestShardManager.builder(datastoreContextBuilder);
221     }
222
223     private TestShardManager.Builder newTestShardMgrBuilder(Configuration config) {
224         return TestShardManager.builder(datastoreContextBuilder).configuration(config);
225     }
226
227     private Props newShardMgrProps(Configuration config) {
228         return newTestShardMgrBuilder(config).props();
229     }
230
231     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() {
232         return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
233     }
234
235     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(ActorRef shardActor) {
236         return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor);
237     }
238
239
240     private Props newPropsShardMgrWithMockShardActor() {
241         return newTestShardMgrBuilderWithMockShardActor().props();
242     }
243
244     private Props newPropsShardMgrWithMockShardActor(ActorRef shardActor) {
245         return newTestShardMgrBuilderWithMockShardActor(shardActor).props();
246     }
247
248
249     private TestShardManager newTestShardManager() {
250         return newTestShardManager(newShardMgrProps());
251     }
252
253     private TestShardManager newTestShardManager(Props props) {
254         TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
255         TestShardManager shardManager = shardManagerActor.underlyingActor();
256         shardManager.waitForRecoveryComplete();
257         return shardManager;
258     }
259
260     private static void waitForShardInitialized(ActorRef shardManager, String shardName, JavaTestKit kit) {
261         AssertionError last = null;
262         Stopwatch sw = Stopwatch.createStarted();
263         while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
264             try {
265                 shardManager.tell(new FindLocalShard(shardName, true), kit.getRef());
266                 kit.expectMsgClass(LocalShardFound.class);
267                 return;
268             } catch(AssertionError e) {
269                 last = e;
270             }
271
272             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
273         }
274
275         throw last;
276     }
277
278     private static <T> T expectMsgClassOrFailure(Class<T> msgClass, JavaTestKit kit, String msg) {
279         Object reply = kit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), msgClass, Failure.class);
280         if(reply instanceof Failure) {
281             throw new AssertionError(msg + " failed", ((Failure)reply).cause());
282         }
283
284         return (T)reply;
285     }
286
287     @Test
288     public void testPerShardDatastoreContext() throws Exception {
289         LOG.info("testPerShardDatastoreContext starting");
290         final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
291                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
292
293         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
294                 shardElectionTimeoutFactor(6).build()).when(mockFactory).getShardDatastoreContext("default");
295
296         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
297                 shardElectionTimeoutFactor(7).build()).when(mockFactory).getShardDatastoreContext("topology");
298
299         final MockConfiguration mockConfig = new MockConfiguration() {
300             @Override
301             public Collection<String> getMemberShardNames(MemberName memberName) {
302                 return Arrays.asList("default", "topology");
303             }
304
305             @Override
306             public Collection<MemberName> getMembersFromShardName(String shardName) {
307                 return members("member-1");
308             }
309         };
310
311         final TestActorRef<MessageCollectorActor> defaultShardActor = actorFactory.createTestActor(
312                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("default"));
313         final TestActorRef<MessageCollectorActor> topologyShardActor = actorFactory.createTestActor(
314                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("topology"));
315
316         final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
317                 new HashMap<String, Entry<ActorRef, DatastoreContext>>());
318         shardInfoMap.put("default", new AbstractMap.SimpleEntry<ActorRef, DatastoreContext>(defaultShardActor, null));
319         shardInfoMap.put("topology", new AbstractMap.SimpleEntry<ActorRef, DatastoreContext>(topologyShardActor, null));
320
321         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
322         final CountDownLatch newShardActorLatch = new CountDownLatch(2);
323         class LocalShardManager extends ShardManager {
324             public LocalShardManager(AbstractShardManagerCreator<?> creator) {
325                 super(creator);
326             }
327
328             @Override
329             protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
330                 Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
331                 ActorRef ref = null;
332                 if(entry != null) {
333                     ref = entry.getKey();
334                     entry.setValue(info.getDatastoreContext());
335                 }
336
337                 newShardActorLatch.countDown();
338                 return ref;
339             }
340         }
341
342         final Creator<ShardManager> creator = new Creator<ShardManager>() {
343             private static final long serialVersionUID = 1L;
344             @Override
345             public ShardManager create() throws Exception {
346                 return new LocalShardManager(new GenericCreator<LocalShardManager>(LocalShardManager.class).
347                         datastoreContextFactory(mockFactory).primaryShardInfoCache(primaryShardInfoCache).
348                         configuration(mockConfig));
349             }
350         };
351
352         JavaTestKit kit = new JavaTestKit(getSystem());
353
354         final ActorRef shardManager = actorFactory.createActor(Props.create(
355                 new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()));
356
357         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), kit.getRef());
358
359         assertEquals("Shard actors created", true, newShardActorLatch.await(5, TimeUnit.SECONDS));
360         assertEquals("getShardElectionTimeoutFactor", 6, shardInfoMap.get("default").getValue().
361                 getShardElectionTimeoutFactor());
362         assertEquals("getShardElectionTimeoutFactor", 7, shardInfoMap.get("topology").getValue().
363                 getShardElectionTimeoutFactor());
364
365         DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
366                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
367         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
368                 shardElectionTimeoutFactor(66).build()).when(newMockFactory).getShardDatastoreContext("default");
369
370         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
371                 shardElectionTimeoutFactor(77).build()).when(newMockFactory).getShardDatastoreContext("topology");
372
373         shardManager.tell(newMockFactory, kit.getRef());
374
375         DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor, DatastoreContext.class);
376         assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
377
378         newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
379         assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
380
381         LOG.info("testPerShardDatastoreContext ending");
382     }
383
384     @Test
385     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
386         new JavaTestKit(getSystem()) {{
387             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
388
389             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
390
391             shardManager.tell(new FindPrimary("non-existent", false), getRef());
392
393             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
394         }};
395     }
396
397     @Test
398     public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
399         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting");
400         new JavaTestKit(getSystem()) {{
401             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
402
403             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
404
405             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
406             shardManager.tell(new ActorInitialized(), mockShardActor);
407
408             DataTree mockDataTree = mock(DataTree.class);
409             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
410                     DataStoreVersions.CURRENT_VERSION), getRef());
411
412             MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
413             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
414                     RaftState.Leader.name())), mockShardActor);
415
416             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
417
418             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
419             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
420                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
421             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
422         }};
423
424         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending");
425     }
426
427     @Test
428     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
429         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp starting");
430         new JavaTestKit(getSystem()) {{
431             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
432
433             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
434             shardManager.tell(new ActorInitialized(), mockShardActor);
435
436             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
437             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
438             shardManager.tell(new RoleChangeNotification(memberId1,
439                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
440             shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION), mockShardActor);
441
442             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
443
444             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
445         }};
446
447         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending");
448     }
449
450     @Test
451     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
452         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard starting");
453         new JavaTestKit(getSystem()) {{
454             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
455
456             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
457             shardManager.tell(new ActorInitialized(), mockShardActor);
458
459             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
460             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
461
462             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
463             shardManager.tell(new RoleChangeNotification(memberId1,
464                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
465             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
466             shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, leaderVersion), mockShardActor);
467
468             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
469
470             RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
471             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
472                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
473             assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
474         }};
475
476         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard ending");
477     }
478
479     @Test
480     public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
481         new JavaTestKit(getSystem()) {{
482             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
483
484             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
485
486             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
487         }};
488     }
489
490     @Test
491     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
492         new JavaTestKit(getSystem()) {{
493             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
494
495             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
496             shardManager.tell(new ActorInitialized(), mockShardActor);
497
498             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
499
500             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
501         }};
502     }
503
504     @Test
505     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
506         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
507         new JavaTestKit(getSystem()) {{
508             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
509
510             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
511             shardManager.tell(new ActorInitialized(), mockShardActor);
512
513             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
514             shardManager.tell(new RoleChangeNotification(memberId,
515                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
516
517             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
518
519             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
520
521             DataTree mockDataTree = mock(DataTree.class);
522             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
523                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
524
525             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
526
527             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
528             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
529                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
530             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
531         }};
532
533         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
534     }
535
536     @Test
537     public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
538         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting");
539         datastoreContextBuilder.shardInitializationTimeout(10, TimeUnit.SECONDS);
540         new JavaTestKit(getSystem()) {{
541             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
542
543             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
544
545             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
546             // delayed until we send ActorInitialized and RoleChangeNotification.
547             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
548
549             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
550
551             shardManager.tell(new ActorInitialized(), mockShardActor);
552
553             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
554
555             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
556             shardManager.tell(new RoleChangeNotification(memberId,
557                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
558
559             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
560
561             DataTree mockDataTree = mock(DataTree.class);
562             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
563                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
564
565             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
566             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
567                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
568             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
569
570             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
571         }};
572
573         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending");
574     }
575
576     @Test
577     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
578         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard starting");
579         new JavaTestKit(getSystem()) {{
580             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
581
582             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
583
584             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
585
586             expectMsgClass(duration("2 seconds"), NotInitializedException.class);
587
588             shardManager.tell(new ActorInitialized(), mockShardActor);
589
590             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
591         }};
592
593         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending");
594     }
595
596     @Test
597     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
598         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard starting");
599         new JavaTestKit(getSystem()) {{
600             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
601
602             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
603             shardManager.tell(new ActorInitialized(), mockShardActor);
604             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
605                     null, RaftState.Candidate.name()), mockShardActor);
606
607             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
608
609             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
610         }};
611
612         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending");
613     }
614
615     @Test
616     public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception {
617         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard starting");
618         new JavaTestKit(getSystem()) {{
619             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
620
621             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
622             shardManager.tell(new ActorInitialized(), mockShardActor);
623             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
624                     null, RaftState.IsolatedLeader.name()), mockShardActor);
625
626             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
627
628             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
629         }};
630
631         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending");
632     }
633
634     @Test
635     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
636         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard starting");
637         new JavaTestKit(getSystem()) {{
638             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
639
640             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
641             shardManager.tell(new ActorInitialized(), mockShardActor);
642
643             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
644
645             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
646         }};
647
648         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending");
649     }
650
651     @Test
652     public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
653         LOG.info("testOnReceiveFindPrimaryForRemoteShard starting");
654         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
655
656         // Create an ActorSystem ShardManager actor for member-1.
657
658         final ActorSystem system1 = newActorSystem("Member1");
659         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
660
661         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
662                 newTestShardMgrBuilderWithMockShardActor().cluster(
663                         new ClusterWrapperImpl(system1)).props().withDispatcher(
664                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
665
666         // Create an ActorSystem ShardManager actor for member-2.
667
668         final ActorSystem system2 = newActorSystem("Member2");
669
670         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
671
672         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
673
674         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
675                 put("default", Arrays.asList("member-1", "member-2")).
676                 put("astronauts", Arrays.asList("member-2")).build());
677
678         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
679                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
680                         new ClusterWrapperImpl(system2)).props().withDispatcher(
681                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
682
683         new JavaTestKit(system1) {{
684
685             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
686             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
687
688             shardManager2.tell(new ActorInitialized(), mockShardActor2);
689
690             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
691             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
692             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
693                     mock(DataTree.class), leaderVersion), mockShardActor2);
694             shardManager2.tell(new RoleChangeNotification(memberId2,
695                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
696
697             shardManager1.underlyingActor().waitForMemberUp();
698             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
699
700             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
701             String path = found.getPrimaryPath();
702             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
703             assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
704
705             shardManager2.underlyingActor().verifyFindPrimary();
706
707             Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
708
709             shardManager1.underlyingActor().waitForMemberRemoved();
710
711             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
712
713             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
714         }};
715
716         LOG.info("testOnReceiveFindPrimaryForRemoteShard ending");
717     }
718
719     @Test
720     public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
721         LOG.info("testShardAvailabilityOnChangeOfMemberReachability starting");
722         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
723
724         // Create an ActorSystem ShardManager actor for member-1.
725
726         final ActorSystem system1 = newActorSystem("Member1");
727         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
728
729         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
730
731         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
732                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
733                         new ClusterWrapperImpl(system1)).props().withDispatcher(
734                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
735
736         // Create an ActorSystem ShardManager actor for member-2.
737
738         final ActorSystem system2 = newActorSystem("Member2");
739
740         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
741
742         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
743
744         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
745             put("default", Arrays.asList("member-1", "member-2")).build());
746
747         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
748                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
749                         new ClusterWrapperImpl(system2)).props().withDispatcher(
750                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
751
752         new JavaTestKit(system1) {{
753
754             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
755             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
756             shardManager1.tell(new ActorInitialized(), mockShardActor1);
757             shardManager2.tell(new ActorInitialized(), mockShardActor2);
758
759             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
760             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
761             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
762                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
763             shardManager1.tell(new RoleChangeNotification(memberId1,
764                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
765             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
766                     DataStoreVersions.CURRENT_VERSION),
767                 mockShardActor2);
768             shardManager2.tell(new RoleChangeNotification(memberId2,
769                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
770             shardManager1.underlyingActor().waitForMemberUp();
771
772             shardManager1.tell(new FindPrimary("default", true), getRef());
773
774             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
775             String path = found.getPrimaryPath();
776             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
777
778             shardManager1.tell(MockClusterWrapper.
779                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
780
781             shardManager1.underlyingActor().waitForUnreachableMember();
782
783             PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
784             assertEquals("getMemberName", MEMBER_2, peerDown.getMemberName());
785             MessageCollectorActor.clearMessages(mockShardActor1);
786
787             shardManager1.tell(MockClusterWrapper.
788                     createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
789
790             MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
791
792             shardManager1.tell(new FindPrimary("default", true), getRef());
793
794             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
795
796             shardManager1.tell(MockClusterWrapper.
797                 createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
798
799             shardManager1.underlyingActor().waitForReachableMember();
800
801             PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
802             assertEquals("getMemberName", MEMBER_2, peerUp.getMemberName());
803             MessageCollectorActor.clearMessages(mockShardActor1);
804
805             shardManager1.tell(new FindPrimary("default", true), getRef());
806
807             RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
808             String path1 = found1.getPrimaryPath();
809             assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
810
811             shardManager1.tell(MockClusterWrapper.
812                     createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
813
814             MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
815
816             // Test FindPrimary wait succeeds after reachable member event.
817
818             shardManager1.tell(MockClusterWrapper.
819                     createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
820             shardManager1.underlyingActor().waitForUnreachableMember();
821
822             shardManager1.tell(new FindPrimary("default", true), getRef());
823
824             shardManager1.tell(MockClusterWrapper.
825                     createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
826
827             RemotePrimaryShardFound found2 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
828             String path2 = found2.getPrimaryPath();
829             assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config"));
830         }};
831
832         LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending");
833     }
834
835     @Test
836     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
837         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange starting");
838         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
839
840         // Create an ActorSystem ShardManager actor for member-1.
841
842         final ActorSystem system1 = newActorSystem("Member1");
843         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
844
845         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
846
847         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
848         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
849                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
850                         new ClusterWrapperImpl(system1)).primaryShardInfoCache(primaryShardInfoCache).props().
851                             withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID);
852
853         // Create an ActorSystem ShardManager actor for member-2.
854
855         final ActorSystem system2 = newActorSystem("Member2");
856
857         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
858
859         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
860
861         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
862             put("default", Arrays.asList("member-1", "member-2")).build());
863
864         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
865                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
866                         new ClusterWrapperImpl(system2)).props().withDispatcher(
867                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
868
869         new JavaTestKit(system1) {{
870             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
871             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
872             shardManager1.tell(new ActorInitialized(), mockShardActor1);
873             shardManager2.tell(new ActorInitialized(), mockShardActor2);
874
875             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
876             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
877             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
878                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
879             shardManager1.tell(new RoleChangeNotification(memberId1,
880                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
881             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
882                     DataStoreVersions.CURRENT_VERSION),
883                 mockShardActor2);
884             shardManager2.tell(new RoleChangeNotification(memberId2,
885                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
886             shardManager1.underlyingActor().waitForMemberUp();
887
888             shardManager1.tell(new FindPrimary("default", true), getRef());
889
890             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
891             String path = found.getPrimaryPath();
892             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
893
894             primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
895                     mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION));
896
897             shardManager1.tell(MockClusterWrapper.
898                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
899
900             shardManager1.underlyingActor().waitForUnreachableMember();
901
902             shardManager1.tell(new FindPrimary("default", true), getRef());
903
904             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
905
906             assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default"));
907
908             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, mock(DataTree.class),
909                     DataStoreVersions.CURRENT_VERSION), mockShardActor1);
910             shardManager1.tell(new RoleChangeNotification(memberId1,
911                 RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
912
913             shardManager1.tell(new FindPrimary("default", true), getRef());
914
915             LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
916             String path1 = found1.getPrimaryPath();
917             assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
918
919         }};
920
921         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange ending");
922     }
923
924
925     @Test
926     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
927         new JavaTestKit(getSystem()) {{
928             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
929
930             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
931
932             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
933
934             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
935
936             assertEquals("getShardName", "non-existent", notFound.getShardName());
937         }};
938     }
939
940     @Test
941     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
942         new JavaTestKit(getSystem()) {{
943             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
944
945             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
946             shardManager.tell(new ActorInitialized(), mockShardActor);
947
948             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
949
950             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
951
952             assertTrue("Found path contains " + found.getPath().path().toString(),
953                     found.getPath().path().toString().contains("member-1-shard-default-config"));
954         }};
955     }
956
957     @Test
958     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
959         new JavaTestKit(getSystem()) {{
960             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
961
962             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
963
964             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
965         }};
966     }
967
968     @Test
969     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
970         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
971         new JavaTestKit(getSystem()) {{
972             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
973
974             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
975
976             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
977             // delayed until we send ActorInitialized.
978             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
979                     new Timeout(5, TimeUnit.SECONDS));
980
981             shardManager.tell(new ActorInitialized(), mockShardActor);
982
983             Object resp = Await.result(future, duration("5 seconds"));
984             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
985         }};
986
987         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
988     }
989
990     @Test
991     public void testOnRecoveryJournalIsCleaned() {
992         String persistenceID = "shard-manager-" + shardMrgIDSuffix;
993         InMemoryJournal.addEntry(persistenceID, 1L, new SchemaContextModules(ImmutableSet.of("foo")));
994         InMemoryJournal.addEntry(persistenceID, 2L, new SchemaContextModules(ImmutableSet.of("bar")));
995         InMemoryJournal.addDeleteMessagesCompleteLatch(persistenceID);
996
997         newTestShardManager();
998
999         InMemoryJournal.waitForDeleteMessagesComplete(persistenceID);
1000
1001         // Journal entries up to the last one should've been deleted
1002         Map<Long, Object> journal = InMemoryJournal.get(persistenceID);
1003         synchronized (journal) {
1004             assertEquals("Journal size", 0, journal.size());
1005         }
1006     }
1007
1008     @Test
1009     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
1010         TestShardManager shardManager = newTestShardManager();
1011
1012         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1013         shardManager.onReceiveCommand(new RoleChangeNotification(
1014                 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
1015
1016         verify(ready, never()).countDown();
1017
1018         shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
1019                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1020
1021         verify(ready, times(1)).countDown();
1022     }
1023
1024     @Test
1025     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
1026         new JavaTestKit(getSystem()) {{
1027             TestShardManager shardManager = newTestShardManager();
1028
1029             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1030             shardManager.onReceiveCommand(new RoleChangeNotification(
1031                     memberId, null, RaftState.Follower.name()));
1032
1033             verify(ready, never()).countDown();
1034
1035             shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
1036
1037             shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
1038                     "member-2-shard-default-" + shardMrgIDSuffix, mock(DataTree.class),
1039                     DataStoreVersions.CURRENT_VERSION));
1040
1041             verify(ready, times(1)).countDown();
1042         }};
1043     }
1044
1045     @Test
1046     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
1047         new JavaTestKit(getSystem()) {{
1048             TestShardManager shardManager = newTestShardManager();
1049
1050             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1051             shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1052
1053             verify(ready, never()).countDown();
1054
1055             shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
1056                     "member-2-shard-default-" + shardMrgIDSuffix, mock(DataTree.class),
1057                     DataStoreVersions.CURRENT_VERSION));
1058
1059             shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
1060
1061             verify(ready, times(1)).countDown();
1062         }};
1063     }
1064
1065     @Test
1066     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
1067         TestShardManager shardManager = newTestShardManager();
1068
1069         shardManager.onReceiveCommand(new RoleChangeNotification(
1070                 "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
1071
1072         verify(ready, never()).countDown();
1073     }
1074
1075     @Test
1076     public void testByDefaultSyncStatusIsFalse() throws Exception{
1077         TestShardManager shardManager = newTestShardManager();
1078
1079         assertEquals(false, shardManager.getMBean().getSyncStatus());
1080     }
1081
1082     @Test
1083     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
1084         TestShardManager shardManager = newTestShardManager();
1085
1086         shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
1087                 RaftState.Follower.name(), RaftState.Leader.name()));
1088
1089         assertEquals(true, shardManager.getMBean().getSyncStatus());
1090     }
1091
1092     @Test
1093     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
1094         TestShardManager shardManager = newTestShardManager();
1095
1096         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1097         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1098                 RaftState.Follower.name(), RaftState.Candidate.name()));
1099
1100         assertEquals(false, shardManager.getMBean().getSyncStatus());
1101
1102         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
1103         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(
1104                 true, shardId));
1105
1106         assertEquals(false, shardManager.getMBean().getSyncStatus());
1107     }
1108
1109     @Test
1110     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
1111         TestShardManager shardManager = newTestShardManager();
1112
1113         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1114         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1115                 RaftState.Candidate.name(), RaftState.Follower.name()));
1116
1117         // Initially will be false
1118         assertEquals(false, shardManager.getMBean().getSyncStatus());
1119
1120         // Send status true will make sync status true
1121         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
1122
1123         assertEquals(true, shardManager.getMBean().getSyncStatus());
1124
1125         // Send status false will make sync status false
1126         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
1127
1128         assertEquals(false, shardManager.getMBean().getSyncStatus());
1129
1130     }
1131
1132     @Test
1133     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
1134         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
1135         TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
1136             @Override
1137             public List<String> getMemberShardNames(MemberName memberName) {
1138                 return Arrays.asList("default", "astronauts");
1139             }
1140         }));
1141
1142         // Initially will be false
1143         assertEquals(false, shardManager.getMBean().getSyncStatus());
1144
1145         // Make default shard leader
1146         String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
1147         shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId,
1148                 RaftState.Follower.name(), RaftState.Leader.name()));
1149
1150         // default = Leader, astronauts is unknown so sync status remains false
1151         assertEquals(false, shardManager.getMBean().getSyncStatus());
1152
1153         // Make astronauts shard leader as well
1154         String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1155         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1156                 RaftState.Follower.name(), RaftState.Leader.name()));
1157
1158         // Now sync status should be true
1159         assertEquals(true, shardManager.getMBean().getSyncStatus());
1160
1161         // Make astronauts a Follower
1162         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1163                 RaftState.Leader.name(), RaftState.Follower.name()));
1164
1165         // Sync status is not true
1166         assertEquals(false, shardManager.getMBean().getSyncStatus());
1167
1168         // Make the astronauts follower sync status true
1169         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
1170
1171         // Sync status is now true
1172         assertEquals(true, shardManager.getMBean().getSyncStatus());
1173
1174         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending");
1175     }
1176
1177     @Test
1178     public void testOnReceiveSwitchShardBehavior() throws Exception {
1179         new JavaTestKit(getSystem()) {{
1180             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1181
1182             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1183             shardManager.tell(new ActorInitialized(), mockShardActor);
1184
1185             shardManager.tell(new SwitchShardBehavior(mockShardName, RaftState.Leader, 1000), getRef());
1186
1187             SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor, SwitchBehavior.class);
1188
1189             assertEquals(RaftState.Leader, switchBehavior.getNewState());
1190             assertEquals(1000, switchBehavior.getNewTerm());
1191         }};
1192     }
1193
1194     private static List<MemberName> members(String... names) {
1195         return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList());
1196     }
1197
1198     @Test
1199     public void testOnCreateShard() {
1200         LOG.info("testOnCreateShard starting");
1201         new JavaTestKit(getSystem()) {{
1202             datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1203
1204             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1205                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1206
1207             SchemaContext schemaContext = TestModel.createTestContext();
1208             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1209
1210             DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100).
1211                     persistent(false).build();
1212             Shard.Builder shardBuilder = Shard.builder();
1213
1214             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1215                     "foo", null, members("member-1", "member-5", "member-6"));
1216             shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef());
1217
1218             expectMsgClass(duration("5 seconds"), Success.class);
1219
1220             shardManager.tell(new FindLocalShard("foo", true), getRef());
1221
1222             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1223
1224             assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent());
1225             assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig().
1226                     getPeerAddressResolver() instanceof ShardPeerAddressResolver);
1227             assertEquals("peerMembers", Sets.newHashSet(
1228                 ShardIdentifier.create("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(),
1229                 ShardIdentifier.create("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()),
1230                 shardBuilder.getPeerAddresses().keySet());
1231             assertEquals("ShardIdentifier", ShardIdentifier.create("foo", MEMBER_1, shardMrgIDSuffix),
1232                     shardBuilder.getId());
1233             assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1234
1235             // Send CreateShard with same name - should return Success with a message.
1236
1237             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1238
1239             Success success = expectMsgClass(duration("5 seconds"), Success.class);
1240             assertNotNull("Success status is null", success.status());
1241         }};
1242
1243         LOG.info("testOnCreateShard ending");
1244     }
1245
1246     @Test
1247     public void testOnCreateShardWithLocalMemberNotInShardConfig() {
1248         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting");
1249         new JavaTestKit(getSystem()) {{
1250             datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1251
1252             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1253                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1254
1255             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1256
1257             Shard.Builder shardBuilder = Shard.builder();
1258             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1259                     "foo", null, members("member-5", "member-6"));
1260
1261             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1262             expectMsgClass(duration("5 seconds"), Success.class);
1263
1264             shardManager.tell(new FindLocalShard("foo", true), getRef());
1265             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1266
1267             assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
1268             assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(),
1269                     shardBuilder.getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
1270         }};
1271
1272         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending");
1273     }
1274
1275     @Test
1276     public void testOnCreateShardWithNoInitialSchemaContext() {
1277         LOG.info("testOnCreateShardWithNoInitialSchemaContext starting");
1278         new JavaTestKit(getSystem()) {{
1279             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1280                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1281
1282             Shard.Builder shardBuilder = Shard.builder();
1283
1284             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1285                     "foo", null, members("member-1"));
1286             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1287
1288             expectMsgClass(duration("5 seconds"), Success.class);
1289
1290             SchemaContext schemaContext = TestModel.createTestContext();
1291             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1292
1293             shardManager.tell(new FindLocalShard("foo", true), getRef());
1294
1295             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1296
1297             assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1298             assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
1299         }};
1300
1301         LOG.info("testOnCreateShardWithNoInitialSchemaContext ending");
1302     }
1303
1304     @Test
1305     public void testGetSnapshot() throws Throwable {
1306         LOG.info("testGetSnapshot starting");
1307         JavaTestKit kit = new JavaTestKit(getSystem());
1308
1309         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1310                    put("shard1", Arrays.asList("member-1")).
1311                    put("shard2", Arrays.asList("member-1")).
1312                    put("astronauts", Collections.<String>emptyList()).build());
1313
1314         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig).
1315                 withDispatcher(Dispatchers.DefaultDispatcherId()));
1316
1317         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1318         Failure failure = kit.expectMsgClass(Failure.class);
1319         assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
1320
1321         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1322
1323         waitForShardInitialized(shardManager, "shard1", kit);
1324         waitForShardInitialized(shardManager, "shard2", kit);
1325
1326         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1327
1328         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1329
1330         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1331         assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1332
1333         Function<ShardSnapshot, String> shardNameTransformer = s -> s.getName();
1334
1335         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
1336                 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
1337
1338         // Add a new replica
1339
1340         JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1341
1342         TestShardManager shardManagerInstance = shardManager.underlyingActor();
1343         shardManagerInstance.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1344
1345         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1346         mockShardLeaderKit.expectMsgClass(AddServer.class);
1347         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.OK, ""));
1348         kit.expectMsgClass(Status.Success.class);
1349         waitForShardInitialized(shardManager, "astronauts", kit);
1350
1351         // Send another GetSnapshot and verify
1352
1353         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1354         datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1355
1356         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
1357                 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
1358
1359         byte[] snapshotBytes = datastoreSnapshot.getShardManagerSnapshot();
1360         assertNotNull("Expected ShardManagerSnapshot", snapshotBytes);
1361         ShardManagerSnapshot snapshot = SerializationUtils.deserialize(snapshotBytes);
1362         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1363                 Sets.newHashSet(snapshot.getShardList()));
1364
1365         LOG.info("testGetSnapshot ending");
1366     }
1367
1368     @Test
1369     public void testRestoreFromSnapshot() throws Throwable {
1370         LOG.info("testRestoreFromSnapshot starting");
1371
1372         datastoreContextBuilder.shardInitializationTimeout(3, TimeUnit.SECONDS);
1373
1374         JavaTestKit kit = new JavaTestKit(getSystem());
1375
1376         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1377                    put("shard1", Collections.<String>emptyList()).
1378                    put("shard2", Collections.<String>emptyList()).
1379                    put("astronauts", Collections.<String>emptyList()).build());
1380
1381
1382         ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"));
1383         DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix,
1384                 SerializationUtils.serialize(snapshot), Collections.<ShardSnapshot>emptyList());
1385         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig).
1386                 restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
1387
1388         shardManager.underlyingActor().waitForRecoveryComplete();
1389
1390         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1391
1392         waitForShardInitialized(shardManager, "shard1", kit);
1393         waitForShardInitialized(shardManager, "shard2", kit);
1394         waitForShardInitialized(shardManager, "astronauts", kit);
1395
1396         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1397
1398         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1399
1400         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1401
1402         byte[] snapshotBytes = datastoreSnapshot.getShardManagerSnapshot();
1403         assertNotNull("Expected ShardManagerSnapshot", snapshotBytes);
1404         snapshot = SerializationUtils.deserialize(snapshotBytes);
1405         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1406                 Sets.newHashSet(snapshot.getShardList()));
1407
1408         LOG.info("testRestoreFromSnapshot ending");
1409     }
1410
1411     @Test
1412     public void testAddShardReplicaForNonExistentShardConfig() throws Exception {
1413         new JavaTestKit(getSystem()) {{
1414             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1415                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1416
1417             shardManager.tell(new AddShardReplica("model-inventory"), getRef());
1418             Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
1419
1420             assertEquals("Failure obtained", true,
1421                           (resp.cause() instanceof IllegalArgumentException));
1422         }};
1423     }
1424
1425     @Test
1426     public void testAddShardReplica() throws Exception {
1427         LOG.info("testAddShardReplica starting");
1428         MockConfiguration mockConfig =
1429                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1430                    put("default", Arrays.asList("member-1", "member-2")).
1431                    put("astronauts", Arrays.asList("member-2")).build());
1432
1433         final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1434         datastoreContextBuilder.shardManagerPersistenceId(shardManagerID);
1435
1436         // Create an ActorSystem ShardManager actor for member-1.
1437         final ActorSystem system1 = newActorSystem("Member1");
1438         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1439         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1440         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1441                 newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor).cluster(
1442                         new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID);
1443
1444         // Create an ActorSystem ShardManager actor for member-2.
1445         final ActorSystem system2 = newActorSystem("Member2");
1446         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1447
1448         String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
1449         String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString();
1450         final TestActorRef<MockRespondActor> mockShardLeaderActor =
1451                 TestActorRef.create(system2, Props.create(MockRespondActor.class, AddServer.class,
1452                         new AddServerReply(ServerChangeStatus.OK, memberId2)).
1453                         withDispatcher(Dispatchers.DefaultDispatcherId()), name);
1454         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1455                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor).cluster(
1456                         new ClusterWrapperImpl(system2)).props().
1457                             withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID);
1458
1459         new JavaTestKit(system1) {{
1460
1461             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1462             leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1463
1464             leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1465
1466             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1467             leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
1468                     mock(DataTree.class), leaderVersion), mockShardLeaderActor);
1469             leaderShardManager.tell(new RoleChangeNotification(memberId2,
1470                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor);
1471
1472             newReplicaShardManager.underlyingActor().waitForMemberUp();
1473             leaderShardManager.underlyingActor().waitForMemberUp();
1474
1475             //Have a dummy snapshot to be overwritten by the new data persisted.
1476             String[] restoredShards = {"default", "people"};
1477             ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
1478             InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
1479             Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
1480
1481             InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID);
1482             InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
1483
1484             //construct a mock response message
1485             newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1486             AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1487                 AddServer.class);
1488             String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1489             assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1490             expectMsgClass(duration("5 seconds"), Status.Success.class);
1491
1492             InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
1493             InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
1494             List<ShardManagerSnapshot> persistedSnapshots =
1495                 InMemorySnapshotStore.getSnapshots(shardManagerID, ShardManagerSnapshot.class);
1496             assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
1497             ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
1498             assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
1499                     Sets.newHashSet(shardManagerSnapshot.getShardList()));
1500         }};
1501         LOG.info("testAddShardReplica ending");
1502     }
1503
1504     @Test
1505     public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() throws Exception {
1506         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting");
1507         new JavaTestKit(getSystem()) {{
1508             TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1509                     newPropsShardMgrWithMockShardActor(), shardMgrID);
1510
1511             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1512             shardManager.tell(new ActorInitialized(), mockShardActor);
1513
1514             String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
1515             AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
1516             ActorRef leaderShardActor = shardManager.underlyingActor().getContext().actorOf(
1517                     Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId);
1518
1519             MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
1520
1521             String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
1522             shardManager.tell(new RoleChangeNotification(newReplicaId,
1523                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
1524             shardManager.tell(new ShardLeaderStateChanged(newReplicaId, leaderId,
1525                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
1526
1527             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1528
1529             MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
1530
1531             Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1532             assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1533
1534             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1535             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1536
1537             // Send message again to verify previous in progress state is cleared
1538
1539             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1540             resp = expectMsgClass(duration("5 seconds"), Failure.class);
1541             assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1542
1543             // Send message again with an AddServer timeout to verify the pre-existing shard actor isn't terminated.
1544
1545             shardManager.tell(newDatastoreContextFactory(datastoreContextBuilder.
1546                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), getRef());
1547             leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
1548             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1549             expectMsgClass(duration("5 seconds"), Failure.class);
1550
1551             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1552             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1553         }};
1554
1555         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
1556     }
1557
1558     @Test
1559     public void testAddShardReplicaWithPreExistingLocalReplicaLeader() throws Exception {
1560         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting");
1561         new JavaTestKit(getSystem()) {{
1562             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1563             ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1564
1565             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1566             shardManager.tell(new ActorInitialized(), mockShardActor);
1567             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1568                     DataStoreVersions.CURRENT_VERSION), getRef());
1569             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
1570                     RaftState.Leader.name())), mockShardActor);
1571
1572             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1573             Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1574             assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1575
1576             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1577             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1578         }};
1579
1580         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
1581     }
1582
1583     @Test
1584     public void testAddShardReplicaWithAddServerReplyFailure() throws Exception {
1585         LOG.info("testAddShardReplicaWithAddServerReplyFailure starting");
1586         new JavaTestKit(getSystem()) {{
1587             JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1588
1589             MockConfiguration mockConfig =
1590                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1591                        put("astronauts", Arrays.asList("member-2")).build());
1592
1593             ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
1594             final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1595                     newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props(), shardMgrID);
1596             shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1597
1598             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1599
1600             JavaTestKit terminateWatcher = new JavaTestKit(getSystem());
1601             terminateWatcher.watch(mockNewReplicaShardActor);
1602
1603             shardManager.tell(new AddShardReplica("astronauts"), getRef());
1604
1605             AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
1606             assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
1607                     addServerMsg.getNewServerId());
1608             mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
1609
1610             Failure failure = expectMsgClass(duration("5 seconds"), Failure.class);
1611             assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
1612
1613             shardManager.tell(new FindLocalShard("astronauts", false), getRef());
1614             expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1615
1616             terminateWatcher.expectTerminated(mockNewReplicaShardActor);
1617
1618             shardManager.tell(new AddShardReplica("astronauts"), getRef());
1619             mockShardLeaderKit.expectMsgClass(AddServer.class);
1620             mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
1621             failure = expectMsgClass(duration("5 seconds"), Failure.class);
1622             assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
1623         }};
1624
1625         LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
1626     }
1627
1628     @Test
1629     public void testAddShardReplicaWithAlreadyInProgress() throws Exception {
1630         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1631                 AddServer.class, new AddShardReplica("astronauts"));
1632     }
1633
1634     @Test
1635     public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception {
1636         LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting");
1637         datastoreContextBuilder.shardInitializationTimeout(100, TimeUnit.MILLISECONDS);
1638         new JavaTestKit(getSystem()) {{
1639             MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1640                        put("astronauts", Arrays.asList("member-2")).build());
1641
1642             final ActorRef newReplicaShardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig).
1643                     shardActor(mockShardActor).props(), shardMgrID);
1644
1645             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1646             MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2",
1647                     AddressFromURIString.parse("akka.tcp://non-existent@127.0.0.1:5").toString());
1648
1649             newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1650             Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
1651             assertEquals("Failure obtained", true,
1652                           (resp.cause() instanceof RuntimeException));
1653         }};
1654
1655         LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending");
1656     }
1657
1658     @Test
1659     public void testRemoveShardReplicaForNonExistentShard() throws Exception {
1660         new JavaTestKit(getSystem()) {{
1661             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1662                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1663
1664             shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), getRef());
1665             Status.Failure resp = expectMsgClass(duration("10 seconds"), Status.Failure.class);
1666             assertEquals("Failure obtained", true,
1667                          (resp.cause() instanceof PrimaryNotFoundException));
1668         }};
1669     }
1670
1671     @Test
1672     /**
1673      * Primary is Local
1674      */
1675     public void testRemoveShardReplicaLocal() throws Exception {
1676         new JavaTestKit(getSystem()) {{
1677             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1678
1679             final TestActorRef<MockRespondActor> respondActor =
1680                     actorFactory.createTestActor(Props.create(MockRespondActor.class, RemoveServer.class,
1681                             new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
1682
1683             ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
1684
1685             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1686             shardManager.tell(new ActorInitialized(), respondActor);
1687             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1688                     DataStoreVersions.CURRENT_VERSION), getRef());
1689             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
1690                     RaftState.Leader.name())), respondActor);
1691
1692             shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), getRef());
1693             final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor, RemoveServer.class);
1694             assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(),
1695                     removeServer.getServerId());
1696             expectMsgClass(duration("5 seconds"), Success.class);
1697         }};
1698     }
1699
1700     @Test
1701     public void testRemoveShardReplicaRemote() throws Exception {
1702         MockConfiguration mockConfig =
1703                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1704                         put("default", Arrays.asList("member-1", "member-2")).
1705                         put("astronauts", Arrays.asList("member-1")).build());
1706
1707         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1708
1709         // Create an ActorSystem ShardManager actor for member-1.
1710         final ActorSystem system1 = newActorSystem("Member1");
1711         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1712         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1713
1714         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1715                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster(
1716                         new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1717                 shardManagerID);
1718
1719         // Create an ActorSystem ShardManager actor for member-2.
1720         final ActorSystem system2 = newActorSystem("Member2");
1721         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1722
1723         String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString();
1724         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
1725         final TestActorRef<MockRespondActor> mockShardLeaderActor =
1726                 TestActorRef.create(system2, Props.create(MockRespondActor.class, RemoveServer.class,
1727                         new RemoveServerReply(ServerChangeStatus.OK, memberId2)), name);
1728
1729         LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor);
1730
1731         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1732                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster(
1733                         new ClusterWrapperImpl(system2)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1734                 shardManagerID);
1735
1736         // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so,
1737         //    akka.tcp://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1
1738         // However when a shard manager has a local shard which is a follower and a leader that is remote it will
1739         // try to compute an address for the remote shard leader using the ShardPeerAddressResolver. This address will
1740         // look like so,
1741         //    akka.tcp://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1
1742         // In this specific case if we did a FindPrimary for shard default from member-1 we would come up
1743         // with the address of an actor which does not exist, therefore any message sent to that actor would go to
1744         // dead letters.
1745         // To work around this problem we create a ForwardingActor with the right address and pass to it the
1746         // mockShardLeaderActor. The ForwardingActor simply forwards all messages to the mockShardLeaderActor and every
1747         // thing works as expected
1748         final ActorRef actorRef = leaderShardManager.underlyingActor().context()
1749                 .actorOf(Props.create(ForwardingActor.class, mockShardLeaderActor), "member-2-shard-default-" + shardMrgIDSuffix);
1750
1751         LOG.error("Forwarding actor : {}", actorRef);
1752
1753         new JavaTestKit(system1) {{
1754
1755             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1756             leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1757
1758             leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1759             newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1760
1761             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1762             leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
1763                     mock(DataTree.class), leaderVersion), mockShardLeaderActor);
1764             leaderShardManager.tell(new RoleChangeNotification(memberId2,
1765                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor);
1766
1767             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
1768             newReplicaShardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2,
1769                     mock(DataTree.class), leaderVersion), mockShardActor);
1770             newReplicaShardManager.tell(new RoleChangeNotification(memberId1,
1771                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
1772
1773             newReplicaShardManager.underlyingActor().waitForMemberUp();
1774             leaderShardManager.underlyingActor().waitForMemberUp();
1775
1776             //construct a mock response message
1777             newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), getRef());
1778             RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1779                     RemoveServer.class);
1780             String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1781             assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
1782             expectMsgClass(duration("5 seconds"), Status.Success.class);
1783         }};
1784
1785     }
1786
1787     @Test
1788     public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() throws Exception {
1789         testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", MEMBER_2),
1790                 RemoveServer.class, new RemoveShardReplica("astronauts", MEMBER_3));
1791     }
1792
1793     @Test
1794     public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() throws Exception {
1795         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1796                 AddServer.class, new RemoveShardReplica("astronauts", MEMBER_2));
1797     }
1798
1799
1800     public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange,
1801                                                       final Class<?> firstForwardedServerChangeClass,
1802                                                       final Object secondServerChange) throws Exception {
1803         new JavaTestKit(getSystem()) {{
1804             JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1805             JavaTestKit secondRequestKit = new JavaTestKit(getSystem());
1806
1807             MockConfiguration mockConfig =
1808                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1809                             put(shardName, Arrays.asList("member-2")).build());
1810
1811             final TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
1812                     newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor).cluster(
1813                             new MockClusterWrapper()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1814                     shardMgrID);
1815
1816             shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1817
1818             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1819
1820             shardManager.tell(firstServerChange, getRef());
1821
1822             mockShardLeaderKit.expectMsgClass(firstForwardedServerChangeClass);
1823
1824             shardManager.tell(secondServerChange, secondRequestKit.getRef());
1825
1826             secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class);
1827         }};
1828     }
1829
1830     @Test
1831     public void testServerRemovedShardActorNotRunning() throws Exception {
1832         LOG.info("testServerRemovedShardActorNotRunning starting");
1833         new JavaTestKit(getSystem()) {{
1834             MockConfiguration mockConfig =
1835                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1836                             put("default", Arrays.asList("member-1", "member-2")).
1837                             put("astronauts", Arrays.asList("member-2")).
1838                             put("people", Arrays.asList("member-1", "member-2")).build());
1839
1840             TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig));
1841
1842             shardManager.underlyingActor().waitForRecoveryComplete();
1843             shardManager.tell(new FindLocalShard("people", false), getRef());
1844             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1845
1846             shardManager.tell(new FindLocalShard("default", false), getRef());
1847             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1848
1849             // Removed the default shard replica from member-1
1850             ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
1851             ShardIdentifier shardId = builder.shardName("default").memberName(MEMBER_1).type(shardMrgIDSuffix).build();
1852             shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
1853
1854             shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1855         }};
1856
1857         LOG.info("testServerRemovedShardActorNotRunning ending");
1858     }
1859
1860     @Test
1861     public void testServerRemovedShardActorRunning() throws Exception {
1862         LOG.info("testServerRemovedShardActorRunning starting");
1863         new JavaTestKit(getSystem()) {{
1864             MockConfiguration mockConfig =
1865                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1866                             put("default", Arrays.asList("member-1", "member-2")).
1867                             put("astronauts", Arrays.asList("member-2")).
1868                             put("people", Arrays.asList("member-1", "member-2")).build());
1869
1870             String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1871             TestActorRef<MessageCollectorActor> shard = actorFactory.createTestActor(
1872                     MessageCollectorActor.props(), shardId);
1873
1874             TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1875                     newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props());
1876
1877             shardManager.underlyingActor().waitForRecoveryComplete();
1878
1879             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1880             shardManager.tell(new ActorInitialized(), shard);
1881
1882             waitForShardInitialized(shardManager, "people", this);
1883             waitForShardInitialized(shardManager, "default", this);
1884
1885             // Removed the default shard replica from member-1
1886             shardManager.tell(new ServerRemoved(shardId), getRef());
1887
1888             shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1889
1890             MessageCollectorActor.expectFirstMatching(shard, Shutdown.class);
1891         }};
1892
1893         LOG.info("testServerRemovedShardActorRunning ending");
1894     }
1895
1896
1897     @Test
1898     public void testShardPersistenceWithRestoredData() throws Exception {
1899         LOG.info("testShardPersistenceWithRestoredData starting");
1900         new JavaTestKit(getSystem()) {{
1901             MockConfiguration mockConfig =
1902                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1903                    put("default", Arrays.asList("member-1", "member-2")).
1904                    put("astronauts", Arrays.asList("member-2")).
1905                    put("people", Arrays.asList("member-1", "member-2")).build());
1906             String[] restoredShards = {"default", "astronauts"};
1907             ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
1908             InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
1909
1910             //create shardManager to come up with restored data
1911             TestActorRef<TestShardManager> newRestoredShardManager = actorFactory.createTestActor(
1912                     newShardMgrProps(mockConfig));
1913
1914             newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
1915
1916             newRestoredShardManager.tell(new FindLocalShard("people", false), getRef());
1917             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1918             assertEquals("for uninitialized shard", "people", notFound.getShardName());
1919
1920             //Verify a local shard is created for the restored shards,
1921             //although we expect a NotInitializedException for the shards as the actor initialization
1922             //message is not sent for them
1923             newRestoredShardManager.tell(new FindLocalShard("default", false), getRef());
1924             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1925
1926             newRestoredShardManager.tell(new FindLocalShard("astronauts", false), getRef());
1927             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1928         }};
1929
1930         LOG.info("testShardPersistenceWithRestoredData ending");
1931     }
1932
1933     @Test
1934     public void testShutDown() throws Exception {
1935         LOG.info("testShutDown starting");
1936         new JavaTestKit(getSystem()) {{
1937             MockConfiguration mockConfig =
1938                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1939                             put("shard1", Arrays.asList("member-1")).
1940                             put("shard2", Arrays.asList("member-1")).build());
1941
1942             String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString();
1943             TestActorRef<MessageCollectorActor> shard1 = actorFactory.createTestActor(
1944                     MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId1);
1945
1946             String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString();
1947             TestActorRef<MessageCollectorActor> shard2 = actorFactory.createTestActor(
1948                     MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId2);
1949
1950             TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(
1951                     mockConfig).addShardActor("shard1", shard1).addShardActor("shard2", shard2).props().
1952                         withDispatcher(Dispatchers.DefaultDispatcherId()));
1953
1954             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1955             shardManager.tell(new ActorInitialized(), shard1);
1956             shardManager.tell(new ActorInitialized(), shard2);
1957
1958             FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
1959             Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE);
1960
1961             MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class);
1962             MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class);
1963
1964             try {
1965                 Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS));
1966                 fail("ShardManager actor stopped without waiting for the Shards to be stopped");
1967             } catch(TimeoutException e) {
1968                 // expected
1969             }
1970
1971             actorFactory.killActor(shard1, this);
1972             actorFactory.killActor(shard2, this);
1973
1974             Boolean stopped = Await.result(stopFuture, duration);
1975             assertEquals("Stopped", Boolean.TRUE, stopped);
1976         }};
1977
1978         LOG.info("testShutDown ending");
1979     }
1980
1981     @Test
1982     public void testChangeServersVotingStatus() throws Exception {
1983         new JavaTestKit(getSystem()) {{
1984             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1985
1986             TestActorRef<MockRespondActor> respondActor =
1987                     actorFactory.createTestActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
1988                             new ServerChangeReply(ServerChangeStatus.OK, null)), memberId);
1989
1990             ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
1991
1992             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1993             shardManager.tell(new ActorInitialized(), respondActor);
1994             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1995                     DataStoreVersions.CURRENT_VERSION), getRef());
1996             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
1997                     RaftState.Leader.name())), respondActor);
1998
1999             shardManager.tell(new ChangeShardMembersVotingStatus("default",
2000                     ImmutableMap.of("member-2", Boolean.TRUE)), getRef());
2001
2002             ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor.expectFirstMatching(
2003                     respondActor, ChangeServersVotingStatus.class);
2004             assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(),
2005                     ImmutableMap.of(ShardIdentifier.create("default", MemberName.forName("member-2"),
2006                             shardMrgIDSuffix).toString(), Boolean.TRUE));
2007
2008             expectMsgClass(duration("5 seconds"), Success.class);
2009         }};
2010     }
2011
2012     @Test
2013     public void testChangeServersVotingStatusWithNoLeader() throws Exception {
2014         new JavaTestKit(getSystem()) {{
2015             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2016
2017             TestActorRef<MockRespondActor> respondActor =
2018                     actorFactory.createTestActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2019                             new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);
2020
2021             ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2022
2023             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
2024             shardManager.tell(new ActorInitialized(), respondActor);
2025             shardManager.tell((new RoleChangeNotification(memberId, null, RaftState.Follower.name())), respondActor);
2026
2027             shardManager.tell(new ChangeShardMembersVotingStatus("default",
2028                     ImmutableMap.of("member-2", Boolean.TRUE)), getRef());
2029
2030             MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2031
2032             Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
2033             assertEquals("Failure resposnse", true, (resp.cause() instanceof NoShardLeaderException));
2034         }};
2035     }
2036
2037     private static class TestShardManager extends ShardManager {
2038         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
2039         private final CountDownLatch snapshotPersist = new CountDownLatch(1);
2040         private ShardManagerSnapshot snapshot;
2041         private final Map<String, ActorRef> shardActors;
2042         private final ActorRef shardActor;
2043         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
2044         private CountDownLatch memberUpReceived = new CountDownLatch(1);
2045         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
2046         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
2047         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
2048         private volatile MessageInterceptor messageInterceptor;
2049
2050         private TestShardManager(Builder builder) {
2051             super(builder);
2052             shardActor = builder.shardActor;
2053             shardActors = builder.shardActors;
2054         }
2055
2056         @Override
2057         protected void handleRecover(Object message) throws Exception {
2058             try {
2059                 super.handleRecover(message);
2060             } finally {
2061                 if(message instanceof RecoveryCompleted) {
2062                     recoveryComplete.countDown();
2063                 }
2064             }
2065         }
2066
2067         private void countDownIfOther(final Member member, CountDownLatch latch) {
2068             if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
2069                 latch.countDown();
2070             }
2071         }
2072
2073         @Override
2074         public void handleCommand(Object message) throws Exception {
2075             try{
2076                 if(messageInterceptor != null && messageInterceptor.canIntercept(message)) {
2077                     getSender().tell(messageInterceptor.apply(message), getSelf());
2078                 } else {
2079                     super.handleCommand(message);
2080                 }
2081             } finally {
2082                 if(message instanceof FindPrimary) {
2083                     findPrimaryMessageReceived.countDown();
2084                 } else if(message instanceof ClusterEvent.MemberUp) {
2085                     countDownIfOther(((ClusterEvent.MemberUp)message).member(), memberUpReceived);
2086                 } else if(message instanceof ClusterEvent.MemberRemoved) {
2087                     countDownIfOther(((ClusterEvent.MemberRemoved)message).member(), memberRemovedReceived);
2088                 } else if(message instanceof ClusterEvent.UnreachableMember) {
2089                     countDownIfOther(((ClusterEvent.UnreachableMember)message).member(), memberUnreachableReceived);
2090                 } else if(message instanceof ClusterEvent.ReachableMember) {
2091                     countDownIfOther(((ClusterEvent.ReachableMember)message).member(), memberReachableReceived);
2092                 }
2093             }
2094         }
2095
2096         void setMessageInterceptor(MessageInterceptor messageInterceptor) {
2097             this.messageInterceptor = messageInterceptor;
2098         }
2099
2100         void waitForRecoveryComplete() {
2101             assertEquals("Recovery complete", true,
2102                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
2103         }
2104
2105         void waitForMemberUp() {
2106             assertEquals("MemberUp received", true,
2107                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
2108             memberUpReceived = new CountDownLatch(1);
2109         }
2110
2111         void waitForMemberRemoved() {
2112             assertEquals("MemberRemoved received", true,
2113                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
2114             memberRemovedReceived = new CountDownLatch(1);
2115         }
2116
2117         void waitForUnreachableMember() {
2118             assertEquals("UnreachableMember received", true,
2119                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
2120                 ));
2121             memberUnreachableReceived = new CountDownLatch(1);
2122         }
2123
2124         void waitForReachableMember() {
2125             assertEquals("ReachableMember received", true,
2126                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
2127             memberReachableReceived = new CountDownLatch(1);
2128         }
2129
2130         void verifyFindPrimary() {
2131             assertEquals("FindPrimary received", true,
2132                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
2133             findPrimaryMessageReceived = new CountDownLatch(1);
2134         }
2135
2136         public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) {
2137             return new Builder(datastoreContextBuilder);
2138         }
2139
2140         private static class Builder extends AbstractGenericCreator<Builder, TestShardManager> {
2141             private ActorRef shardActor;
2142             private final Map<String, ActorRef> shardActors = new HashMap<>();
2143
2144             Builder(DatastoreContext.Builder datastoreContextBuilder) {
2145                 super(TestShardManager.class);
2146                 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
2147             }
2148
2149             Builder shardActor(ActorRef shardActor) {
2150                 this.shardActor = shardActor;
2151                 return this;
2152             }
2153
2154             Builder addShardActor(String shardName, ActorRef actorRef){
2155                 shardActors.put(shardName, actorRef);
2156                 return this;
2157             }
2158         }
2159
2160         @Override
2161         public void saveSnapshot(Object obj) {
2162             snapshot = (ShardManagerSnapshot) obj;
2163             snapshotPersist.countDown();
2164             super.saveSnapshot(obj);
2165         }
2166
2167         void verifySnapshotPersisted(Set<String> shardList) {
2168             assertEquals("saveSnapshot invoked", true,
2169                     Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
2170             assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
2171         }
2172
2173         @Override
2174         protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
2175             if(shardActors.get(info.getShardName()) != null){
2176                 return shardActors.get(info.getShardName());
2177             }
2178
2179             if(shardActor != null) {
2180                 return shardActor;
2181             }
2182
2183             return super.newShardActor(schemaContext, info);
2184         }
2185     }
2186
2187     private static abstract class AbstractGenericCreator<T extends AbstractGenericCreator<T, ?>, C extends ShardManager>
2188                                                      extends AbstractShardManagerCreator<T> {
2189         private final Class<C> shardManagerClass;
2190
2191         AbstractGenericCreator(Class<C> shardManagerClass) {
2192             this.shardManagerClass = shardManagerClass;
2193             cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).
2194                     waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache());
2195         }
2196
2197         @Override
2198         public Props props() {
2199             verify();
2200             return Props.create(shardManagerClass, this);
2201         }
2202     }
2203
2204     private static class GenericCreator<C extends ShardManager> extends AbstractGenericCreator<GenericCreator<C>, C> {
2205         GenericCreator(Class<C> shardManagerClass) {
2206             super(shardManagerClass);
2207         }
2208     }
2209
2210     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
2211         private static final long serialVersionUID = 1L;
2212         private final Creator<ShardManager> delegate;
2213
2214         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
2215             this.delegate = delegate;
2216         }
2217
2218         @Override
2219         public ShardManager create() throws Exception {
2220             return delegate.create();
2221         }
2222     }
2223
2224     interface MessageInterceptor extends Function<Object, Object> {
2225         boolean canIntercept(Object message);
2226     }
2227
2228     private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
2229         return new MessageInterceptor(){
2230             @Override
2231             public Object apply(Object message) {
2232                 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
2233             }
2234
2235             @Override
2236             public boolean canIntercept(Object message) {
2237                 return message instanceof FindPrimary;
2238             }
2239         };
2240     }
2241
2242     private static class MockRespondActor extends MessageCollectorActor {
2243         static final String CLEAR_RESPONSE = "clear-response";
2244
2245         private Object responseMsg;
2246         private final Class<?> requestClass;
2247
2248         @SuppressWarnings("unused")
2249         public MockRespondActor(Class<?> requestClass, Object responseMsg) {
2250             this.requestClass = requestClass;
2251             this.responseMsg = responseMsg;
2252         }
2253
2254         @Override
2255         public void onReceive(Object message) throws Exception {
2256             if(message.equals(CLEAR_RESPONSE)) {
2257                 responseMsg = null;
2258             } else {
2259                 super.onReceive(message);
2260                 if (message.getClass().equals(requestClass) && responseMsg != null) {
2261                     getSender().tell(responseMsg, getSelf());
2262                 }
2263             }
2264         }
2265     }
2266 }