tell-based - reconnect on leader change
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManagerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.shardmanager;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.mockito.ArgumentMatchers.anyString;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.never;
21 import static org.mockito.Mockito.reset;
22 import static org.mockito.Mockito.timeout;
23 import static org.mockito.Mockito.times;
24 import static org.mockito.Mockito.verify;
25 import static org.mockito.Mockito.verifyNoMoreInteractions;
26
27 import akka.actor.ActorRef;
28 import akka.actor.ActorSystem;
29 import akka.actor.AddressFromURIString;
30 import akka.actor.Props;
31 import akka.actor.Status;
32 import akka.actor.Status.Failure;
33 import akka.actor.Status.Success;
34 import akka.cluster.Cluster;
35 import akka.cluster.ClusterEvent;
36 import akka.cluster.Member;
37 import akka.dispatch.Dispatchers;
38 import akka.dispatch.OnComplete;
39 import akka.japi.Creator;
40 import akka.pattern.Patterns;
41 import akka.persistence.RecoveryCompleted;
42 import akka.serialization.Serialization;
43 import akka.testkit.TestActorRef;
44 import akka.testkit.javadsl.TestKit;
45 import akka.util.Timeout;
46 import com.google.common.base.Function;
47 import com.google.common.base.Stopwatch;
48 import com.google.common.collect.ImmutableMap;
49 import com.google.common.collect.Lists;
50 import com.google.common.collect.Sets;
51 import com.google.common.util.concurrent.Uninterruptibles;
52 import java.net.URI;
53 import java.util.AbstractMap;
54 import java.util.Arrays;
55 import java.util.Collection;
56 import java.util.Collections;
57 import java.util.HashMap;
58 import java.util.List;
59 import java.util.Map;
60 import java.util.Map.Entry;
61 import java.util.Set;
62 import java.util.concurrent.CountDownLatch;
63 import java.util.concurrent.TimeUnit;
64 import java.util.concurrent.TimeoutException;
65 import java.util.function.Consumer;
66 import java.util.stream.Collectors;
67 import org.junit.AfterClass;
68 import org.junit.BeforeClass;
69 import org.junit.Test;
70 import org.opendaylight.controller.cluster.access.concepts.MemberName;
71 import org.opendaylight.controller.cluster.datastore.AbstractShardManagerTest;
72 import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
73 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
74 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
75 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
76 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
77 import org.opendaylight.controller.cluster.datastore.Shard;
78 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
79 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
80 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
81 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
82 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
83 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
84 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
85 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
86 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
87 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
88 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
89 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
90 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
91 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
92 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
93 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
94 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
95 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
96 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
97 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
98 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
99 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
100 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
101 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
102 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
103 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
104 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
105 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
106 import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor;
107 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
108 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
109 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
110 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
111 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
112 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
113 import org.opendaylight.controller.cluster.raft.RaftState;
114 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
115 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
116 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
117 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
118 import org.opendaylight.controller.cluster.raft.messages.AddServer;
119 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
120 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
121 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
122 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
123 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
124 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
125 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
126 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
127 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
128 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
129 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
130 import org.opendaylight.yangtools.concepts.Registration;
131 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
132 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
133 import org.slf4j.Logger;
134 import org.slf4j.LoggerFactory;
135 import scala.concurrent.Await;
136 import scala.concurrent.Future;
137 import scala.concurrent.duration.Duration;
138 import scala.concurrent.duration.FiniteDuration;
139
140 public class ShardManagerTest extends AbstractShardManagerTest {
141     private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
142     private static final MemberName MEMBER_2 = MemberName.forName("member-2");
143     private static final MemberName MEMBER_3 = MemberName.forName("member-3");
144
145     private static SchemaContext TEST_SCHEMA_CONTEXT;
146
147     private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
148
149     @BeforeClass
150     public static void beforeClass() {
151         TEST_SCHEMA_CONTEXT = TestModel.createTestContext();
152     }
153
154     @AfterClass
155     public static void afterClass() {
156         TEST_SCHEMA_CONTEXT = null;
157     }
158
159     private ActorSystem newActorSystem(final String config) {
160         return newActorSystem("cluster-test", config);
161     }
162
163     private ActorRef newMockShardActor(final ActorSystem system, final String shardName, final String memberName) {
164         String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
165         if (system == getSystem()) {
166             return actorFactory.createActor(MessageCollectorActor.props(), name);
167         }
168
169         return system.actorOf(MessageCollectorActor.props(), name);
170     }
171
172     private Props newShardMgrProps() {
173         return newShardMgrProps(new MockConfiguration());
174     }
175
176     private static DatastoreContextFactory newDatastoreContextFactory(final DatastoreContext datastoreContext) {
177         DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
178         doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
179         doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(anyString());
180         return mockFactory;
181     }
182
183     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() {
184         return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
185     }
186
187     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(final ActorRef shardActor) {
188         return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor)
189                 .distributedDataStore(mock(DistributedDataStore.class));
190     }
191
192
193     private Props newPropsShardMgrWithMockShardActor() {
194         return newTestShardMgrBuilderWithMockShardActor().props().withDispatcher(
195                 Dispatchers.DefaultDispatcherId());
196     }
197
198     private Props newPropsShardMgrWithMockShardActor(final ActorRef shardActor) {
199         return newTestShardMgrBuilderWithMockShardActor(shardActor).props()
200                 .withDispatcher(Dispatchers.DefaultDispatcherId());
201     }
202
203
204     private TestShardManager newTestShardManager() {
205         return newTestShardManager(newShardMgrProps());
206     }
207
208     private TestShardManager newTestShardManager(final Props props) {
209         TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
210         TestShardManager shardManager = shardManagerActor.underlyingActor();
211         shardManager.waitForRecoveryComplete();
212         return shardManager;
213     }
214
215     private static void waitForShardInitialized(final ActorRef shardManager, final String shardName,
216             final TestKit kit) {
217         AssertionError last = null;
218         Stopwatch sw = Stopwatch.createStarted();
219         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
220             try {
221                 shardManager.tell(new FindLocalShard(shardName, true), kit.getRef());
222                 kit.expectMsgClass(LocalShardFound.class);
223                 return;
224             } catch (AssertionError e) {
225                 last = e;
226             }
227
228             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
229         }
230
231         throw last;
232     }
233
234     @SuppressWarnings("unchecked")
235     private static <T> T expectMsgClassOrFailure(final Class<T> msgClass, final TestKit kit, final String msg) {
236         Object reply = kit.expectMsgAnyClassOf(kit.duration("5 sec"), msgClass, Failure.class);
237         if (reply instanceof Failure) {
238             throw new AssertionError(msg + " failed", ((Failure)reply).cause());
239         }
240
241         return (T)reply;
242     }
243
244     @Test
245     public void testPerShardDatastoreContext() throws Exception {
246         LOG.info("testPerShardDatastoreContext starting");
247         final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
248                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
249
250         doReturn(
251                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(6).build())
252                 .when(mockFactory).getShardDatastoreContext("default");
253
254         doReturn(
255                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(7).build())
256                 .when(mockFactory).getShardDatastoreContext("topology");
257
258         final MockConfiguration mockConfig = new MockConfiguration() {
259             @Override
260             public Collection<String> getMemberShardNames(final MemberName memberName) {
261                 return Arrays.asList("default", "topology");
262             }
263
264             @Override
265             public Collection<MemberName> getMembersFromShardName(final String shardName) {
266                 return members("member-1");
267             }
268         };
269
270         final ActorRef defaultShardActor = actorFactory.createActor(
271                 MessageCollectorActor.props(), actorFactory.generateActorId("default"));
272         final ActorRef topologyShardActor = actorFactory.createActor(
273                 MessageCollectorActor.props(), actorFactory.generateActorId("topology"));
274
275         final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
276                 new HashMap<String, Entry<ActorRef, DatastoreContext>>());
277         shardInfoMap.put("default", new AbstractMap.SimpleEntry<>(defaultShardActor, null));
278         shardInfoMap.put("topology", new AbstractMap.SimpleEntry<>(topologyShardActor, null));
279
280         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
281         final CountDownLatch newShardActorLatch = new CountDownLatch(2);
282         class LocalShardManager extends ShardManager {
283             LocalShardManager(final AbstractShardManagerCreator<?> creator) {
284                 super(creator);
285             }
286
287             @Override
288             protected ActorRef newShardActor(final ShardInformation info) {
289                 Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
290                 ActorRef ref = null;
291                 if (entry != null) {
292                     ref = entry.getKey();
293                     entry.setValue(info.getDatastoreContext());
294                 }
295
296                 newShardActorLatch.countDown();
297                 return ref;
298             }
299         }
300
301         final Creator<ShardManager> creator = new Creator<ShardManager>() {
302             private static final long serialVersionUID = 1L;
303             @Override
304             public ShardManager create() {
305                 return new LocalShardManager(
306                         new GenericCreator<>(LocalShardManager.class).datastoreContextFactory(mockFactory)
307                                 .primaryShardInfoCache(primaryShardInfoCache).configuration(mockConfig));
308             }
309         };
310
311         final TestKit kit = new TestKit(getSystem());
312
313         final ActorRef shardManager = actorFactory.createActor(Props.create(
314                 new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()));
315
316         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
317
318         assertTrue("Shard actors created", newShardActorLatch.await(5, TimeUnit.SECONDS));
319         assertEquals("getShardElectionTimeoutFactor", 6,
320                 shardInfoMap.get("default").getValue().getShardElectionTimeoutFactor());
321         assertEquals("getShardElectionTimeoutFactor", 7,
322                 shardInfoMap.get("topology").getValue().getShardElectionTimeoutFactor());
323
324         DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
325                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
326         doReturn(
327                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(66).build())
328                 .when(newMockFactory).getShardDatastoreContext("default");
329
330         doReturn(
331                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(77).build())
332                 .when(newMockFactory).getShardDatastoreContext("topology");
333
334         shardManager.tell(newMockFactory, kit.getRef());
335
336         DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor,
337                 DatastoreContext.class);
338         assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
339
340         newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
341         assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
342
343         LOG.info("testPerShardDatastoreContext ending");
344     }
345
346     @Test
347     public void testOnReceiveFindPrimaryForNonExistentShard() {
348         final TestKit kit = new TestKit(getSystem());
349         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
350
351         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
352
353         shardManager.tell(new FindPrimary("non-existent", false), kit.getRef());
354
355         kit.expectMsgClass(kit.duration("5 seconds"), PrimaryNotFoundException.class);
356     }
357
358     @Test
359     public void testOnReceiveFindPrimaryForLocalLeaderShard() {
360         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting");
361         final TestKit kit = new TestKit(getSystem());
362         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
363
364         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
365
366         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
367         shardManager.tell(new ActorInitialized(), mockShardActor);
368
369         DataTree mockDataTree = mock(DataTree.class);
370         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
371             DataStoreVersions.CURRENT_VERSION), kit.getRef());
372
373         MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
374         shardManager.tell(
375             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
376             mockShardActor);
377
378         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
379
380         LocalPrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"),
381             LocalPrimaryShardFound.class);
382         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
383             primaryFound.getPrimaryPath().contains("member-1-shard-default"));
384         assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
385
386         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending");
387     }
388
389     @Test
390     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() {
391         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp starting");
392         final TestKit kit = new TestKit(getSystem());
393         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
394
395         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
396         shardManager.tell(new ActorInitialized(), mockShardActor);
397
398         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
399         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
400         shardManager.tell(
401             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
402             mockShardActor);
403         shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION),
404             mockShardActor);
405
406         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
407
408         kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
409
410         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending");
411     }
412
413     @Test
414     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() {
415         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard starting");
416         final TestKit kit = new TestKit(getSystem());
417         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
418
419         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
420         shardManager.tell(new ActorInitialized(), mockShardActor);
421
422         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
423         MockClusterWrapper.sendMemberUp(shardManager, "member-2", kit.getRef().path().toString());
424
425         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
426         shardManager.tell(
427             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
428             mockShardActor);
429         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
430         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, leaderVersion), mockShardActor);
431
432         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
433
434         RemotePrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"),
435             RemotePrimaryShardFound.class);
436         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
437             primaryFound.getPrimaryPath().contains("member-2-shard-default"));
438         assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
439
440         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard ending");
441     }
442
443     @Test
444     public void testOnReceiveFindPrimaryForUninitializedShard() {
445         final TestKit kit = new TestKit(getSystem());
446         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
447
448         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
449
450         kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
451     }
452
453     @Test
454     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() {
455         final TestKit kit = new TestKit(getSystem());
456         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
457
458         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
459         shardManager.tell(new ActorInitialized(), mockShardActor);
460
461         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
462
463         kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
464     }
465
466     @Test
467     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() {
468         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
469         final TestKit kit = new TestKit(getSystem());
470         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
471
472         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
473         shardManager.tell(new ActorInitialized(), mockShardActor);
474
475         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
476         shardManager.tell(
477             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Follower.name()),
478             mockShardActor);
479
480         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
481
482         kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
483
484         DataTree mockDataTree = mock(DataTree.class);
485         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
486             DataStoreVersions.CURRENT_VERSION), mockShardActor);
487
488         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
489
490         LocalPrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"),
491             LocalPrimaryShardFound.class);
492         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
493             primaryFound.getPrimaryPath().contains("member-1-shard-default"));
494         assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
495
496         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
497     }
498
499     @Test
500     public void testOnReceiveFindPrimaryWaitForShardLeader() {
501         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting");
502         datastoreContextBuilder.shardInitializationTimeout(10, TimeUnit.SECONDS);
503         final TestKit kit = new TestKit(getSystem());
504         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
505
506         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
507
508         // We're passing waitUntilInitialized = true to FindPrimary so
509         // the response should be
510         // delayed until we send ActorInitialized and
511         // RoleChangeNotification.
512         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
513
514         kit.expectNoMessage(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
515
516         shardManager.tell(new ActorInitialized(), mockShardActor);
517
518         kit.expectNoMessage(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
519
520         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
521         shardManager.tell(
522             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
523             mockShardActor);
524
525         kit.expectNoMessage(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
526
527         DataTree mockDataTree = mock(DataTree.class);
528         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
529             DataStoreVersions.CURRENT_VERSION), mockShardActor);
530
531         LocalPrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"),
532             LocalPrimaryShardFound.class);
533         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
534             primaryFound.getPrimaryPath().contains("member-1-shard-default"));
535         assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
536
537         kit.expectNoMessage(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
538
539         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending");
540     }
541
542     @Test
543     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() {
544         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard starting");
545         final TestKit kit = new TestKit(getSystem());
546         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
547
548         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
549
550         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
551
552         kit.expectMsgClass(kit.duration("2 seconds"), NotInitializedException.class);
553
554         shardManager.tell(new ActorInitialized(), mockShardActor);
555
556         kit.expectNoMessage(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
557
558         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending");
559     }
560
561     @Test
562     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() {
563         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard starting");
564         final TestKit kit = new TestKit(getSystem());
565         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
566
567         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
568         shardManager.tell(new ActorInitialized(), mockShardActor);
569         shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
570             RaftState.Candidate.name()), mockShardActor);
571
572         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
573
574         kit.expectMsgClass(kit.duration("2 seconds"), NoShardLeaderException.class);
575
576         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending");
577     }
578
579     @Test
580     public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() {
581         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard starting");
582         final TestKit kit = new TestKit(getSystem());
583         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
584
585         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
586         shardManager.tell(new ActorInitialized(), mockShardActor);
587         shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
588             RaftState.IsolatedLeader.name()), mockShardActor);
589
590         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true),kit. getRef());
591
592         kit.expectMsgClass(kit.duration("2 seconds"), NoShardLeaderException.class);
593
594         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending");
595     }
596
597     @Test
598     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() {
599         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard starting");
600         final TestKit kit = new TestKit(getSystem());
601         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
602
603         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
604         shardManager.tell(new ActorInitialized(), mockShardActor);
605
606         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
607
608         kit.expectMsgClass(kit.duration("2 seconds"), NoShardLeaderException.class);
609
610         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending");
611     }
612
613     @Test
614     public void testOnReceiveFindPrimaryForRemoteShard() {
615         LOG.info("testOnReceiveFindPrimaryForRemoteShard starting");
616         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
617
618         // Create an ActorSystem ShardManager actor for member-1.
619
620         final ActorSystem system1 = newActorSystem("Member1");
621         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
622
623         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
624                 newTestShardMgrBuilderWithMockShardActor().cluster(
625                         new ClusterWrapperImpl(system1)).props().withDispatcher(
626                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
627
628         // Create an ActorSystem ShardManager actor for member-2.
629
630         final ActorSystem system2 = newActorSystem("Member2");
631
632         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
633
634         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
635
636         MockConfiguration mockConfig2 = new MockConfiguration(
637                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
638                         .put("astronauts", Arrays.asList("member-2")).build());
639
640         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
641                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
642                         new ClusterWrapperImpl(system2)).props().withDispatcher(
643                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
644
645         final TestKit kit = new TestKit(system1);
646         shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
647         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
648
649         shardManager2.tell(new ActorInitialized(), mockShardActor2);
650
651         String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
652         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
653         shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
654             mockShardActor2);
655         shardManager2.tell(new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
656             mockShardActor2);
657
658         shardManager1.underlyingActor().waitForMemberUp();
659         shardManager1.tell(new FindPrimary("astronauts", false), kit.getRef());
660
661         RemotePrimaryShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
662         String path = found.getPrimaryPath();
663         assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
664         assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
665
666         shardManager2.underlyingActor().verifyFindPrimary();
667
668         // This part times out quite a bit on jenkins for some reason
669
670 //                Cluster.get(system2).down(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
671 //
672 //                shardManager1.underlyingActor().waitForMemberRemoved();
673 //
674 //                shardManager1.tell(new FindPrimary("astronauts", false), getRef());
675 //
676 //                expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
677
678         LOG.info("testOnReceiveFindPrimaryForRemoteShard ending");
679     }
680
681     @Test
682     public void testShardAvailabilityOnChangeOfMemberReachability() {
683         LOG.info("testShardAvailabilityOnChangeOfMemberReachability starting");
684         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
685
686         // Create an ActorSystem ShardManager actor for member-1.
687
688         final ActorSystem system1 = newActorSystem("Member1");
689         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
690
691         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
692
693         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
694                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
695                         new ClusterWrapperImpl(system1)).props().withDispatcher(
696                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
697
698         // Create an ActorSystem ShardManager actor for member-2.
699
700         final ActorSystem system2 = newActorSystem("Member2");
701
702         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
703
704         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
705
706         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
707                 .put("default", Arrays.asList("member-1", "member-2")).build());
708
709         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
710                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
711                         new ClusterWrapperImpl(system2)).props().withDispatcher(
712                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
713
714         final TestKit kit = new TestKit(system1);
715         shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
716         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
717         shardManager1.tell(new ActorInitialized(), mockShardActor1);
718         shardManager2.tell(new ActorInitialized(), mockShardActor2);
719
720         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
721         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
722         shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
723             DataStoreVersions.CURRENT_VERSION), mockShardActor1);
724         shardManager1.tell(
725             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
726             mockShardActor1);
727         shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
728             DataStoreVersions.CURRENT_VERSION), mockShardActor2);
729         shardManager2.tell(
730             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
731             mockShardActor2);
732         shardManager1.underlyingActor().waitForMemberUp();
733
734         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
735
736         RemotePrimaryShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
737         String path = found.getPrimaryPath();
738         assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
739
740         shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
741             kit.getRef());
742
743         shardManager1.underlyingActor().waitForUnreachableMember();
744
745         PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
746         assertEquals("getMemberName", MEMBER_2, peerDown.getMemberName());
747         MessageCollectorActor.clearMessages(mockShardActor1);
748
749         shardManager1.tell(MockClusterWrapper.createMemberRemoved("member-2", "akka://cluster-test@127.0.0.1:2558"),
750             kit.getRef());
751
752         MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
753
754         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
755
756         kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
757
758         shardManager1.tell(MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
759             kit.getRef());
760
761         shardManager1.underlyingActor().waitForReachableMember();
762
763         PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
764         assertEquals("getMemberName", MEMBER_2, peerUp.getMemberName());
765         MessageCollectorActor.clearMessages(mockShardActor1);
766
767         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
768
769         RemotePrimaryShardFound found1 = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
770         String path1 = found1.getPrimaryPath();
771         assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
772
773         shardManager1.tell(MockClusterWrapper.createMemberUp("member-2", "akka://cluster-test@127.0.0.1:2558"),
774             kit.getRef());
775
776         MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
777
778         // Test FindPrimary wait succeeds after reachable member event.
779
780         shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
781                 "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
782         shardManager1.underlyingActor().waitForUnreachableMember();
783
784         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
785
786         shardManager1.tell(
787             MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
788
789         RemotePrimaryShardFound found2 = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
790         String path2 = found2.getPrimaryPath();
791         assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config"));
792
793         LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending");
794     }
795
796     @Test
797     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() {
798         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange starting");
799         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
800
801         // Create an ActorSystem ShardManager actor for member-1.
802
803         final ActorSystem system1 = newActorSystem("Member1");
804         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
805
806         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
807
808         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
809         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
810                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(new ClusterWrapperImpl(system1))
811                         .primaryShardInfoCache(primaryShardInfoCache).props()
812                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
813                 shardManagerID);
814
815         // Create an ActorSystem ShardManager actor for member-2.
816
817         final ActorSystem system2 = newActorSystem("Member2");
818
819         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
820
821         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
822
823         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
824                 .put("default", Arrays.asList("member-1", "member-2")).build());
825
826         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
827                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
828                         new ClusterWrapperImpl(system2)).props().withDispatcher(
829                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
830
831         final TestKit kit = new TestKit(system1);
832         shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
833         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
834         shardManager1.tell(new ActorInitialized(), mockShardActor1);
835         shardManager2.tell(new ActorInitialized(), mockShardActor2);
836
837         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
838         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
839         shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
840             DataStoreVersions.CURRENT_VERSION), mockShardActor1);
841         shardManager1.tell(
842             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
843             mockShardActor1);
844         shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
845             DataStoreVersions.CURRENT_VERSION), mockShardActor2);
846         shardManager2.tell(
847             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
848             mockShardActor2);
849         shardManager1.underlyingActor().waitForMemberUp();
850
851         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
852
853         RemotePrimaryShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
854         String path = found.getPrimaryPath();
855         assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
856
857         primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(
858             system1.actorSelection(mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION));
859
860         shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
861                 "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
862
863         shardManager1.underlyingActor().waitForUnreachableMember();
864
865         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
866
867         kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
868
869         assertNull("Expected primaryShardInfoCache entry removed",
870             primaryShardInfoCache.getIfPresent("default"));
871
872         shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, mock(DataTree.class),
873             DataStoreVersions.CURRENT_VERSION), mockShardActor1);
874         shardManager1.tell(
875             new RoleChangeNotification(memberId1, RaftState.Follower.name(), RaftState.Leader.name()),
876             mockShardActor1);
877
878         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
879
880         LocalPrimaryShardFound found1 = kit.expectMsgClass(kit.duration("5 seconds"), LocalPrimaryShardFound.class);
881         String path1 = found1.getPrimaryPath();
882         assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
883
884         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange ending");
885     }
886
887     @Test
888     public void testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable() {
889         LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable starting");
890         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
891
892         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
893                 .put("default", Arrays.asList("member-256", "member-2")).build());
894
895         // Create an ActorSystem, ShardManager and actor for member-256.
896
897         final ActorSystem system256 = newActorSystem("Member256");
898         // 2562 is the tcp port of Member256 in src/test/resources/application.conf.
899         Cluster.get(system256).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2562"));
900
901         final ActorRef mockShardActor256 = newMockShardActor(system256, Shard.DEFAULT_NAME, "member-256");
902
903         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
904
905         // ShardManager must be created with shard configuration to let its localShards has shards.
906         final TestActorRef<TestShardManager> shardManager256 = TestActorRef.create(system256,
907                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor256)
908                         .cluster(new ClusterWrapperImpl(system256))
909                         .primaryShardInfoCache(primaryShardInfoCache).props()
910                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
911                 shardManagerID);
912
913         // Create an ActorSystem, ShardManager and actor for member-2 whose name is contained in member-256.
914
915         final ActorSystem system2 = newActorSystem("Member2");
916
917         // Join member-2 into the cluster of member-256.
918         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2562"));
919
920         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
921
922         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
923                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor2).cluster(
924                         new ClusterWrapperImpl(system2)).props().withDispatcher(
925                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
926
927         final TestKit kit256 = new TestKit(system256);
928         shardManager256.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef());
929         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef());
930         shardManager256.tell(new ActorInitialized(), mockShardActor256);
931         shardManager2.tell(new ActorInitialized(), mockShardActor2);
932
933         String memberId256 = "member-256-shard-default-" + shardMrgIDSuffix;
934         String memberId2   = "member-2-shard-default-"   + shardMrgIDSuffix;
935         shardManager256.tell(new ShardLeaderStateChanged(memberId256, memberId256, mock(DataTree.class),
936             DataStoreVersions.CURRENT_VERSION), mockShardActor256);
937         shardManager256.tell(
938             new RoleChangeNotification(memberId256, RaftState.Candidate.name(), RaftState.Leader.name()),
939             mockShardActor256);
940         shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId256, mock(DataTree.class),
941             DataStoreVersions.CURRENT_VERSION), mockShardActor2);
942         shardManager2.tell(
943             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Follower.name()),
944             mockShardActor2);
945         shardManager256.underlyingActor().waitForMemberUp();
946
947         shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
948
949         LocalPrimaryShardFound found = kit256.expectMsgClass(kit256.duration("5 seconds"),
950             LocalPrimaryShardFound.class);
951         String path = found.getPrimaryPath();
952         assertTrue("Unexpected primary path " + path + " which must on member-256",
953             path.contains("member-256-shard-default-config"));
954
955         PrimaryShardInfo primaryShardInfo = new PrimaryShardInfo(
956             system256.actorSelection(mockShardActor256.path()), DataStoreVersions.CURRENT_VERSION);
957         primaryShardInfoCache.putSuccessful("default", primaryShardInfo);
958
959         // Simulate member-2 become unreachable.
960         shardManager256.tell(MockClusterWrapper.createUnreachableMember("member-2",
961                 "akka://cluster-test@127.0.0.1:2558"), kit256.getRef());
962         shardManager256.underlyingActor().waitForUnreachableMember();
963
964         // Make sure leader shard on member-256 is still leader and still in the cache.
965         shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
966         found = kit256.expectMsgClass(kit256.duration("5 seconds"), LocalPrimaryShardFound.class);
967         path = found.getPrimaryPath();
968         assertTrue("Unexpected primary path " + path + " which must still not on member-256",
969             path.contains("member-256-shard-default-config"));
970         Future<PrimaryShardInfo> futurePrimaryShard = primaryShardInfoCache.getIfPresent("default");
971         futurePrimaryShard.onComplete(new OnComplete<PrimaryShardInfo>() {
972             @Override
973             public void onComplete(final Throwable failure, final PrimaryShardInfo futurePrimaryShardInfo) {
974                 if (failure != null) {
975                     assertTrue("Primary shard info is unexpectedly removed from primaryShardInfoCache", false);
976                 } else {
977                     assertEquals("Expected primaryShardInfoCache entry",
978                         primaryShardInfo, futurePrimaryShardInfo);
979                 }
980             }
981         }, system256.dispatchers().defaultGlobalDispatcher());
982
983         LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable ending");
984     }
985
986     @Test
987     public void testOnReceiveFindLocalShardForNonExistentShard() {
988         final TestKit kit = new TestKit(getSystem());
989         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
990
991         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
992
993         shardManager.tell(new FindLocalShard("non-existent", false), kit.getRef());
994
995         LocalShardNotFound notFound = kit.expectMsgClass(kit.duration("5 seconds"), LocalShardNotFound.class);
996
997         assertEquals("getShardName", "non-existent", notFound.getShardName());
998     }
999
1000     @Test
1001     public void testOnReceiveFindLocalShardForExistentShard() {
1002         final TestKit kit = new TestKit(getSystem());
1003         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1004
1005         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1006         shardManager.tell(new ActorInitialized(), mockShardActor);
1007
1008         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1009
1010         LocalShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
1011
1012         assertTrue("Found path contains " + found.getPath().path().toString(),
1013             found.getPath().path().toString().contains("member-1-shard-default-config"));
1014     }
1015
1016     @Test
1017     public void testOnReceiveFindLocalShardForNotInitializedShard() {
1018         final TestKit kit = new TestKit(getSystem());
1019         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1020
1021         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1022
1023         kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
1024     }
1025
1026     @Test
1027     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
1028         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
1029         final TestKit kit = new TestKit(getSystem());
1030         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1031
1032         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1033
1034         // We're passing waitUntilInitialized = true to FindLocalShard
1035         // so the response should be
1036         // delayed until we send ActorInitialized.
1037         Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
1038             new Timeout(5, TimeUnit.SECONDS));
1039
1040         shardManager.tell(new ActorInitialized(), mockShardActor);
1041
1042         Object resp = Await.result(future, kit.duration("5 seconds"));
1043         assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
1044
1045         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
1046     }
1047
1048     @Test
1049     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
1050         TestShardManager shardManager = newTestShardManager();
1051
1052         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1053         shardManager.onReceiveCommand(new RoleChangeNotification(
1054                 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
1055
1056         verify(ready, never()).countDown();
1057
1058         shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
1059                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1060
1061         verify(ready, times(1)).countDown();
1062     }
1063
1064     @Test
1065     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
1066         final TestKit kit = new TestKit(getSystem());
1067         TestShardManager shardManager = newTestShardManager();
1068
1069         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1070         shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1071
1072         verify(ready, never()).countDown();
1073
1074         shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
1075
1076         shardManager.onReceiveCommand(
1077             new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1078                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1079
1080         verify(ready, times(1)).countDown();
1081     }
1082
1083     @Test
1084     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
1085         final TestKit kit = new TestKit(getSystem());
1086         TestShardManager shardManager = newTestShardManager();
1087
1088         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1089         shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1090
1091         verify(ready, never()).countDown();
1092
1093         shardManager.onReceiveCommand(
1094             new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1095                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1096
1097         shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
1098
1099         verify(ready, times(1)).countDown();
1100     }
1101
1102     @Test
1103     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
1104         TestShardManager shardManager = newTestShardManager();
1105
1106         shardManager.onReceiveCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(),
1107             RaftState.Leader.name()));
1108
1109         verify(ready, never()).countDown();
1110     }
1111
1112     @Test
1113     public void testByDefaultSyncStatusIsFalse() {
1114         TestShardManager shardManager = newTestShardManager();
1115
1116         assertFalse(shardManager.getMBean().getSyncStatus());
1117     }
1118
1119     @Test
1120     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception {
1121         TestShardManager shardManager = newTestShardManager();
1122
1123         shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
1124                 RaftState.Follower.name(), RaftState.Leader.name()));
1125
1126         assertTrue(shardManager.getMBean().getSyncStatus());
1127     }
1128
1129     @Test
1130     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception {
1131         TestShardManager shardManager = newTestShardManager();
1132
1133         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1134         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1135                 RaftState.Follower.name(), RaftState.Candidate.name()));
1136
1137         assertFalse(shardManager.getMBean().getSyncStatus());
1138
1139         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
1140         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(
1141                 true, shardId));
1142
1143         assertFalse(shardManager.getMBean().getSyncStatus());
1144     }
1145
1146     @Test
1147     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception {
1148         TestShardManager shardManager = newTestShardManager();
1149
1150         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1151         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1152                 RaftState.Candidate.name(), RaftState.Follower.name()));
1153
1154         // Initially will be false
1155         assertFalse(shardManager.getMBean().getSyncStatus());
1156
1157         // Send status true will make sync status true
1158         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
1159
1160         assertTrue(shardManager.getMBean().getSyncStatus());
1161
1162         // Send status false will make sync status false
1163         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
1164
1165         assertFalse(shardManager.getMBean().getSyncStatus());
1166     }
1167
1168     @Test
1169     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception {
1170         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
1171         TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
1172             @Override
1173             public List<String> getMemberShardNames(final MemberName memberName) {
1174                 return Arrays.asList("default", "astronauts");
1175             }
1176         }));
1177
1178         // Initially will be false
1179         assertFalse(shardManager.getMBean().getSyncStatus());
1180
1181         // Make default shard leader
1182         String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
1183         shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId,
1184                 RaftState.Follower.name(), RaftState.Leader.name()));
1185
1186         // default = Leader, astronauts is unknown so sync status remains false
1187         assertFalse(shardManager.getMBean().getSyncStatus());
1188
1189         // Make astronauts shard leader as well
1190         String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1191         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1192                 RaftState.Follower.name(), RaftState.Leader.name()));
1193
1194         // Now sync status should be true
1195         assertTrue(shardManager.getMBean().getSyncStatus());
1196
1197         // Make astronauts a Follower
1198         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1199                 RaftState.Leader.name(), RaftState.Follower.name()));
1200
1201         // Sync status is not true
1202         assertFalse(shardManager.getMBean().getSyncStatus());
1203
1204         // Make the astronauts follower sync status true
1205         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
1206
1207         // Sync status is now true
1208         assertTrue(shardManager.getMBean().getSyncStatus());
1209
1210         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending");
1211     }
1212
1213     @Test
1214     public void testOnReceiveSwitchShardBehavior() {
1215         final TestKit kit = new TestKit(getSystem());
1216         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1217
1218         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1219         shardManager.tell(new ActorInitialized(), mockShardActor);
1220
1221         shardManager.tell(new SwitchShardBehavior(mockShardName, RaftState.Leader, 1000), kit.getRef());
1222
1223         SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor,
1224             SwitchBehavior.class);
1225
1226         assertEquals(RaftState.Leader, switchBehavior.getNewState());
1227         assertEquals(1000, switchBehavior.getNewTerm());
1228     }
1229
1230     private static List<MemberName> members(final String... names) {
1231         return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList());
1232     }
1233
1234     @Test
1235     public void testOnCreateShard() {
1236         LOG.info("testOnCreateShard starting");
1237         final TestKit kit = new TestKit(getSystem());
1238         datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1239
1240         ActorRef shardManager = actorFactory
1241                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1242                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1243
1244         SchemaContext schemaContext = TEST_SCHEMA_CONTEXT;
1245         shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1246
1247         DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100)
1248                 .persistent(false).build();
1249         Shard.Builder shardBuilder = Shard.builder();
1250
1251         ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1252             "foo", null, members("member-1", "member-5", "member-6"));
1253         shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), kit.getRef());
1254
1255         kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
1256
1257         shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1258
1259         kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
1260
1261         assertFalse("isRecoveryApplicable", shardBuilder.getDatastoreContext().isPersistent());
1262         assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig()
1263             .getPeerAddressResolver() instanceof ShardPeerAddressResolver);
1264         assertEquals("peerMembers", Sets.newHashSet(
1265             ShardIdentifier.create("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(),
1266             ShardIdentifier.create("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()),
1267             shardBuilder.getPeerAddresses().keySet());
1268         assertEquals("ShardIdentifier", ShardIdentifier.create("foo", MEMBER_1, shardMrgIDSuffix),
1269             shardBuilder.getId());
1270         assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1271
1272         // Send CreateShard with same name - should return Success with
1273         // a message.
1274
1275         shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1276
1277         Success success = kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
1278         assertNotNull("Success status is null", success.status());
1279
1280         LOG.info("testOnCreateShard ending");
1281     }
1282
1283     @Test
1284     public void testOnCreateShardWithLocalMemberNotInShardConfig() {
1285         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting");
1286         final TestKit kit = new TestKit(getSystem());
1287         datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1288
1289         ActorRef shardManager = actorFactory
1290                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1291                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1292
1293         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1294
1295         Shard.Builder shardBuilder = Shard.builder();
1296         ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1297             "foo", null, members("member-5", "member-6"));
1298
1299         shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1300         kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
1301
1302         shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1303         kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
1304
1305         assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
1306         assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(), shardBuilder
1307             .getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
1308
1309         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending");
1310     }
1311
1312     @Test
1313     public void testOnCreateShardWithNoInitialSchemaContext() {
1314         LOG.info("testOnCreateShardWithNoInitialSchemaContext starting");
1315         final TestKit kit = new TestKit(getSystem());
1316         ActorRef shardManager = actorFactory
1317                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1318                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1319
1320         Shard.Builder shardBuilder = Shard.builder();
1321
1322         ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1323             "foo", null, members("member-1"));
1324         shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1325
1326         kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
1327
1328         SchemaContext schemaContext = TEST_SCHEMA_CONTEXT;
1329         shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1330
1331         shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1332
1333         kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
1334
1335         assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1336         assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
1337
1338         LOG.info("testOnCreateShardWithNoInitialSchemaContext ending");
1339     }
1340
1341     @Test
1342     public void testGetSnapshot() {
1343         LOG.info("testGetSnapshot starting");
1344         TestKit kit = new TestKit(getSystem());
1345
1346         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1347                 .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1"))
1348                 .put("astronauts", Collections.<String>emptyList()).build());
1349
1350         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig)
1351                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1352
1353         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1354         Failure failure = kit.expectMsgClass(Failure.class);
1355         assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
1356
1357         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1358
1359         waitForShardInitialized(shardManager, "shard1", kit);
1360         waitForShardInitialized(shardManager, "shard2", kit);
1361
1362         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1363
1364         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1365
1366         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1367         assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1368
1369         Function<ShardSnapshot, String> shardNameTransformer = ShardSnapshot::getName;
1370
1371         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
1372             datastoreSnapshot.getShardSnapshots().stream().map(shardNameTransformer).collect(Collectors.toSet())));
1373
1374         // Add a new replica
1375
1376         TestKit mockShardLeaderKit = new TestKit(getSystem());
1377
1378         TestShardManager shardManagerInstance = shardManager.underlyingActor();
1379         shardManagerInstance.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1380
1381         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1382         mockShardLeaderKit.expectMsgClass(AddServer.class);
1383         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.OK, ""));
1384         kit.expectMsgClass(Status.Success.class);
1385         waitForShardInitialized(shardManager, "astronauts", kit);
1386
1387         // Send another GetSnapshot and verify
1388
1389         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1390         datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1391
1392         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
1393                 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
1394
1395         ShardManagerSnapshot snapshot = datastoreSnapshot.getShardManagerSnapshot();
1396         assertNotNull("Expected ShardManagerSnapshot", snapshot);
1397         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1398                 Sets.newHashSet(snapshot.getShardList()));
1399
1400         LOG.info("testGetSnapshot ending");
1401     }
1402
1403     @Test
1404     public void testRestoreFromSnapshot() {
1405         LOG.info("testRestoreFromSnapshot starting");
1406
1407         datastoreContextBuilder.shardInitializationTimeout(3, TimeUnit.SECONDS);
1408
1409         TestKit kit = new TestKit(getSystem());
1410
1411         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1412                 .put("shard1", Collections.<String>emptyList()).put("shard2", Collections.<String>emptyList())
1413                 .put("astronauts", Collections.<String>emptyList()).build());
1414
1415         ShardManagerSnapshot snapshot =
1416                 new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"), Collections.emptyMap());
1417         DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix, snapshot,
1418                 Collections.<ShardSnapshot>emptyList());
1419         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig)
1420                 .restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
1421
1422         shardManager.underlyingActor().waitForRecoveryComplete();
1423
1424         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1425
1426         waitForShardInitialized(shardManager, "shard1", kit);
1427         waitForShardInitialized(shardManager, "shard2", kit);
1428         waitForShardInitialized(shardManager, "astronauts", kit);
1429
1430         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1431
1432         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1433
1434         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1435
1436         assertNotNull("Expected ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1437         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1438                 Sets.newHashSet(datastoreSnapshot.getShardManagerSnapshot().getShardList()));
1439
1440         LOG.info("testRestoreFromSnapshot ending");
1441     }
1442
1443     @Test
1444     public void testAddShardReplicaForNonExistentShardConfig() {
1445         final TestKit kit = new TestKit(getSystem());
1446         ActorRef shardManager = actorFactory
1447                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1448                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1449
1450         shardManager.tell(new AddShardReplica("model-inventory"), kit.getRef());
1451         Status.Failure resp = kit.expectMsgClass(kit.duration("2 seconds"), Status.Failure.class);
1452
1453         assertTrue("Failure obtained", resp.cause() instanceof IllegalArgumentException);
1454     }
1455
1456     @Test
1457     public void testAddShardReplica() {
1458         LOG.info("testAddShardReplica starting");
1459         MockConfiguration mockConfig = new MockConfiguration(
1460                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1461                         .put("astronauts", Arrays.asList("member-2")).build());
1462
1463         final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1464         datastoreContextBuilder.shardManagerPersistenceId(shardManagerID);
1465
1466         // Create an ActorSystem ShardManager actor for member-1.
1467         final ActorSystem system1 = newActorSystem("Member1");
1468         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1469         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1470         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1471                 newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor)
1472                         .cluster(new ClusterWrapperImpl(system1)).props()
1473                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1474                 shardManagerID);
1475
1476         // Create an ActorSystem ShardManager actor for member-2.
1477         final ActorSystem system2 = newActorSystem("Member2");
1478         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1479
1480         String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
1481         String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString();
1482         final TestActorRef<MockRespondActor> mockShardLeaderActor = TestActorRef.create(system2,
1483                 Props.create(MockRespondActor.class, AddServer.class,
1484                         new AddServerReply(ServerChangeStatus.OK, memberId2))
1485                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1486                 name);
1487         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1488                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor)
1489                         .cluster(new ClusterWrapperImpl(system2)).props()
1490                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1491                 shardManagerID);
1492
1493         final TestKit kit = new TestKit(getSystem());
1494         newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1495         leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1496
1497         leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1498
1499         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1500         leaderShardManager.tell(
1501             new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1502             mockShardLeaderActor);
1503         leaderShardManager.tell(
1504             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1505             mockShardLeaderActor);
1506
1507         newReplicaShardManager.underlyingActor().waitForMemberUp();
1508         leaderShardManager.underlyingActor().waitForMemberUp();
1509
1510         // Have a dummy snapshot to be overwritten by the new data
1511         // persisted.
1512         String[] restoredShards = { "default", "people" };
1513         ShardManagerSnapshot snapshot =
1514                 new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
1515         InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
1516         Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
1517
1518         InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID);
1519         InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
1520
1521         // construct a mock response message
1522         newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1523         AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1524             AddServer.class);
1525         String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1526         assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1527         kit.expectMsgClass(kit.duration("5 seconds"), Status.Success.class);
1528
1529         InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
1530         InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
1531         List<ShardManagerSnapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(shardManagerID,
1532             ShardManagerSnapshot.class);
1533         assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
1534         ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
1535         assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
1536             Sets.newHashSet(shardManagerSnapshot.getShardList()));
1537         LOG.info("testAddShardReplica ending");
1538     }
1539
1540     @Test
1541     public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() {
1542         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting");
1543         final TestKit kit = new TestKit(getSystem());
1544         TestActorRef<TestShardManager> shardManager = actorFactory
1545                 .createTestActor(newPropsShardMgrWithMockShardActor(), shardMgrID);
1546
1547         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1548         shardManager.tell(new ActorInitialized(), mockShardActor);
1549
1550         String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
1551         AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
1552         ActorRef leaderShardActor = shardManager.underlyingActor().getContext()
1553                 .actorOf(Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId);
1554
1555         MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
1556
1557         String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
1558         shardManager.tell(
1559             new RoleChangeNotification(newReplicaId, RaftState.Candidate.name(), RaftState.Follower.name()),
1560             mockShardActor);
1561         shardManager.tell(
1562             new ShardLeaderStateChanged(newReplicaId, leaderId, DataStoreVersions.CURRENT_VERSION),
1563             mockShardActor);
1564
1565         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1566
1567         MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
1568
1569         Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
1570         assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1571
1572         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1573         kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
1574
1575         // Send message again to verify previous in progress state is
1576         // cleared
1577
1578         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1579         resp = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
1580         assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1581
1582         // Send message again with an AddServer timeout to verify the
1583         // pre-existing shard actor isn't terminated.
1584
1585         shardManager.tell(
1586             newDatastoreContextFactory(
1587                 datastoreContextBuilder.shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), kit.getRef());
1588         leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
1589         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1590         kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
1591
1592         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1593         kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
1594
1595         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
1596     }
1597
1598     @Test
1599     public void testAddShardReplicaWithPreExistingLocalReplicaLeader() {
1600         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting");
1601         final TestKit kit = new TestKit(getSystem());
1602         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1603         ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1604
1605         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1606         shardManager.tell(new ActorInitialized(), mockShardActor);
1607         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1608             DataStoreVersions.CURRENT_VERSION), kit.getRef());
1609         shardManager.tell(
1610             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1611             mockShardActor);
1612
1613         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1614         Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
1615         assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1616
1617         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1618         kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
1619
1620         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
1621     }
1622
1623     @Test
1624     public void testAddShardReplicaWithAddServerReplyFailure() {
1625         LOG.info("testAddShardReplicaWithAddServerReplyFailure starting");
1626         final TestKit kit = new TestKit(getSystem());
1627         final TestKit mockShardLeaderKit = new TestKit(getSystem());
1628
1629         MockConfiguration mockConfig = new MockConfiguration(
1630             ImmutableMap.of("astronauts", Arrays.asList("member-2")));
1631
1632         ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
1633         final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1634             newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props()
1635             .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1636         shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1637
1638         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1639
1640         TestKit terminateWatcher = new TestKit(getSystem());
1641         terminateWatcher.watch(mockNewReplicaShardActor);
1642
1643         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1644
1645         AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
1646         assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
1647             addServerMsg.getNewServerId());
1648         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
1649
1650         Failure failure = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
1651         assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
1652
1653         shardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
1654         kit.expectMsgClass(kit.duration("5 seconds"), LocalShardNotFound.class);
1655
1656         terminateWatcher.expectTerminated(mockNewReplicaShardActor);
1657
1658         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1659         mockShardLeaderKit.expectMsgClass(AddServer.class);
1660         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
1661         failure = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
1662         assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
1663
1664         LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
1665     }
1666
1667     @Test
1668     public void testAddShardReplicaWithAlreadyInProgress() {
1669         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1670                 AddServer.class, new AddShardReplica("astronauts"));
1671     }
1672
1673     @Test
1674     public void testAddShardReplicaWithFindPrimaryTimeout() {
1675         LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting");
1676         datastoreContextBuilder.shardInitializationTimeout(100, TimeUnit.MILLISECONDS);
1677         final TestKit kit = new TestKit(getSystem());
1678         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.of("astronauts", Arrays.asList("member-2")));
1679
1680         final ActorRef newReplicaShardManager = actorFactory
1681                 .createActor(newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props()
1682                     .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1683
1684         newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1685         MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2",
1686             AddressFromURIString.parse("akka://non-existent@127.0.0.1:5").toString());
1687
1688         newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1689         Status.Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Status.Failure.class);
1690         assertTrue("Failure obtained", resp.cause() instanceof RuntimeException);
1691
1692         LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending");
1693     }
1694
1695     @Test
1696     public void testRemoveShardReplicaForNonExistentShard() {
1697         final TestKit kit = new TestKit(getSystem());
1698         ActorRef shardManager = actorFactory
1699                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1700                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1701
1702         shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), kit.getRef());
1703         Status.Failure resp = kit.expectMsgClass(kit.duration("10 seconds"), Status.Failure.class);
1704         assertTrue("Failure obtained", resp.cause() instanceof PrimaryNotFoundException);
1705     }
1706
1707     @Test
1708     /**
1709      * Primary is Local.
1710      */
1711     public void testRemoveShardReplicaLocal() {
1712         final TestKit kit = new TestKit(getSystem());
1713         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1714
1715         final ActorRef respondActor = actorFactory.createActor(Props.create(MockRespondActor.class,
1716             RemoveServer.class, new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
1717
1718         ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
1719
1720         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1721         shardManager.tell(new ActorInitialized(), respondActor);
1722         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1723             DataStoreVersions.CURRENT_VERSION), kit.getRef());
1724         shardManager.tell(
1725             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1726             respondActor);
1727
1728         shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), kit.getRef());
1729         final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor,
1730             RemoveServer.class);
1731         assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(),
1732             removeServer.getServerId());
1733         kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
1734     }
1735
1736     @Test
1737     public void testRemoveShardReplicaRemote() {
1738         MockConfiguration mockConfig = new MockConfiguration(
1739                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1740                         .put("astronauts", Arrays.asList("member-1")).build());
1741
1742         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1743
1744         // Create an ActorSystem ShardManager actor for member-1.
1745         final ActorSystem system1 = newActorSystem("Member1");
1746         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1747         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1748
1749         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1750                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster(
1751                         new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1752                 shardManagerID);
1753
1754         // Create an ActorSystem ShardManager actor for member-2.
1755         final ActorSystem system2 = newActorSystem("Member2");
1756         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1757
1758         String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString();
1759         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
1760         final TestActorRef<MockRespondActor> mockShardLeaderActor =
1761                 TestActorRef.create(system2, Props.create(MockRespondActor.class, RemoveServer.class,
1762                         new RemoveServerReply(ServerChangeStatus.OK, memberId2)), name);
1763
1764         LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor);
1765
1766         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1767                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster(
1768                         new ClusterWrapperImpl(system2)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1769                 shardManagerID);
1770
1771         // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so,
1772         //    akka://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1
1773         // However when a shard manager has a local shard which is a follower and a leader that is remote it will
1774         // try to compute an address for the remote shard leader using the ShardPeerAddressResolver. This address will
1775         // look like so,
1776         //    akka://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1
1777         // In this specific case if we did a FindPrimary for shard default from member-1 we would come up
1778         // with the address of an actor which does not exist, therefore any message sent to that actor would go to
1779         // dead letters.
1780         // To work around this problem we create a ForwardingActor with the right address and pass to it the
1781         // mockShardLeaderActor. The ForwardingActor simply forwards all messages to the mockShardLeaderActor and every
1782         // thing works as expected
1783         final ActorRef actorRef = leaderShardManager.underlyingActor().context()
1784                 .actorOf(Props.create(ForwardingActor.class, mockShardLeaderActor),
1785                         "member-2-shard-default-" + shardMrgIDSuffix);
1786
1787         LOG.error("Forwarding actor : {}", actorRef);
1788
1789         final TestKit kit = new TestKit(getSystem());
1790         newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1791         leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1792
1793         leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1794         newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1795
1796         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1797         leaderShardManager.tell(
1798             new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1799             mockShardLeaderActor);
1800         leaderShardManager.tell(
1801             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1802             mockShardLeaderActor);
1803
1804         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
1805         newReplicaShardManager.tell(
1806             new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class), leaderVersion),
1807             mockShardActor);
1808         newReplicaShardManager.tell(
1809             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
1810             mockShardActor);
1811
1812         newReplicaShardManager.underlyingActor().waitForMemberUp();
1813         leaderShardManager.underlyingActor().waitForMemberUp();
1814
1815         // construct a mock response message
1816         newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), kit.getRef());
1817         RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1818             RemoveServer.class);
1819         String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1820         assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
1821         kit.expectMsgClass(kit.duration("5 seconds"), Status.Success.class);
1822     }
1823
1824     @Test
1825     public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() {
1826         testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", MEMBER_2),
1827                 RemoveServer.class, new RemoveShardReplica("astronauts", MEMBER_3));
1828     }
1829
1830     @Test
1831     public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() {
1832         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1833                 AddServer.class, new RemoveShardReplica("astronauts", MEMBER_2));
1834     }
1835
1836
1837     public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange,
1838                                                       final Class<?> firstForwardedServerChangeClass,
1839                                                       final Object secondServerChange) {
1840         final TestKit kit = new TestKit(getSystem());
1841         final TestKit mockShardLeaderKit = new TestKit(getSystem());
1842         final TestKit secondRequestKit = new TestKit(getSystem());
1843
1844         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1845             .put(shardName, Arrays.asList("member-2")).build());
1846
1847         final TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
1848             newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor)
1849             .cluster(new MockClusterWrapper()).props()
1850             .withDispatcher(Dispatchers.DefaultDispatcherId()),
1851             shardMgrID);
1852
1853         shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1854
1855         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1856
1857         shardManager.tell(firstServerChange, kit.getRef());
1858
1859         mockShardLeaderKit.expectMsgClass(firstForwardedServerChangeClass);
1860
1861         shardManager.tell(secondServerChange, secondRequestKit.getRef());
1862
1863         secondRequestKit.expectMsgClass(secondRequestKit.duration("5 seconds"), Failure.class);
1864     }
1865
1866     @Test
1867     public void testServerRemovedShardActorNotRunning() {
1868         LOG.info("testServerRemovedShardActorNotRunning starting");
1869         final TestKit kit = new TestKit(getSystem());
1870         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1871             .put("default", Arrays.asList("member-1", "member-2"))
1872             .put("astronauts", Arrays.asList("member-2"))
1873             .put("people", Arrays.asList("member-1", "member-2")).build());
1874
1875         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1876             newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1877
1878         shardManager.underlyingActor().waitForRecoveryComplete();
1879         shardManager.tell(new FindLocalShard("people", false), kit.getRef());
1880         kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
1881
1882         shardManager.tell(new FindLocalShard("default", false), kit.getRef());
1883         kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
1884
1885         // Removed the default shard replica from member-1
1886         ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
1887         ShardIdentifier shardId = builder.shardName("default").memberName(MEMBER_1).type(shardMrgIDSuffix)
1888                 .build();
1889         shardManager.tell(new ServerRemoved(shardId.toString()), kit.getRef());
1890
1891         shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1892
1893         LOG.info("testServerRemovedShardActorNotRunning ending");
1894     }
1895
1896     @Test
1897     public void testServerRemovedShardActorRunning() {
1898         LOG.info("testServerRemovedShardActorRunning starting");
1899         final TestKit kit = new TestKit(getSystem());
1900         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1901             .put("default", Arrays.asList("member-1", "member-2"))
1902             .put("astronauts", Arrays.asList("member-2"))
1903             .put("people", Arrays.asList("member-1", "member-2")).build());
1904
1905         String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1906         ActorRef shard = actorFactory.createActor(MessageCollectorActor.props(), shardId);
1907
1908         TestActorRef<TestShardManager> shardManager = actorFactory
1909                 .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props()
1910                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1911
1912         shardManager.underlyingActor().waitForRecoveryComplete();
1913
1914         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1915         shardManager.tell(new ActorInitialized(), shard);
1916
1917         waitForShardInitialized(shardManager, "people", kit);
1918         waitForShardInitialized(shardManager, "default", kit);
1919
1920         // Removed the default shard replica from member-1
1921         shardManager.tell(new ServerRemoved(shardId), kit.getRef());
1922
1923         shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1924
1925         MessageCollectorActor.expectFirstMatching(shard, Shutdown.class);
1926
1927         LOG.info("testServerRemovedShardActorRunning ending");
1928     }
1929
1930     @Test
1931     public void testShardPersistenceWithRestoredData() {
1932         LOG.info("testShardPersistenceWithRestoredData starting");
1933         final TestKit kit = new TestKit(getSystem());
1934         MockConfiguration mockConfig =
1935                 new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1936                     .put("default", Arrays.asList("member-1", "member-2"))
1937                     .put("astronauts", Arrays.asList("member-2"))
1938                     .put("people", Arrays.asList("member-1", "member-2")).build());
1939         String[] restoredShards = {"default", "astronauts"};
1940         ShardManagerSnapshot snapshot =
1941                 new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
1942         InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
1943
1944         // create shardManager to come up with restored data
1945         TestActorRef<TestShardManager> newRestoredShardManager = actorFactory.createTestActor(
1946             newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1947
1948         newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
1949
1950         newRestoredShardManager.tell(new FindLocalShard("people", false), kit.getRef());
1951         LocalShardNotFound notFound = kit.expectMsgClass(kit.duration("5 seconds"), LocalShardNotFound.class);
1952         assertEquals("for uninitialized shard", "people", notFound.getShardName());
1953
1954         // Verify a local shard is created for the restored shards,
1955         // although we expect a NotInitializedException for the shards
1956         // as the actor initialization
1957         // message is not sent for them
1958         newRestoredShardManager.tell(new FindLocalShard("default", false), kit.getRef());
1959         kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
1960
1961         newRestoredShardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
1962         kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
1963
1964         LOG.info("testShardPersistenceWithRestoredData ending");
1965     }
1966
1967     @Test
1968     public void testShutDown() throws Exception {
1969         LOG.info("testShutDown starting");
1970         final TestKit kit = new TestKit(getSystem());
1971         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1972             .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1")).build());
1973
1974         String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString();
1975         ActorRef shard1 = actorFactory.createActor(MessageCollectorActor.props(), shardId1);
1976
1977         String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString();
1978         ActorRef shard2 = actorFactory.createActor(MessageCollectorActor.props(), shardId2);
1979
1980         ActorRef shardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig)
1981             .addShardActor("shard1", shard1).addShardActor("shard2", shard2).props());
1982
1983         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1984         shardManager.tell(new ActorInitialized(), shard1);
1985         shardManager.tell(new ActorInitialized(), shard2);
1986
1987         FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
1988         Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE);
1989
1990         MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class);
1991         MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class);
1992
1993         try {
1994             Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS));
1995             fail("ShardManager actor stopped without waiting for the Shards to be stopped");
1996         } catch (TimeoutException e) {
1997             // expected
1998         }
1999
2000         actorFactory.killActor(shard1, kit);
2001         actorFactory.killActor(shard2, kit);
2002
2003         Boolean stopped = Await.result(stopFuture, duration);
2004         assertEquals("Stopped", Boolean.TRUE, stopped);
2005
2006         LOG.info("testShutDown ending");
2007     }
2008
2009     @Test
2010     public void testChangeServersVotingStatus() {
2011         final TestKit kit = new TestKit(getSystem());
2012         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2013
2014         ActorRef respondActor = actorFactory
2015                 .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2016                     new ServerChangeReply(ServerChangeStatus.OK, null)), memberId);
2017
2018         ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2019
2020         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2021         shardManager.tell(new ActorInitialized(), respondActor);
2022         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
2023             DataStoreVersions.CURRENT_VERSION), kit.getRef());
2024         shardManager.tell(
2025             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
2026             respondActor);
2027
2028         shardManager.tell(
2029             new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), kit.getRef());
2030
2031         ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor
2032                 .expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2033         assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(),
2034             ImmutableMap.of(ShardIdentifier
2035                 .create("default", MemberName.forName("member-2"), shardMrgIDSuffix).toString(),
2036                 Boolean.TRUE));
2037
2038         kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
2039     }
2040
2041     @Test
2042     public void testChangeServersVotingStatusWithNoLeader() {
2043         final TestKit kit = new TestKit(getSystem());
2044         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2045
2046         ActorRef respondActor = actorFactory
2047                 .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2048                     new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);
2049
2050         ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2051
2052         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2053         shardManager.tell(new ActorInitialized(), respondActor);
2054         shardManager.tell(new RoleChangeNotification(memberId, null, RaftState.Follower.name()), respondActor);
2055
2056         shardManager.tell(
2057             new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), kit.getRef());
2058
2059         MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2060
2061         Status.Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Status.Failure.class);
2062         assertTrue("Failure resposnse", resp.cause() instanceof NoShardLeaderException);
2063     }
2064
2065     @SuppressWarnings("unchecked")
2066     @Test
2067     public void testRegisterForShardLeaderChanges() {
2068         LOG.info("testRegisterForShardLeaderChanges starting");
2069
2070         final String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
2071         final String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
2072         final TestKit kit = new TestKit(getSystem());
2073         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
2074
2075         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2076         shardManager.tell(new ActorInitialized(), mockShardActor);
2077
2078         final Consumer<String> mockCallback = mock(Consumer.class);
2079         shardManager.tell(new RegisterForShardAvailabilityChanges(mockCallback), kit.getRef());
2080
2081         final Success reply = kit.expectMsgClass(Duration.apply(5, TimeUnit.SECONDS), Success.class);
2082         final Registration reg = (Registration) reply.status();
2083
2084         final DataTree mockDataTree = mock(DataTree.class);
2085         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2086             DataStoreVersions.CURRENT_VERSION), mockShardActor);
2087
2088         verify(mockCallback, timeout(5000)).accept("default");
2089
2090         reset(mockCallback);
2091         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2092                 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2093
2094         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
2095         verifyNoMoreInteractions(mockCallback);
2096
2097         shardManager.tell(new ShardLeaderStateChanged(memberId1, null, mockDataTree,
2098                 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2099
2100         verify(mockCallback, timeout(5000)).accept("default");
2101
2102         reset(mockCallback);
2103         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, mockDataTree,
2104                 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2105
2106         verify(mockCallback, timeout(5000)).accept("default");
2107
2108         reset(mockCallback);
2109         reg.close();
2110
2111         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2112                 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2113
2114         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
2115         verifyNoMoreInteractions(mockCallback);
2116
2117         LOG.info("testRegisterForShardLeaderChanges ending");
2118     }
2119
2120     public static class TestShardManager extends ShardManager {
2121         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
2122         private final CountDownLatch snapshotPersist = new CountDownLatch(1);
2123         private ShardManagerSnapshot snapshot;
2124         private final Map<String, ActorRef> shardActors;
2125         private final ActorRef shardActor;
2126         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
2127         private CountDownLatch memberUpReceived = new CountDownLatch(1);
2128         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
2129         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
2130         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
2131         private volatile MessageInterceptor messageInterceptor;
2132
2133         TestShardManager(final Builder builder) {
2134             super(builder);
2135             shardActor = builder.shardActor;
2136             shardActors = builder.shardActors;
2137         }
2138
2139         @Override
2140         protected void handleRecover(final Object message) throws Exception {
2141             try {
2142                 super.handleRecover(message);
2143             } finally {
2144                 if (message instanceof RecoveryCompleted) {
2145                     recoveryComplete.countDown();
2146                 }
2147             }
2148         }
2149
2150         private void countDownIfOther(final Member member, final CountDownLatch latch) {
2151             if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
2152                 latch.countDown();
2153             }
2154         }
2155
2156         @Override
2157         public void handleCommand(final Object message) throws Exception {
2158             try {
2159                 if (messageInterceptor != null && messageInterceptor.canIntercept(message)) {
2160                     getSender().tell(messageInterceptor.apply(message), getSelf());
2161                 } else {
2162                     super.handleCommand(message);
2163                 }
2164             } finally {
2165                 if (message instanceof FindPrimary) {
2166                     findPrimaryMessageReceived.countDown();
2167                 } else if (message instanceof ClusterEvent.MemberUp) {
2168                     countDownIfOther(((ClusterEvent.MemberUp) message).member(), memberUpReceived);
2169                 } else if (message instanceof ClusterEvent.MemberRemoved) {
2170                     countDownIfOther(((ClusterEvent.MemberRemoved) message).member(), memberRemovedReceived);
2171                 } else if (message instanceof ClusterEvent.UnreachableMember) {
2172                     countDownIfOther(((ClusterEvent.UnreachableMember) message).member(), memberUnreachableReceived);
2173                 } else if (message instanceof ClusterEvent.ReachableMember) {
2174                     countDownIfOther(((ClusterEvent.ReachableMember) message).member(), memberReachableReceived);
2175                 }
2176             }
2177         }
2178
2179         void setMessageInterceptor(final MessageInterceptor messageInterceptor) {
2180             this.messageInterceptor = messageInterceptor;
2181         }
2182
2183         void waitForRecoveryComplete() {
2184             assertTrue("Recovery complete",
2185                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
2186         }
2187
2188         public void waitForMemberUp() {
2189             assertTrue("MemberUp received",
2190                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
2191             memberUpReceived = new CountDownLatch(1);
2192         }
2193
2194         void waitForMemberRemoved() {
2195             assertTrue("MemberRemoved received",
2196                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
2197             memberRemovedReceived = new CountDownLatch(1);
2198         }
2199
2200         void waitForUnreachableMember() {
2201             assertTrue("UnreachableMember received",
2202                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS));
2203             memberUnreachableReceived = new CountDownLatch(1);
2204         }
2205
2206         void waitForReachableMember() {
2207             assertTrue("ReachableMember received",
2208                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
2209             memberReachableReceived = new CountDownLatch(1);
2210         }
2211
2212         void verifyFindPrimary() {
2213             assertTrue("FindPrimary received",
2214                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
2215             findPrimaryMessageReceived = new CountDownLatch(1);
2216         }
2217
2218         public static Builder builder(final DatastoreContext.Builder datastoreContextBuilder) {
2219             return new Builder(datastoreContextBuilder);
2220         }
2221
2222         public static class Builder extends AbstractGenericCreator<Builder, TestShardManager> {
2223             private ActorRef shardActor;
2224             private final Map<String, ActorRef> shardActors = new HashMap<>();
2225
2226             Builder(final DatastoreContext.Builder datastoreContextBuilder) {
2227                 super(TestShardManager.class);
2228                 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
2229             }
2230
2231             Builder shardActor(final ActorRef newShardActor) {
2232                 this.shardActor = newShardActor;
2233                 return this;
2234             }
2235
2236             Builder addShardActor(final String shardName, final ActorRef actorRef) {
2237                 shardActors.put(shardName, actorRef);
2238                 return this;
2239             }
2240         }
2241
2242         @Override
2243         public void saveSnapshot(final Object obj) {
2244             snapshot = (ShardManagerSnapshot) obj;
2245             snapshotPersist.countDown();
2246             super.saveSnapshot(obj);
2247         }
2248
2249         void verifySnapshotPersisted(final Set<String> shardList) {
2250             assertTrue("saveSnapshot invoked",
2251                     Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
2252             assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
2253         }
2254
2255         @Override
2256         protected ActorRef newShardActor(final ShardInformation info) {
2257             if (shardActors.get(info.getShardName()) != null) {
2258                 return shardActors.get(info.getShardName());
2259             }
2260
2261             if (shardActor != null) {
2262                 return shardActor;
2263             }
2264
2265             return super.newShardActor(info);
2266         }
2267     }
2268
2269     private abstract static class AbstractGenericCreator<T extends AbstractGenericCreator<T, ?>, C extends ShardManager>
2270                                                      extends AbstractShardManagerCreator<T> {
2271         private final Class<C> shardManagerClass;
2272
2273         AbstractGenericCreator(final Class<C> shardManagerClass) {
2274             this.shardManagerClass = shardManagerClass;
2275             cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).waitTillReadyCountDownLatch(ready)
2276                     .primaryShardInfoCache(new PrimaryShardInfoFutureCache());
2277         }
2278
2279         @Override
2280         public Props props() {
2281             verify();
2282             return Props.create(shardManagerClass, this);
2283         }
2284     }
2285
2286     private static class GenericCreator<C extends ShardManager> extends AbstractGenericCreator<GenericCreator<C>, C> {
2287         GenericCreator(final Class<C> shardManagerClass) {
2288             super(shardManagerClass);
2289         }
2290     }
2291
2292     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
2293         private static final long serialVersionUID = 1L;
2294         private final Creator<ShardManager> delegate;
2295
2296         DelegatingShardManagerCreator(final Creator<ShardManager> delegate) {
2297             this.delegate = delegate;
2298         }
2299
2300         @Override
2301         public ShardManager create() throws Exception {
2302             return delegate.create();
2303         }
2304     }
2305
2306     interface MessageInterceptor extends Function<Object, Object> {
2307         boolean canIntercept(Object message);
2308     }
2309
2310     private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
2311         return new MessageInterceptor() {
2312             @Override
2313             public Object apply(final Object message) {
2314                 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
2315             }
2316
2317             @Override
2318             public boolean canIntercept(final Object message) {
2319                 return message instanceof FindPrimary;
2320             }
2321         };
2322     }
2323
2324     private static class MockRespondActor extends MessageCollectorActor {
2325         static final String CLEAR_RESPONSE = "clear-response";
2326
2327         private Object responseMsg;
2328         private final Class<?> requestClass;
2329
2330         @SuppressWarnings("unused")
2331         MockRespondActor(final Class<?> requestClass, final Object responseMsg) {
2332             this.requestClass = requestClass;
2333             this.responseMsg = responseMsg;
2334         }
2335
2336         @Override
2337         public void onReceive(final Object message) throws Exception {
2338             if (message.equals(CLEAR_RESPONSE)) {
2339                 responseMsg = null;
2340             } else {
2341                 super.onReceive(message);
2342                 if (message.getClass().equals(requestClass) && responseMsg != null) {
2343                     getSender().tell(responseMsg, getSelf());
2344                 }
2345             }
2346         }
2347     }
2348 }