sal-distributed-datastore: use lambdas
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManagerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.times;
20 import static org.mockito.Mockito.verify;
21
22 import akka.actor.ActorRef;
23 import akka.actor.ActorSystem;
24 import akka.actor.AddressFromURIString;
25 import akka.actor.Props;
26 import akka.actor.Status;
27 import akka.actor.Status.Failure;
28 import akka.actor.Status.Success;
29 import akka.cluster.Cluster;
30 import akka.cluster.ClusterEvent;
31 import akka.cluster.Member;
32 import akka.dispatch.Dispatchers;
33 import akka.japi.Creator;
34 import akka.pattern.Patterns;
35 import akka.persistence.RecoveryCompleted;
36 import akka.serialization.Serialization;
37 import akka.testkit.JavaTestKit;
38 import akka.testkit.TestActorRef;
39 import akka.util.Timeout;
40 import com.google.common.base.Function;
41 import com.google.common.base.Stopwatch;
42 import com.google.common.collect.ImmutableMap;
43 import com.google.common.collect.Lists;
44 import com.google.common.collect.Sets;
45 import com.google.common.util.concurrent.Uninterruptibles;
46 import java.net.URI;
47 import java.util.AbstractMap;
48 import java.util.Arrays;
49 import java.util.Collection;
50 import java.util.Collections;
51 import java.util.HashMap;
52 import java.util.List;
53 import java.util.Map;
54 import java.util.Map.Entry;
55 import java.util.Set;
56 import java.util.concurrent.CountDownLatch;
57 import java.util.concurrent.TimeUnit;
58 import java.util.concurrent.TimeoutException;
59 import java.util.stream.Collectors;
60 import org.junit.Test;
61 import org.mockito.Mockito;
62 import org.opendaylight.controller.cluster.access.concepts.MemberName;
63 import org.opendaylight.controller.cluster.datastore.AbstractShardManagerTest;
64 import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
65 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
66 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
67 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
68 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
69 import org.opendaylight.controller.cluster.datastore.Shard;
70 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
71 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
72 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
73 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
74 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
75 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
76 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
77 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
78 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
79 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
80 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
81 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
82 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
83 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
84 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
85 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
86 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
87 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
88 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
89 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
90 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
91 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
92 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
93 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
94 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
95 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
96 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
97 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
98 import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor;
99 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
100 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
101 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
102 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
103 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
104 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
105 import org.opendaylight.controller.cluster.raft.RaftState;
106 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
107 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
108 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
109 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
110 import org.opendaylight.controller.cluster.raft.messages.AddServer;
111 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
112 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
113 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
114 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
115 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
116 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
117 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
118 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
119 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
120 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
121 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
122 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
123 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
124 import org.slf4j.Logger;
125 import org.slf4j.LoggerFactory;
126 import scala.concurrent.Await;
127 import scala.concurrent.Future;
128 import scala.concurrent.duration.FiniteDuration;
129
130 public class ShardManagerTest extends AbstractShardManagerTest {
131     private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
132     private static final MemberName MEMBER_2 = MemberName.forName("member-2");
133     private static final MemberName MEMBER_3 = MemberName.forName("member-3");
134
135     private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
136
137     private ActorSystem newActorSystem(String config) {
138         return newActorSystem("cluster-test", config);
139     }
140
141     private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
142         String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
143         if (system == getSystem()) {
144             return actorFactory.createActor(Props.create(MessageCollectorActor.class), name);
145         }
146
147         return system.actorOf(Props.create(MessageCollectorActor.class), name);
148     }
149
150     private Props newShardMgrProps() {
151         return newShardMgrProps(new MockConfiguration());
152     }
153
154     private static DatastoreContextFactory newDatastoreContextFactory(DatastoreContext datastoreContext) {
155         DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
156         Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
157         Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString());
158         return mockFactory;
159     }
160
161     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() {
162         return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
163     }
164
165     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(ActorRef shardActor) {
166         return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor)
167                 .distributedDataStore(mock(DistributedDataStore.class));
168     }
169
170
171     private Props newPropsShardMgrWithMockShardActor() {
172         return newTestShardMgrBuilderWithMockShardActor().props().withDispatcher(
173                 Dispatchers.DefaultDispatcherId());
174     }
175
176     private Props newPropsShardMgrWithMockShardActor(ActorRef shardActor) {
177         return newTestShardMgrBuilderWithMockShardActor(shardActor).props()
178                 .withDispatcher(Dispatchers.DefaultDispatcherId());
179     }
180
181
182     private TestShardManager newTestShardManager() {
183         return newTestShardManager(newShardMgrProps());
184     }
185
186     private TestShardManager newTestShardManager(Props props) {
187         TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
188         TestShardManager shardManager = shardManagerActor.underlyingActor();
189         shardManager.waitForRecoveryComplete();
190         return shardManager;
191     }
192
193     private static void waitForShardInitialized(ActorRef shardManager, String shardName, JavaTestKit kit) {
194         AssertionError last = null;
195         Stopwatch sw = Stopwatch.createStarted();
196         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
197             try {
198                 shardManager.tell(new FindLocalShard(shardName, true), kit.getRef());
199                 kit.expectMsgClass(LocalShardFound.class);
200                 return;
201             } catch (AssertionError e) {
202                 last = e;
203             }
204
205             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
206         }
207
208         throw last;
209     }
210
211     @SuppressWarnings("unchecked")
212     private static <T> T expectMsgClassOrFailure(Class<T> msgClass, JavaTestKit kit, String msg) {
213         Object reply = kit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), msgClass, Failure.class);
214         if (reply instanceof Failure) {
215             throw new AssertionError(msg + " failed", ((Failure)reply).cause());
216         }
217
218         return (T)reply;
219     }
220
221     @Test
222     public void testPerShardDatastoreContext() throws Exception {
223         LOG.info("testPerShardDatastoreContext starting");
224         final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
225                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
226
227         Mockito.doReturn(
228                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(6).build())
229                 .when(mockFactory).getShardDatastoreContext("default");
230
231         Mockito.doReturn(
232                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(7).build())
233                 .when(mockFactory).getShardDatastoreContext("topology");
234
235         final MockConfiguration mockConfig = new MockConfiguration() {
236             @Override
237             public Collection<String> getMemberShardNames(MemberName memberName) {
238                 return Arrays.asList("default", "topology");
239             }
240
241             @Override
242             public Collection<MemberName> getMembersFromShardName(String shardName) {
243                 return members("member-1");
244             }
245         };
246
247         final ActorRef defaultShardActor = actorFactory.createActor(
248                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("default"));
249         final ActorRef topologyShardActor = actorFactory.createActor(
250                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("topology"));
251
252         final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
253                 new HashMap<String, Entry<ActorRef, DatastoreContext>>());
254         shardInfoMap.put("default", new AbstractMap.SimpleEntry<>(defaultShardActor, null));
255         shardInfoMap.put("topology", new AbstractMap.SimpleEntry<>(topologyShardActor, null));
256
257         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
258         final CountDownLatch newShardActorLatch = new CountDownLatch(2);
259         class LocalShardManager extends ShardManager {
260             LocalShardManager(AbstractShardManagerCreator<?> creator) {
261                 super(creator);
262             }
263
264             @Override
265             protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
266                 Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
267                 ActorRef ref = null;
268                 if (entry != null) {
269                     ref = entry.getKey();
270                     entry.setValue(info.getDatastoreContext());
271                 }
272
273                 newShardActorLatch.countDown();
274                 return ref;
275             }
276         }
277
278         final Creator<ShardManager> creator = new Creator<ShardManager>() {
279             private static final long serialVersionUID = 1L;
280             @Override
281             public ShardManager create() throws Exception {
282                 return new LocalShardManager(
283                         new GenericCreator<>(LocalShardManager.class).datastoreContextFactory(mockFactory)
284                                 .primaryShardInfoCache(primaryShardInfoCache).configuration(mockConfig));
285             }
286         };
287
288         JavaTestKit kit = new JavaTestKit(getSystem());
289
290         final ActorRef shardManager = actorFactory.createActor(Props.create(
291                 new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()));
292
293         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), kit.getRef());
294
295         assertEquals("Shard actors created", true, newShardActorLatch.await(5, TimeUnit.SECONDS));
296         assertEquals("getShardElectionTimeoutFactor", 6,
297                 shardInfoMap.get("default").getValue().getShardElectionTimeoutFactor());
298         assertEquals("getShardElectionTimeoutFactor", 7,
299                 shardInfoMap.get("topology").getValue().getShardElectionTimeoutFactor());
300
301         DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
302                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
303         Mockito.doReturn(
304                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(66).build())
305                 .when(newMockFactory).getShardDatastoreContext("default");
306
307         Mockito.doReturn(
308                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(77).build())
309                 .when(newMockFactory).getShardDatastoreContext("topology");
310
311         shardManager.tell(newMockFactory, kit.getRef());
312
313         DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor,
314                 DatastoreContext.class);
315         assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
316
317         newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
318         assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
319
320         LOG.info("testPerShardDatastoreContext ending");
321     }
322
323     @Test
324     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
325         new JavaTestKit(getSystem()) {
326             {
327                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
328
329                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
330
331                 shardManager.tell(new FindPrimary("non-existent", false), getRef());
332
333                 expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
334             }
335         };
336     }
337
338     @Test
339     public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
340         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting");
341         new JavaTestKit(getSystem()) {
342             {
343                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
344
345                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
346
347                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
348                 shardManager.tell(new ActorInitialized(), mockShardActor);
349
350                 DataTree mockDataTree = mock(DataTree.class);
351                 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
352                         DataStoreVersions.CURRENT_VERSION), getRef());
353
354                 MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
355                 shardManager.tell(
356                         new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
357                         mockShardActor);
358
359                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
360
361                 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"),
362                         LocalPrimaryShardFound.class);
363                 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
364                         primaryFound.getPrimaryPath().contains("member-1-shard-default"));
365                 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
366             }
367         };
368
369         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending");
370     }
371
372     @Test
373     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
374         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp starting");
375         new JavaTestKit(getSystem()) {
376             {
377                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
378
379                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
380                 shardManager.tell(new ActorInitialized(), mockShardActor);
381
382                 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
383                 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
384                 shardManager.tell(
385                         new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
386                         mockShardActor);
387                 shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION),
388                         mockShardActor);
389
390                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
391
392                 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
393             }
394         };
395
396         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending");
397     }
398
399     @Test
400     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
401         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard starting");
402         new JavaTestKit(getSystem()) {
403             {
404                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
405
406                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
407                 shardManager.tell(new ActorInitialized(), mockShardActor);
408
409                 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
410                 MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
411
412                 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
413                 shardManager.tell(
414                         new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
415                         mockShardActor);
416                 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
417                 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, leaderVersion), mockShardActor);
418
419                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
420
421                 RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"),
422                         RemotePrimaryShardFound.class);
423                 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
424                         primaryFound.getPrimaryPath().contains("member-2-shard-default"));
425                 assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
426             }
427         };
428
429         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard ending");
430     }
431
432     @Test
433     public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
434         new JavaTestKit(getSystem()) {
435             {
436                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
437
438                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
439
440                 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
441             }
442         };
443     }
444
445     @Test
446     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
447         new JavaTestKit(getSystem()) {
448             {
449                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
450
451                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
452                 shardManager.tell(new ActorInitialized(), mockShardActor);
453
454                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
455
456                 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
457             }
458         };
459     }
460
461     @Test
462     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
463         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
464         new JavaTestKit(getSystem()) {
465             {
466                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
467
468                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
469                 shardManager.tell(new ActorInitialized(), mockShardActor);
470
471                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
472                 shardManager.tell(
473                         new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Follower.name()),
474                         mockShardActor);
475
476                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
477
478                 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
479
480                 DataTree mockDataTree = mock(DataTree.class);
481                 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
482                         DataStoreVersions.CURRENT_VERSION), mockShardActor);
483
484                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
485
486                 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"),
487                         LocalPrimaryShardFound.class);
488                 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
489                         primaryFound.getPrimaryPath().contains("member-1-shard-default"));
490                 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
491             }
492         };
493
494         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
495     }
496
497     @Test
498     public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
499         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting");
500         datastoreContextBuilder.shardInitializationTimeout(10, TimeUnit.SECONDS);
501         new JavaTestKit(getSystem()) {
502             {
503                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
504
505                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
506
507                 // We're passing waitUntilInitialized = true to FindPrimary so
508                 // the response should be
509                 // delayed until we send ActorInitialized and
510                 // RoleChangeNotification.
511                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
512
513                 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
514
515                 shardManager.tell(new ActorInitialized(), mockShardActor);
516
517                 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
518
519                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
520                 shardManager.tell(
521                         new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
522                         mockShardActor);
523
524                 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
525
526                 DataTree mockDataTree = mock(DataTree.class);
527                 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
528                         DataStoreVersions.CURRENT_VERSION), mockShardActor);
529
530                 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"),
531                         LocalPrimaryShardFound.class);
532                 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
533                         primaryFound.getPrimaryPath().contains("member-1-shard-default"));
534                 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
535
536                 expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
537             }
538         };
539
540         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending");
541     }
542
543     @Test
544     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
545         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard starting");
546         new JavaTestKit(getSystem()) {
547             {
548                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
549
550                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
551
552                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
553
554                 expectMsgClass(duration("2 seconds"), NotInitializedException.class);
555
556                 shardManager.tell(new ActorInitialized(), mockShardActor);
557
558                 expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
559             }
560         };
561
562         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending");
563     }
564
565     @Test
566     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
567         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard starting");
568         new JavaTestKit(getSystem()) {
569             {
570                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
571
572                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
573                 shardManager.tell(new ActorInitialized(), mockShardActor);
574                 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
575                         RaftState.Candidate.name()), mockShardActor);
576
577                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
578
579                 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
580             }
581         };
582
583         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending");
584     }
585
586     @Test
587     public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception {
588         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard starting");
589         new JavaTestKit(getSystem()) {
590             {
591                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
592
593                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
594                 shardManager.tell(new ActorInitialized(), mockShardActor);
595                 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
596                         RaftState.IsolatedLeader.name()), mockShardActor);
597
598                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
599
600                 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
601             }
602         };
603
604         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending");
605     }
606
607     @Test
608     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
609         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard starting");
610         new JavaTestKit(getSystem()) {
611             {
612                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
613
614                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
615                 shardManager.tell(new ActorInitialized(), mockShardActor);
616
617                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
618
619                 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
620             }
621         };
622
623         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending");
624     }
625
626     @Test
627     public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
628         LOG.info("testOnReceiveFindPrimaryForRemoteShard starting");
629         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
630
631         // Create an ActorSystem ShardManager actor for member-1.
632
633         final ActorSystem system1 = newActorSystem("Member1");
634         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
635
636         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
637                 newTestShardMgrBuilderWithMockShardActor().cluster(
638                         new ClusterWrapperImpl(system1)).props().withDispatcher(
639                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
640
641         // Create an ActorSystem ShardManager actor for member-2.
642
643         final ActorSystem system2 = newActorSystem("Member2");
644
645         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
646
647         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
648
649         MockConfiguration mockConfig2 = new MockConfiguration(
650                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
651                         .put("astronauts", Arrays.asList("member-2")).build());
652
653         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
654                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
655                         new ClusterWrapperImpl(system2)).props().withDispatcher(
656                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
657
658         new JavaTestKit(system1) {
659             {
660                 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
661                 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
662
663                 shardManager2.tell(new ActorInitialized(), mockShardActor2);
664
665                 String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
666                 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
667                 shardManager2.tell(
668                         new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
669                         mockShardActor2);
670                 shardManager2.tell(
671                         new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
672                         mockShardActor2);
673
674                 shardManager1.underlyingActor().waitForMemberUp();
675                 shardManager1.tell(new FindPrimary("astronauts", false), getRef());
676
677                 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
678                 String path = found.getPrimaryPath();
679                 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
680                 assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
681
682                 shardManager2.underlyingActor().verifyFindPrimary();
683
684                 Cluster.get(system2).down(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
685
686                 shardManager1.underlyingActor().waitForMemberRemoved();
687
688                 shardManager1.tell(new FindPrimary("astronauts", false), getRef());
689
690                 expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
691             }
692         };
693
694         LOG.info("testOnReceiveFindPrimaryForRemoteShard ending");
695     }
696
697     @Test
698     public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
699         LOG.info("testShardAvailabilityOnChangeOfMemberReachability starting");
700         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
701
702         // Create an ActorSystem ShardManager actor for member-1.
703
704         final ActorSystem system1 = newActorSystem("Member1");
705         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
706
707         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
708
709         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
710                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
711                         new ClusterWrapperImpl(system1)).props().withDispatcher(
712                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
713
714         // Create an ActorSystem ShardManager actor for member-2.
715
716         final ActorSystem system2 = newActorSystem("Member2");
717
718         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
719
720         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
721
722         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
723                 .put("default", Arrays.asList("member-1", "member-2")).build());
724
725         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
726                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
727                         new ClusterWrapperImpl(system2)).props().withDispatcher(
728                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
729
730         new JavaTestKit(system1) {
731             {
732                 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
733                 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
734                 shardManager1.tell(new ActorInitialized(), mockShardActor1);
735                 shardManager2.tell(new ActorInitialized(), mockShardActor2);
736
737                 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
738                 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
739                 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
740                         DataStoreVersions.CURRENT_VERSION), mockShardActor1);
741                 shardManager1.tell(
742                         new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
743                         mockShardActor1);
744                 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
745                         DataStoreVersions.CURRENT_VERSION), mockShardActor2);
746                 shardManager2.tell(
747                         new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
748                         mockShardActor2);
749                 shardManager1.underlyingActor().waitForMemberUp();
750
751                 shardManager1.tell(new FindPrimary("default", true), getRef());
752
753                 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
754                 String path = found.getPrimaryPath();
755                 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
756
757                 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
758                         "akka://cluster-test@127.0.0.1:2558"), getRef());
759
760                 shardManager1.underlyingActor().waitForUnreachableMember();
761
762                 PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
763                 assertEquals("getMemberName", MEMBER_2, peerDown.getMemberName());
764                 MessageCollectorActor.clearMessages(mockShardActor1);
765
766                 shardManager1.tell(
767                         MockClusterWrapper.createMemberRemoved("member-2", "akka://cluster-test@127.0.0.1:2558"),
768                         getRef());
769
770                 MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
771
772                 shardManager1.tell(new FindPrimary("default", true), getRef());
773
774                 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
775
776                 shardManager1.tell(
777                         MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
778                         getRef());
779
780                 shardManager1.underlyingActor().waitForReachableMember();
781
782                 PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
783                 assertEquals("getMemberName", MEMBER_2, peerUp.getMemberName());
784                 MessageCollectorActor.clearMessages(mockShardActor1);
785
786                 shardManager1.tell(new FindPrimary("default", true), getRef());
787
788                 RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
789                 String path1 = found1.getPrimaryPath();
790                 assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
791
792                 shardManager1.tell(
793                         MockClusterWrapper.createMemberUp("member-2", "akka://cluster-test@127.0.0.1:2558"),
794                         getRef());
795
796                 MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
797
798                 // Test FindPrimary wait succeeds after reachable member event.
799
800                 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
801                         "akka://cluster-test@127.0.0.1:2558"), getRef());
802                 shardManager1.underlyingActor().waitForUnreachableMember();
803
804                 shardManager1.tell(new FindPrimary("default", true), getRef());
805
806                 shardManager1.tell(
807                         MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
808                         getRef());
809
810                 RemotePrimaryShardFound found2 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
811                 String path2 = found2.getPrimaryPath();
812                 assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config"));
813             }
814         };
815
816         LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending");
817     }
818
819     @Test
820     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
821         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange starting");
822         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
823
824         // Create an ActorSystem ShardManager actor for member-1.
825
826         final ActorSystem system1 = newActorSystem("Member1");
827         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
828
829         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
830
831         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
832         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
833                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(new ClusterWrapperImpl(system1))
834                         .primaryShardInfoCache(primaryShardInfoCache).props()
835                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
836                 shardManagerID);
837
838         // Create an ActorSystem ShardManager actor for member-2.
839
840         final ActorSystem system2 = newActorSystem("Member2");
841
842         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
843
844         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
845
846         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
847                 .put("default", Arrays.asList("member-1", "member-2")).build());
848
849         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
850                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
851                         new ClusterWrapperImpl(system2)).props().withDispatcher(
852                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
853
854         new JavaTestKit(system1) {
855             {
856                 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
857                 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
858                 shardManager1.tell(new ActorInitialized(), mockShardActor1);
859                 shardManager2.tell(new ActorInitialized(), mockShardActor2);
860
861                 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
862                 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
863                 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
864                         DataStoreVersions.CURRENT_VERSION), mockShardActor1);
865                 shardManager1.tell(
866                         new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
867                         mockShardActor1);
868                 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
869                         DataStoreVersions.CURRENT_VERSION), mockShardActor2);
870                 shardManager2.tell(
871                         new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
872                         mockShardActor2);
873                 shardManager1.underlyingActor().waitForMemberUp();
874
875                 shardManager1.tell(new FindPrimary("default", true), getRef());
876
877                 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
878                 String path = found.getPrimaryPath();
879                 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
880
881                 primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(
882                         system1.actorSelection(mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION));
883
884                 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
885                         "akka://cluster-test@127.0.0.1:2558"), getRef());
886
887                 shardManager1.underlyingActor().waitForUnreachableMember();
888
889                 shardManager1.tell(new FindPrimary("default", true), getRef());
890
891                 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
892
893                 assertNull("Expected primaryShardInfoCache entry removed",
894                         primaryShardInfoCache.getIfPresent("default"));
895
896                 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, mock(DataTree.class),
897                         DataStoreVersions.CURRENT_VERSION), mockShardActor1);
898                 shardManager1.tell(
899                         new RoleChangeNotification(memberId1, RaftState.Follower.name(), RaftState.Leader.name()),
900                         mockShardActor1);
901
902                 shardManager1.tell(new FindPrimary("default", true), getRef());
903
904                 LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
905                 String path1 = found1.getPrimaryPath();
906                 assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
907
908             }
909         };
910
911         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange ending");
912     }
913
914
915     @Test
916     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
917         new JavaTestKit(getSystem()) {
918             {
919                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
920
921                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
922
923                 shardManager.tell(new FindLocalShard("non-existent", false), getRef());
924
925                 LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
926
927                 assertEquals("getShardName", "non-existent", notFound.getShardName());
928             }
929         };
930     }
931
932     @Test
933     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
934         new JavaTestKit(getSystem()) {
935             {
936                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
937
938                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
939                 shardManager.tell(new ActorInitialized(), mockShardActor);
940
941                 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
942
943                 LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
944
945                 assertTrue("Found path contains " + found.getPath().path().toString(),
946                         found.getPath().path().toString().contains("member-1-shard-default-config"));
947             }
948         };
949     }
950
951     @Test
952     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
953         new JavaTestKit(getSystem()) {
954             {
955                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
956
957                 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
958
959                 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
960             }
961         };
962     }
963
964     @Test
965     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
966         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
967         new JavaTestKit(getSystem()) {
968             {
969                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
970
971                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
972
973                 // We're passing waitUntilInitialized = true to FindLocalShard
974                 // so the response should be
975                 // delayed until we send ActorInitialized.
976                 Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
977                         new Timeout(5, TimeUnit.SECONDS));
978
979                 shardManager.tell(new ActorInitialized(), mockShardActor);
980
981                 Object resp = Await.result(future, duration("5 seconds"));
982                 assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
983             }
984         };
985
986         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
987     }
988
989     @Test
990     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
991         TestShardManager shardManager = newTestShardManager();
992
993         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
994         shardManager.onReceiveCommand(new RoleChangeNotification(
995                 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
996
997         verify(ready, never()).countDown();
998
999         shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
1000                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1001
1002         verify(ready, times(1)).countDown();
1003     }
1004
1005     @Test
1006     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
1007         new JavaTestKit(getSystem()) {
1008             {
1009                 TestShardManager shardManager = newTestShardManager();
1010
1011                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1012                 shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1013
1014                 verify(ready, never()).countDown();
1015
1016                 shardManager
1017                         .onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
1018
1019                 shardManager.onReceiveCommand(
1020                         new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1021                                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1022
1023                 verify(ready, times(1)).countDown();
1024             }
1025         };
1026     }
1027
1028     @Test
1029     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
1030         new JavaTestKit(getSystem()) {
1031             {
1032                 TestShardManager shardManager = newTestShardManager();
1033
1034                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1035                 shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1036
1037                 verify(ready, never()).countDown();
1038
1039                 shardManager.onReceiveCommand(
1040                         new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1041                                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1042
1043                 shardManager
1044                         .onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
1045
1046                 verify(ready, times(1)).countDown();
1047             }
1048         };
1049     }
1050
1051     @Test
1052     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
1053         TestShardManager shardManager = newTestShardManager();
1054
1055         shardManager.onReceiveCommand(new RoleChangeNotification(
1056                 "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
1057
1058         verify(ready, never()).countDown();
1059     }
1060
1061     @Test
1062     public void testByDefaultSyncStatusIsFalse() throws Exception {
1063         TestShardManager shardManager = newTestShardManager();
1064
1065         assertEquals(false, shardManager.getMBean().getSyncStatus());
1066     }
1067
1068     @Test
1069     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception {
1070         TestShardManager shardManager = newTestShardManager();
1071
1072         shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
1073                 RaftState.Follower.name(), RaftState.Leader.name()));
1074
1075         assertEquals(true, shardManager.getMBean().getSyncStatus());
1076     }
1077
1078     @Test
1079     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception {
1080         TestShardManager shardManager = newTestShardManager();
1081
1082         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1083         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1084                 RaftState.Follower.name(), RaftState.Candidate.name()));
1085
1086         assertEquals(false, shardManager.getMBean().getSyncStatus());
1087
1088         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
1089         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(
1090                 true, shardId));
1091
1092         assertEquals(false, shardManager.getMBean().getSyncStatus());
1093     }
1094
1095     @Test
1096     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception {
1097         TestShardManager shardManager = newTestShardManager();
1098
1099         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1100         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1101                 RaftState.Candidate.name(), RaftState.Follower.name()));
1102
1103         // Initially will be false
1104         assertEquals(false, shardManager.getMBean().getSyncStatus());
1105
1106         // Send status true will make sync status true
1107         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
1108
1109         assertEquals(true, shardManager.getMBean().getSyncStatus());
1110
1111         // Send status false will make sync status false
1112         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
1113
1114         assertEquals(false, shardManager.getMBean().getSyncStatus());
1115
1116     }
1117
1118     @Test
1119     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception {
1120         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
1121         TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
1122             @Override
1123             public List<String> getMemberShardNames(MemberName memberName) {
1124                 return Arrays.asList("default", "astronauts");
1125             }
1126         }));
1127
1128         // Initially will be false
1129         assertEquals(false, shardManager.getMBean().getSyncStatus());
1130
1131         // Make default shard leader
1132         String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
1133         shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId,
1134                 RaftState.Follower.name(), RaftState.Leader.name()));
1135
1136         // default = Leader, astronauts is unknown so sync status remains false
1137         assertEquals(false, shardManager.getMBean().getSyncStatus());
1138
1139         // Make astronauts shard leader as well
1140         String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1141         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1142                 RaftState.Follower.name(), RaftState.Leader.name()));
1143
1144         // Now sync status should be true
1145         assertEquals(true, shardManager.getMBean().getSyncStatus());
1146
1147         // Make astronauts a Follower
1148         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1149                 RaftState.Leader.name(), RaftState.Follower.name()));
1150
1151         // Sync status is not true
1152         assertEquals(false, shardManager.getMBean().getSyncStatus());
1153
1154         // Make the astronauts follower sync status true
1155         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
1156
1157         // Sync status is now true
1158         assertEquals(true, shardManager.getMBean().getSyncStatus());
1159
1160         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending");
1161     }
1162
1163     @Test
1164     public void testOnReceiveSwitchShardBehavior() throws Exception {
1165         new JavaTestKit(getSystem()) {
1166             {
1167                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1168
1169                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1170                 shardManager.tell(new ActorInitialized(), mockShardActor);
1171
1172                 shardManager.tell(new SwitchShardBehavior(mockShardName, RaftState.Leader, 1000), getRef());
1173
1174                 SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor,
1175                         SwitchBehavior.class);
1176
1177                 assertEquals(RaftState.Leader, switchBehavior.getNewState());
1178                 assertEquals(1000, switchBehavior.getNewTerm());
1179             }
1180         };
1181     }
1182
1183     private static List<MemberName> members(String... names) {
1184         return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList());
1185     }
1186
1187     @Test
1188     public void testOnCreateShard() {
1189         LOG.info("testOnCreateShard starting");
1190         new JavaTestKit(getSystem()) {
1191             {
1192                 datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1193
1194                 ActorRef shardManager = actorFactory
1195                         .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1196                                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1197
1198                 SchemaContext schemaContext = TestModel.createTestContext();
1199                 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1200
1201                 DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100)
1202                         .persistent(false).build();
1203                 Shard.Builder shardBuilder = Shard.builder();
1204
1205                 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1206                         "foo", null, members("member-1", "member-5", "member-6"));
1207                 shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef());
1208
1209                 expectMsgClass(duration("5 seconds"), Success.class);
1210
1211                 shardManager.tell(new FindLocalShard("foo", true), getRef());
1212
1213                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1214
1215                 assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent());
1216                 assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig()
1217                         .getPeerAddressResolver() instanceof ShardPeerAddressResolver);
1218                 assertEquals("peerMembers", Sets.newHashSet(
1219                         ShardIdentifier.create("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(),
1220                         ShardIdentifier.create("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()),
1221                         shardBuilder.getPeerAddresses().keySet());
1222                 assertEquals("ShardIdentifier", ShardIdentifier.create("foo", MEMBER_1, shardMrgIDSuffix),
1223                         shardBuilder.getId());
1224                 assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1225
1226                 // Send CreateShard with same name - should return Success with
1227                 // a message.
1228
1229                 shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1230
1231                 Success success = expectMsgClass(duration("5 seconds"), Success.class);
1232                 assertNotNull("Success status is null", success.status());
1233             }
1234         };
1235
1236         LOG.info("testOnCreateShard ending");
1237     }
1238
1239     @Test
1240     public void testOnCreateShardWithLocalMemberNotInShardConfig() {
1241         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting");
1242         new JavaTestKit(getSystem()) {
1243             {
1244                 datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1245
1246                 ActorRef shardManager = actorFactory
1247                         .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1248                                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1249
1250                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1251
1252                 Shard.Builder shardBuilder = Shard.builder();
1253                 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1254                         "foo", null, members("member-5", "member-6"));
1255
1256                 shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1257                 expectMsgClass(duration("5 seconds"), Success.class);
1258
1259                 shardManager.tell(new FindLocalShard("foo", true), getRef());
1260                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1261
1262                 assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
1263                 assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(), shardBuilder
1264                         .getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
1265             }
1266         };
1267
1268         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending");
1269     }
1270
1271     @Test
1272     public void testOnCreateShardWithNoInitialSchemaContext() {
1273         LOG.info("testOnCreateShardWithNoInitialSchemaContext starting");
1274         new JavaTestKit(getSystem()) {
1275             {
1276                 ActorRef shardManager = actorFactory
1277                         .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1278                                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1279
1280                 Shard.Builder shardBuilder = Shard.builder();
1281
1282                 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1283                         "foo", null, members("member-1"));
1284                 shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1285
1286                 expectMsgClass(duration("5 seconds"), Success.class);
1287
1288                 SchemaContext schemaContext = TestModel.createTestContext();
1289                 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1290
1291                 shardManager.tell(new FindLocalShard("foo", true), getRef());
1292
1293                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1294
1295                 assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1296                 assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
1297             }
1298         };
1299
1300         LOG.info("testOnCreateShardWithNoInitialSchemaContext ending");
1301     }
1302
1303     @Test
1304     public void testGetSnapshot() throws Exception {
1305         LOG.info("testGetSnapshot starting");
1306         JavaTestKit kit = new JavaTestKit(getSystem());
1307
1308         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1309                 .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1"))
1310                 .put("astronauts", Collections.<String>emptyList()).build());
1311
1312         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig)
1313                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1314
1315         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1316         Failure failure = kit.expectMsgClass(Failure.class);
1317         assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
1318
1319         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1320
1321         waitForShardInitialized(shardManager, "shard1", kit);
1322         waitForShardInitialized(shardManager, "shard2", kit);
1323
1324         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1325
1326         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1327
1328         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1329         assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1330
1331         Function<ShardSnapshot, String> shardNameTransformer = ShardSnapshot::getName;
1332
1333         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
1334                 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
1335
1336         // Add a new replica
1337
1338         JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1339
1340         TestShardManager shardManagerInstance = shardManager.underlyingActor();
1341         shardManagerInstance.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1342
1343         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1344         mockShardLeaderKit.expectMsgClass(AddServer.class);
1345         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.OK, ""));
1346         kit.expectMsgClass(Status.Success.class);
1347         waitForShardInitialized(shardManager, "astronauts", kit);
1348
1349         // Send another GetSnapshot and verify
1350
1351         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1352         datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1353
1354         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
1355                 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
1356
1357         ShardManagerSnapshot snapshot = datastoreSnapshot.getShardManagerSnapshot();
1358         assertNotNull("Expected ShardManagerSnapshot", snapshot);
1359         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1360                 Sets.newHashSet(snapshot.getShardList()));
1361
1362         LOG.info("testGetSnapshot ending");
1363     }
1364
1365     @Test
1366     public void testRestoreFromSnapshot() throws Exception {
1367         LOG.info("testRestoreFromSnapshot starting");
1368
1369         datastoreContextBuilder.shardInitializationTimeout(3, TimeUnit.SECONDS);
1370
1371         JavaTestKit kit = new JavaTestKit(getSystem());
1372
1373         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1374                 .put("shard1", Collections.<String>emptyList()).put("shard2", Collections.<String>emptyList())
1375                 .put("astronauts", Collections.<String>emptyList()).build());
1376
1377         ShardManagerSnapshot snapshot =
1378                 new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"), Collections.emptyMap());
1379         DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix, snapshot,
1380                 Collections.<ShardSnapshot>emptyList());
1381         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig)
1382                 .restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
1383
1384         shardManager.underlyingActor().waitForRecoveryComplete();
1385
1386         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1387
1388         waitForShardInitialized(shardManager, "shard1", kit);
1389         waitForShardInitialized(shardManager, "shard2", kit);
1390         waitForShardInitialized(shardManager, "astronauts", kit);
1391
1392         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1393
1394         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1395
1396         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1397
1398         assertNotNull("Expected ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1399         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1400                 Sets.newHashSet(datastoreSnapshot.getShardManagerSnapshot().getShardList()));
1401
1402         LOG.info("testRestoreFromSnapshot ending");
1403     }
1404
1405     @Test
1406     public void testAddShardReplicaForNonExistentShardConfig() throws Exception {
1407         new JavaTestKit(getSystem()) {
1408             {
1409                 ActorRef shardManager = actorFactory
1410                         .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1411                                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1412
1413                 shardManager.tell(new AddShardReplica("model-inventory"), getRef());
1414                 Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
1415
1416                 assertEquals("Failure obtained", true, resp.cause() instanceof IllegalArgumentException);
1417             }
1418         };
1419     }
1420
    /**
     * Runs two shard managers in separate actor systems (member-1 and member-2) and asks member-1 to add a
     * replica of the "astronauts" shard, which is configured only on member-2. The test checks that an
     * {@code AddServer} carrying member-1's shard id reaches the mock leader on member-2, that the caller gets
     * a {@code Status.Success}, and that exactly one {@code ShardManagerSnapshot} listing "default" and
     * "astronauts" remains persisted for the shard manager.
     */
    @Test
    public void testAddShardReplica() throws Exception {
        LOG.info("testAddShardReplica starting");
        // "default" lives on both members; "astronauts" only on member-2.
        MockConfiguration mockConfig = new MockConfiguration(
                ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
                        .put("astronauts", Arrays.asList("member-2")).build());

        final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
        datastoreContextBuilder.shardManagerPersistenceId(shardManagerID);

        // Create an ActorSystem ShardManager actor for member-1.
        final ActorSystem system1 = newActorSystem("Member1");
        Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
        ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
        final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
                newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor)
                        .cluster(new ClusterWrapperImpl(system1)).props()
                        .withDispatcher(Dispatchers.DefaultDispatcherId()),
                shardManagerID);

        // Create an ActorSystem ShardManager actor for member-2.
        final ActorSystem system2 = newActorSystem("Member2");
        Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));

        String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
        // NOTE(review): the actor name uses the literal "config" rather than shardMrgIDSuffix, unlike memberId2
        // above - confirm this is intended.
        String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString();
        // Mock shard leader on member-2; presumably it answers AddServer with the given AddServerReply - see
        // MockRespondActor.
        final TestActorRef<MockRespondActor> mockShardLeaderActor = TestActorRef.create(system2,
                Props.create(MockRespondActor.class, AddServer.class,
                        new AddServerReply(ServerChangeStatus.OK, memberId2))
                        .withDispatcher(Dispatchers.DefaultDispatcherId()),
                name);
        final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
                newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor)
                        .cluster(new ClusterWrapperImpl(system2)).props()
                        .withDispatcher(Dispatchers.DefaultDispatcherId()),
                shardManagerID);

        new JavaTestKit(system1) {
            {
                newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
                leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());

                leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);

                // Report the member-2 shard as its own leader, at one version below the current one.
                short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
                leaderShardManager.tell(
                        new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
                        mockShardLeaderActor);
                leaderShardManager.tell(
                        new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
                        mockShardLeaderActor);

                newReplicaShardManager.underlyingActor().waitForMemberUp();
                leaderShardManager.underlyingActor().waitForMemberUp();

                // Have a dummy snapshot to be overwritten by the new data
                // persisted.
                String[] restoredShards = { "default", "people" };
                ShardManagerSnapshot snapshot =
                        new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
                InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
                // Presumably gives the snapshot saved later a newer timestamp than the dummy one - TODO confirm.
                Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);

                // Register the latches before triggering the change so the save and delete can be awaited below.
                InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID);
                InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);

                // Ask member-1's shard manager to add an "astronauts" replica; the resulting AddServer is
                // expected to arrive at the mock leader on member-2.
                newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
                AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
                        AddServer.class);
                String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
                assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
                expectMsgClass(duration("5 seconds"), Status.Success.class);

                // Only one snapshot should remain, and it must list the new shard instead of the dummy "people".
                InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
                InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
                List<ShardManagerSnapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(shardManagerID,
                        ShardManagerSnapshot.class);
                assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
                ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
                assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
                        Sets.newHashSet(shardManagerSnapshot.getShardList()));
            }
        };
        LOG.info("testAddShardReplica ending");
    }
1507
1508     @Test
1509     public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() throws Exception {
1510         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting");
1511         new JavaTestKit(getSystem()) {
1512             {
1513                 TestActorRef<TestShardManager> shardManager = actorFactory
1514                         .createTestActor(newPropsShardMgrWithMockShardActor(), shardMgrID);
1515
1516                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1517                 shardManager.tell(new ActorInitialized(), mockShardActor);
1518
1519                 String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
1520                 AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
1521                 ActorRef leaderShardActor = shardManager.underlyingActor().getContext()
1522                         .actorOf(Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId);
1523
1524                 MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
1525
1526                 String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
1527                 shardManager.tell(
1528                         new RoleChangeNotification(newReplicaId, RaftState.Candidate.name(), RaftState.Follower.name()),
1529                         mockShardActor);
1530                 shardManager.tell(
1531                         new ShardLeaderStateChanged(newReplicaId, leaderId, DataStoreVersions.CURRENT_VERSION),
1532                         mockShardActor);
1533
1534                 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1535
1536                 MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
1537
1538                 Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1539                 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1540
1541                 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1542                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1543
1544                 // Send message again to verify previous in progress state is
1545                 // cleared
1546
1547                 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1548                 resp = expectMsgClass(duration("5 seconds"), Failure.class);
1549                 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1550
1551                 // Send message again with an AddServer timeout to verify the
1552                 // pre-existing shard actor isn't terminated.
1553
1554                 shardManager.tell(
1555                         newDatastoreContextFactory(
1556                                 datastoreContextBuilder.shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()),
1557                         getRef());
1558                 leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
1559                 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1560                 expectMsgClass(duration("5 seconds"), Failure.class);
1561
1562                 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1563                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1564             }
1565         };
1566
1567         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
1568     }
1569
1570     @Test
1571     public void testAddShardReplicaWithPreExistingLocalReplicaLeader() throws Exception {
1572         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting");
1573         new JavaTestKit(getSystem()) {
1574             {
1575                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1576                 ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1577
1578                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1579                 shardManager.tell(new ActorInitialized(), mockShardActor);
1580                 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1581                         DataStoreVersions.CURRENT_VERSION), getRef());
1582                 shardManager.tell(
1583                         new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1584                         mockShardActor);
1585
1586                 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1587                 Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1588                 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1589
1590                 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1591                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1592             }
1593         };
1594
1595         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
1596     }
1597
1598     @Test
1599     public void testAddShardReplicaWithAddServerReplyFailure() throws Exception {
1600         LOG.info("testAddShardReplicaWithAddServerReplyFailure starting");
1601         new JavaTestKit(getSystem()) {
1602             {
1603                 JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1604
1605                 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1606                         .put("astronauts", Arrays.asList("member-2")).build());
1607
1608                 ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
1609                 final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1610                         newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props()
1611                             .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1612                 shardManager.underlyingActor()
1613                         .setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1614
1615                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1616
1617                 JavaTestKit terminateWatcher = new JavaTestKit(getSystem());
1618                 terminateWatcher.watch(mockNewReplicaShardActor);
1619
1620                 shardManager.tell(new AddShardReplica("astronauts"), getRef());
1621
1622                 AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
1623                 assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
1624                         addServerMsg.getNewServerId());
1625                 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
1626
1627                 Failure failure = expectMsgClass(duration("5 seconds"), Failure.class);
1628                 assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
1629
1630                 shardManager.tell(new FindLocalShard("astronauts", false), getRef());
1631                 expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1632
1633                 terminateWatcher.expectTerminated(mockNewReplicaShardActor);
1634
1635                 shardManager.tell(new AddShardReplica("astronauts"), getRef());
1636                 mockShardLeaderKit.expectMsgClass(AddServer.class);
1637                 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
1638                 failure = expectMsgClass(duration("5 seconds"), Failure.class);
1639                 assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
1640             }
1641         };
1642
1643         LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
1644     }
1645
1646     @Test
1647     public void testAddShardReplicaWithAlreadyInProgress() throws Exception {
1648         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1649                 AddServer.class, new AddShardReplica("astronauts"));
1650     }
1651
1652     @Test
1653     public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception {
1654         LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting");
1655         datastoreContextBuilder.shardInitializationTimeout(100, TimeUnit.MILLISECONDS);
1656         new JavaTestKit(getSystem()) {
1657             {
1658                 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1659                         .put("astronauts", Arrays.asList("member-2")).build());
1660
1661                 final ActorRef newReplicaShardManager = actorFactory
1662                         .createActor(newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props()
1663                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1664
1665                 newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1666                 MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2",
1667                         AddressFromURIString.parse("akka://non-existent@127.0.0.1:5").toString());
1668
1669                 newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1670                 Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
1671                 assertEquals("Failure obtained", true, resp.cause() instanceof RuntimeException);
1672             }
1673         };
1674
1675         LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending");
1676     }
1677
1678     @Test
1679     public void testRemoveShardReplicaForNonExistentShard() throws Exception {
1680         new JavaTestKit(getSystem()) {
1681             {
1682                 ActorRef shardManager = actorFactory
1683                         .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1684                                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1685
1686                 shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), getRef());
1687                 Status.Failure resp = expectMsgClass(duration("10 seconds"), Status.Failure.class);
1688                 assertEquals("Failure obtained", true, resp.cause() instanceof PrimaryNotFoundException);
1689             }
1690         };
1691     }
1692
1693     @Test
1694     /**
1695      * Primary is Local
1696      */
1697     public void testRemoveShardReplicaLocal() throws Exception {
1698         new JavaTestKit(getSystem()) {
1699             {
1700                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1701
1702                 final ActorRef respondActor = actorFactory.createActor(Props.create(MockRespondActor.class,
1703                         RemoveServer.class, new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
1704
1705                 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
1706
1707                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1708                 shardManager.tell(new ActorInitialized(), respondActor);
1709                 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1710                         DataStoreVersions.CURRENT_VERSION), getRef());
1711                 shardManager.tell(
1712                         new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1713                         respondActor);
1714
1715                 shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), getRef());
1716                 final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor,
1717                         RemoveServer.class);
1718                 assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(),
1719                         removeServer.getServerId());
1720                 expectMsgClass(duration("5 seconds"), Success.class);
1721             }
1722         };
1723     }
1724
1725     @Test
1726     public void testRemoveShardReplicaRemote() throws Exception {
1727         MockConfiguration mockConfig = new MockConfiguration(
1728                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1729                         .put("astronauts", Arrays.asList("member-1")).build());
1730
1731         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1732
1733         // Create an ActorSystem ShardManager actor for member-1.
1734         final ActorSystem system1 = newActorSystem("Member1");
1735         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1736         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1737
1738         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1739                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster(
1740                         new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1741                 shardManagerID);
1742
1743         // Create an ActorSystem ShardManager actor for member-2.
1744         final ActorSystem system2 = newActorSystem("Member2");
1745         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1746
1747         String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString();
1748         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
1749         final TestActorRef<MockRespondActor> mockShardLeaderActor =
1750                 TestActorRef.create(system2, Props.create(MockRespondActor.class, RemoveServer.class,
1751                         new RemoveServerReply(ServerChangeStatus.OK, memberId2)), name);
1752
1753         LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor);
1754
1755         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1756                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster(
1757                         new ClusterWrapperImpl(system2)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1758                 shardManagerID);
1759
1760         // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so,
1761         //    akka://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1
1762         // However when a shard manager has a local shard which is a follower and a leader that is remote it will
1763         // try to compute an address for the remote shard leader using the ShardPeerAddressResolver. This address will
1764         // look like so,
1765         //    akka://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1
1766         // In this specific case if we did a FindPrimary for shard default from member-1 we would come up
1767         // with the address of an actor which does not exist, therefore any message sent to that actor would go to
1768         // dead letters.
1769         // To work around this problem we create a ForwardingActor with the right address and pass to it the
1770         // mockShardLeaderActor. The ForwardingActor simply forwards all messages to the mockShardLeaderActor and every
1771         // thing works as expected
1772         final ActorRef actorRef = leaderShardManager.underlyingActor().context()
1773                 .actorOf(Props.create(ForwardingActor.class, mockShardLeaderActor),
1774                         "member-2-shard-default-" + shardMrgIDSuffix);
1775
1776         LOG.error("Forwarding actor : {}", actorRef);
1777
1778         new JavaTestKit(system1) {
1779             {
1780                 newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1781                 leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1782
1783                 leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1784                 newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1785
1786                 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1787                 leaderShardManager.tell(
1788                         new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1789                         mockShardLeaderActor);
1790                 leaderShardManager.tell(
1791                         new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1792                         mockShardLeaderActor);
1793
1794                 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
1795                 newReplicaShardManager.tell(
1796                         new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class), leaderVersion),
1797                         mockShardActor);
1798                 newReplicaShardManager.tell(
1799                         new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
1800                         mockShardActor);
1801
1802                 newReplicaShardManager.underlyingActor().waitForMemberUp();
1803                 leaderShardManager.underlyingActor().waitForMemberUp();
1804
1805                 // construct a mock response message
1806                 newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), getRef());
1807                 RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1808                         RemoveServer.class);
1809                 String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1810                 assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
1811                 expectMsgClass(duration("5 seconds"), Status.Success.class);
1812             }
1813         };
1814     }
1815
1816     @Test
1817     public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() throws Exception {
1818         testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", MEMBER_2),
1819                 RemoveServer.class, new RemoveShardReplica("astronauts", MEMBER_3));
1820     }
1821
1822     @Test
1823     public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() throws Exception {
1824         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1825                 AddServer.class, new RemoveShardReplica("astronauts", MEMBER_2));
1826     }
1827
1828
1829     public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange,
1830                                                       final Class<?> firstForwardedServerChangeClass,
1831                                                       final Object secondServerChange) throws Exception {
1832         new JavaTestKit(getSystem()) {
1833             {
1834                 JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1835                 final JavaTestKit secondRequestKit = new JavaTestKit(getSystem());
1836
1837                 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1838                         .put(shardName, Arrays.asList("member-2")).build());
1839
1840                 final TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
1841                         newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor)
1842                                 .cluster(new MockClusterWrapper()).props()
1843                                 .withDispatcher(Dispatchers.DefaultDispatcherId()),
1844                         shardMgrID);
1845
1846                 shardManager.underlyingActor()
1847                         .setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1848
1849                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1850
1851                 shardManager.tell(firstServerChange, getRef());
1852
1853                 mockShardLeaderKit.expectMsgClass(firstForwardedServerChangeClass);
1854
1855                 shardManager.tell(secondServerChange, secondRequestKit.getRef());
1856
1857                 secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class);
1858             }
1859         };
1860     }
1861
1862     @Test
1863     public void testServerRemovedShardActorNotRunning() throws Exception {
1864         LOG.info("testServerRemovedShardActorNotRunning starting");
1865         new JavaTestKit(getSystem()) {
1866             {
1867                 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1868                         .put("default", Arrays.asList("member-1", "member-2"))
1869                         .put("astronauts", Arrays.asList("member-2"))
1870                         .put("people", Arrays.asList("member-1", "member-2")).build());
1871
1872                 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1873                         newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1874
1875                 shardManager.underlyingActor().waitForRecoveryComplete();
1876                 shardManager.tell(new FindLocalShard("people", false), getRef());
1877                 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1878
1879                 shardManager.tell(new FindLocalShard("default", false), getRef());
1880                 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1881
1882                 // Removed the default shard replica from member-1
1883                 ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
1884                 ShardIdentifier shardId = builder.shardName("default").memberName(MEMBER_1).type(shardMrgIDSuffix)
1885                         .build();
1886                 shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
1887
1888                 shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1889             }
1890         };
1891
1892         LOG.info("testServerRemovedShardActorNotRunning ending");
1893     }
1894
1895     @Test
1896     public void testServerRemovedShardActorRunning() throws Exception {
1897         LOG.info("testServerRemovedShardActorRunning starting");
1898         new JavaTestKit(getSystem()) {
1899             {
1900                 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1901                         .put("default", Arrays.asList("member-1", "member-2"))
1902                         .put("astronauts", Arrays.asList("member-2"))
1903                         .put("people", Arrays.asList("member-1", "member-2")).build());
1904
1905                 String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1906                 ActorRef shard = actorFactory.createActor(MessageCollectorActor.props(), shardId);
1907
1908                 TestActorRef<TestShardManager> shardManager = actorFactory
1909                         .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props()
1910                                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1911
1912                 shardManager.underlyingActor().waitForRecoveryComplete();
1913
1914                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1915                 shardManager.tell(new ActorInitialized(), shard);
1916
1917                 waitForShardInitialized(shardManager, "people", this);
1918                 waitForShardInitialized(shardManager, "default", this);
1919
1920                 // Removed the default shard replica from member-1
1921                 shardManager.tell(new ServerRemoved(shardId), getRef());
1922
1923                 shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1924
1925                 MessageCollectorActor.expectFirstMatching(shard, Shutdown.class);
1926             }
1927         };
1928
1929         LOG.info("testServerRemovedShardActorRunning ending");
1930     }
1931
1932     @Test
1933     public void testShardPersistenceWithRestoredData() throws Exception {
1934         LOG.info("testShardPersistenceWithRestoredData starting");
1935         new JavaTestKit(getSystem()) {
1936             {
1937                 MockConfiguration mockConfig =
1938                     new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1939                             .put("default", Arrays.asList("member-1", "member-2"))
1940                             .put("astronauts", Arrays.asList("member-2"))
1941                             .put("people", Arrays.asList("member-1", "member-2")).build());
1942                 String[] restoredShards = {"default", "astronauts"};
1943                 ShardManagerSnapshot snapshot =
1944                         new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
1945                 InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
1946
1947                 // create shardManager to come up with restored data
1948                 TestActorRef<TestShardManager> newRestoredShardManager = actorFactory.createTestActor(
1949                         newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1950
1951                 newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
1952
1953                 newRestoredShardManager.tell(new FindLocalShard("people", false), getRef());
1954                 LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1955                 assertEquals("for uninitialized shard", "people", notFound.getShardName());
1956
1957                 // Verify a local shard is created for the restored shards,
1958                 // although we expect a NotInitializedException for the shards
1959                 // as the actor initialization
1960                 // message is not sent for them
1961                 newRestoredShardManager.tell(new FindLocalShard("default", false), getRef());
1962                 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1963
1964                 newRestoredShardManager.tell(new FindLocalShard("astronauts", false), getRef());
1965                 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1966             }
1967         };
1968
1969         LOG.info("testShardPersistenceWithRestoredData ending");
1970     }
1971
1972     @Test
1973     public void testShutDown() throws Exception {
1974         LOG.info("testShutDown starting");
1975         new JavaTestKit(getSystem()) {
1976             {
1977                 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1978                         .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1")).build());
1979
1980                 String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString();
1981                 ActorRef shard1 = actorFactory.createActor(MessageCollectorActor.props(), shardId1);
1982
1983                 String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString();
1984                 ActorRef shard2 = actorFactory.createActor(MessageCollectorActor.props(), shardId2);
1985
1986                 ActorRef shardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig)
1987                         .addShardActor("shard1", shard1).addShardActor("shard2", shard2).props());
1988
1989                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1990                 shardManager.tell(new ActorInitialized(), shard1);
1991                 shardManager.tell(new ActorInitialized(), shard2);
1992
1993                 FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
1994                 Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE);
1995
1996                 MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class);
1997                 MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class);
1998
1999                 try {
2000                     Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS));
2001                     fail("ShardManager actor stopped without waiting for the Shards to be stopped");
2002                 } catch (TimeoutException e) {
2003                     // expected
2004                 }
2005
2006                 actorFactory.killActor(shard1, this);
2007                 actorFactory.killActor(shard2, this);
2008
2009                 Boolean stopped = Await.result(stopFuture, duration);
2010                 assertEquals("Stopped", Boolean.TRUE, stopped);
2011             }
2012         };
2013
2014         LOG.info("testShutDown ending");
2015     }
2016
2017     @Test
2018     public void testChangeServersVotingStatus() throws Exception {
2019         new JavaTestKit(getSystem()) {
2020             {
2021                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2022
2023                 ActorRef respondActor = actorFactory
2024                         .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2025                                 new ServerChangeReply(ServerChangeStatus.OK, null)), memberId);
2026
2027                 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2028
2029                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
2030                 shardManager.tell(new ActorInitialized(), respondActor);
2031                 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
2032                         DataStoreVersions.CURRENT_VERSION), getRef());
2033                 shardManager.tell(
2034                         new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
2035                         respondActor);
2036
2037                 shardManager.tell(
2038                         new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)),
2039                         getRef());
2040
2041                 ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor
2042                         .expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2043                 assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(),
2044                         ImmutableMap.of(ShardIdentifier
2045                                 .create("default", MemberName.forName("member-2"), shardMrgIDSuffix).toString(),
2046                                 Boolean.TRUE));
2047
2048                 expectMsgClass(duration("5 seconds"), Success.class);
2049             }
2050         };
2051     }
2052
2053     @Test
2054     public void testChangeServersVotingStatusWithNoLeader() throws Exception {
2055         new JavaTestKit(getSystem()) {
2056             {
2057                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2058
2059                 ActorRef respondActor = actorFactory
2060                         .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2061                                 new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);
2062
2063                 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2064
2065                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
2066                 shardManager.tell(new ActorInitialized(), respondActor);
2067                 shardManager.tell(new RoleChangeNotification(memberId, null, RaftState.Follower.name()), respondActor);
2068
2069                 shardManager.tell(
2070                         new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)),
2071                         getRef());
2072
2073                 MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2074
2075                 Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
2076                 assertEquals("Failure resposnse", true, resp.cause() instanceof NoShardLeaderException);
2077             }
2078         };
2079     }
2080
2081     public static class TestShardManager extends ShardManager {
2082         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
2083         private final CountDownLatch snapshotPersist = new CountDownLatch(1);
2084         private ShardManagerSnapshot snapshot;
2085         private final Map<String, ActorRef> shardActors;
2086         private final ActorRef shardActor;
2087         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
2088         private CountDownLatch memberUpReceived = new CountDownLatch(1);
2089         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
2090         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
2091         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
2092         private volatile MessageInterceptor messageInterceptor;
2093
2094         private TestShardManager(Builder builder) {
2095             super(builder);
2096             shardActor = builder.shardActor;
2097             shardActors = builder.shardActors;
2098         }
2099
2100         @Override
2101         protected void handleRecover(Object message) throws Exception {
2102             try {
2103                 super.handleRecover(message);
2104             } finally {
2105                 if (message instanceof RecoveryCompleted) {
2106                     recoveryComplete.countDown();
2107                 }
2108             }
2109         }
2110
2111         private void countDownIfOther(final Member member, CountDownLatch latch) {
2112             if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
2113                 latch.countDown();
2114             }
2115         }
2116
2117         @Override
2118         public void handleCommand(Object message) throws Exception {
2119             try {
2120                 if (messageInterceptor != null && messageInterceptor.canIntercept(message)) {
2121                     getSender().tell(messageInterceptor.apply(message), getSelf());
2122                 } else {
2123                     super.handleCommand(message);
2124                 }
2125             } finally {
2126                 if (message instanceof FindPrimary) {
2127                     findPrimaryMessageReceived.countDown();
2128                 } else if (message instanceof ClusterEvent.MemberUp) {
2129                     countDownIfOther(((ClusterEvent.MemberUp) message).member(), memberUpReceived);
2130                 } else if (message instanceof ClusterEvent.MemberRemoved) {
2131                     countDownIfOther(((ClusterEvent.MemberRemoved) message).member(), memberRemovedReceived);
2132                 } else if (message instanceof ClusterEvent.UnreachableMember) {
2133                     countDownIfOther(((ClusterEvent.UnreachableMember) message).member(), memberUnreachableReceived);
2134                 } else if (message instanceof ClusterEvent.ReachableMember) {
2135                     countDownIfOther(((ClusterEvent.ReachableMember) message).member(), memberReachableReceived);
2136                 }
2137             }
2138         }
2139
2140         void setMessageInterceptor(MessageInterceptor messageInterceptor) {
2141             this.messageInterceptor = messageInterceptor;
2142         }
2143
2144         void waitForRecoveryComplete() {
2145             assertEquals("Recovery complete", true,
2146                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
2147         }
2148
2149         public void waitForMemberUp() {
2150             assertEquals("MemberUp received", true,
2151                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
2152             memberUpReceived = new CountDownLatch(1);
2153         }
2154
2155         void waitForMemberRemoved() {
2156             assertEquals("MemberRemoved received", true,
2157                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
2158             memberRemovedReceived = new CountDownLatch(1);
2159         }
2160
2161         void waitForUnreachableMember() {
2162             assertEquals("UnreachableMember received", true,
2163                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
2164                 ));
2165             memberUnreachableReceived = new CountDownLatch(1);
2166         }
2167
2168         void waitForReachableMember() {
2169             assertEquals("ReachableMember received", true,
2170                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
2171             memberReachableReceived = new CountDownLatch(1);
2172         }
2173
2174         void verifyFindPrimary() {
2175             assertEquals("FindPrimary received", true,
2176                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
2177             findPrimaryMessageReceived = new CountDownLatch(1);
2178         }
2179
2180         public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) {
2181             return new Builder(datastoreContextBuilder);
2182         }
2183
2184         public static class Builder extends AbstractGenericCreator<Builder, TestShardManager> {
2185             private ActorRef shardActor;
2186             private final Map<String, ActorRef> shardActors = new HashMap<>();
2187
2188             Builder(DatastoreContext.Builder datastoreContextBuilder) {
2189                 super(TestShardManager.class);
2190                 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
2191             }
2192
2193             Builder shardActor(ActorRef newShardActor) {
2194                 this.shardActor = newShardActor;
2195                 return this;
2196             }
2197
2198             Builder addShardActor(String shardName, ActorRef actorRef) {
2199                 shardActors.put(shardName, actorRef);
2200                 return this;
2201             }
2202         }
2203
2204         @Override
2205         public void saveSnapshot(Object obj) {
2206             snapshot = (ShardManagerSnapshot) obj;
2207             snapshotPersist.countDown();
2208             super.saveSnapshot(obj);
2209         }
2210
2211         void verifySnapshotPersisted(Set<String> shardList) {
2212             assertEquals("saveSnapshot invoked", true,
2213                     Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
2214             assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
2215         }
2216
2217         @Override
2218         protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
2219             if (shardActors.get(info.getShardName()) != null) {
2220                 return shardActors.get(info.getShardName());
2221             }
2222
2223             if (shardActor != null) {
2224                 return shardActor;
2225             }
2226
2227             return super.newShardActor(schemaContext, info);
2228         }
2229     }
2230
2231     private abstract static class AbstractGenericCreator<T extends AbstractGenericCreator<T, ?>, C extends ShardManager>
2232                                                      extends AbstractShardManagerCreator<T> {
2233         private final Class<C> shardManagerClass;
2234
2235         AbstractGenericCreator(Class<C> shardManagerClass) {
2236             this.shardManagerClass = shardManagerClass;
2237             cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).waitTillReadyCountDownLatch(ready)
2238                     .primaryShardInfoCache(new PrimaryShardInfoFutureCache());
2239         }
2240
2241         @Override
2242         public Props props() {
2243             verify();
2244             return Props.create(shardManagerClass, this);
2245         }
2246     }
2247
2248     private static class GenericCreator<C extends ShardManager> extends AbstractGenericCreator<GenericCreator<C>, C> {
2249         GenericCreator(Class<C> shardManagerClass) {
2250             super(shardManagerClass);
2251         }
2252     }
2253
2254     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
2255         private static final long serialVersionUID = 1L;
2256         private final Creator<ShardManager> delegate;
2257
2258         DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
2259             this.delegate = delegate;
2260         }
2261
2262         @Override
2263         public ShardManager create() throws Exception {
2264             return delegate.create();
2265         }
2266     }
2267
2268     interface MessageInterceptor extends Function<Object, Object> {
2269         boolean canIntercept(Object message);
2270     }
2271
2272     private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
2273         return new MessageInterceptor() {
2274             @Override
2275             public Object apply(Object message) {
2276                 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
2277             }
2278
2279             @Override
2280             public boolean canIntercept(Object message) {
2281                 return message instanceof FindPrimary;
2282             }
2283         };
2284     }
2285
2286     private static class MockRespondActor extends MessageCollectorActor {
2287         static final String CLEAR_RESPONSE = "clear-response";
2288
2289         private Object responseMsg;
2290         private final Class<?> requestClass;
2291
2292         @SuppressWarnings("unused")
2293         MockRespondActor(Class<?> requestClass, Object responseMsg) {
2294             this.requestClass = requestClass;
2295             this.responseMsg = responseMsg;
2296         }
2297
2298         @Override
2299         public void onReceive(Object message) throws Exception {
2300             if (message.equals(CLEAR_RESPONSE)) {
2301                 responseMsg = null;
2302             } else {
2303                 super.onReceive(message);
2304                 if (message.getClass().equals(requestClass) && responseMsg != null) {
2305                     getSender().tell(responseMsg, getSelf());
2306                 }
2307             }
2308         }
2309     }
2310 }