7f415d7dc78e42a147b89522748969aa53bb1907
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManagerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.times;
20 import static org.mockito.Mockito.verify;
21
22 import akka.actor.ActorRef;
23 import akka.actor.ActorSystem;
24 import akka.actor.AddressFromURIString;
25 import akka.actor.Props;
26 import akka.actor.Status;
27 import akka.actor.Status.Failure;
28 import akka.actor.Status.Success;
29 import akka.cluster.Cluster;
30 import akka.cluster.ClusterEvent;
31 import akka.cluster.Member;
32 import akka.dispatch.Dispatchers;
33 import akka.japi.Creator;
34 import akka.pattern.Patterns;
35 import akka.persistence.RecoveryCompleted;
36 import akka.serialization.Serialization;
37 import akka.testkit.JavaTestKit;
38 import akka.testkit.TestActorRef;
39 import akka.util.Timeout;
40 import com.google.common.base.Function;
41 import com.google.common.base.Stopwatch;
42 import com.google.common.collect.ImmutableMap;
43 import com.google.common.collect.ImmutableSet;
44 import com.google.common.collect.Lists;
45 import com.google.common.collect.Sets;
46 import com.google.common.util.concurrent.Uninterruptibles;
47 import java.net.URI;
48 import java.util.AbstractMap;
49 import java.util.Arrays;
50 import java.util.Collection;
51 import java.util.Collections;
52 import java.util.HashMap;
53 import java.util.List;
54 import java.util.Map;
55 import java.util.Map.Entry;
56 import java.util.Set;
57 import java.util.concurrent.CountDownLatch;
58 import java.util.concurrent.TimeUnit;
59 import java.util.concurrent.TimeoutException;
60 import java.util.stream.Collectors;
61 import org.junit.Test;
62 import org.mockito.Mockito;
63 import org.opendaylight.controller.cluster.access.concepts.MemberName;
64 import org.opendaylight.controller.cluster.datastore.AbstractShardManagerTest;
65 import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
66 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
67 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
68 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
69 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
70 import org.opendaylight.controller.cluster.datastore.Shard;
71 import org.opendaylight.controller.cluster.datastore.ShardManager.SchemaContextModules;
72 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
73 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
74 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
75 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
76 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
77 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
78 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
79 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
80 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
81 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
82 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
83 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
84 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
85 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
86 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
87 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
88 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
89 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
90 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
91 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
92 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
93 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
94 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
95 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
96 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
97 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
98 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
99 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
100 import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor;
101 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
102 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
103 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
104 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
105 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
106 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
107 import org.opendaylight.controller.cluster.raft.RaftState;
108 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
109 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
110 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
111 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
112 import org.opendaylight.controller.cluster.raft.messages.AddServer;
113 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
114 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
115 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
116 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
117 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
118 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
119 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
120 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
121 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
122 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
123 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
124 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
125 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
126 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
127 import org.slf4j.Logger;
128 import org.slf4j.LoggerFactory;
129 import scala.concurrent.Await;
130 import scala.concurrent.Future;
131 import scala.concurrent.duration.FiniteDuration;
132
133 public class ShardManagerTest extends AbstractShardManagerTest {
134     private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
135     private static final MemberName MEMBER_2 = MemberName.forName("member-2");
136     private static final MemberName MEMBER_3 = MemberName.forName("member-3");
137
138     private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
139
140     private ActorSystem newActorSystem(final String config) {
141         return newActorSystem("cluster-test", config);
142     }
143
144     private ActorRef newMockShardActor(final ActorSystem system, final String shardName, final String memberName) {
145         String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
146         if (system == getSystem()) {
147             return actorFactory.createActor(Props.create(MessageCollectorActor.class), name);
148         }
149
150         return system.actorOf(Props.create(MessageCollectorActor.class), name);
151     }
152
153     private Props newShardMgrProps() {
154         return newShardMgrProps(new MockConfiguration());
155     }
156
157     private static DatastoreContextFactory newDatastoreContextFactory(final DatastoreContext datastoreContext) {
158         DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
159         Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
160         Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString());
161         return mockFactory;
162     }
163
164     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() {
165         return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
166     }
167
168     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(final ActorRef shardActor) {
169         return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor)
170                 .distributedDataStore(mock(DistributedDataStore.class));
171     }
172
173
174     private Props newPropsShardMgrWithMockShardActor() {
175         return newTestShardMgrBuilderWithMockShardActor().props().withDispatcher(
176                 Dispatchers.DefaultDispatcherId());
177     }
178
179     private Props newPropsShardMgrWithMockShardActor(final ActorRef shardActor) {
180         return newTestShardMgrBuilderWithMockShardActor(shardActor).props()
181                 .withDispatcher(Dispatchers.DefaultDispatcherId());
182     }
183
184
185     private TestShardManager newTestShardManager() {
186         return newTestShardManager(newShardMgrProps());
187     }
188
189     private TestShardManager newTestShardManager(final Props props) {
190         TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
191         TestShardManager shardManager = shardManagerActor.underlyingActor();
192         shardManager.waitForRecoveryComplete();
193         return shardManager;
194     }
195
196     private static void waitForShardInitialized(final ActorRef shardManager, final String shardName,
197             final JavaTestKit kit) {
198         AssertionError last = null;
199         Stopwatch sw = Stopwatch.createStarted();
200         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
201             try {
202                 shardManager.tell(new FindLocalShard(shardName, true), kit.getRef());
203                 kit.expectMsgClass(LocalShardFound.class);
204                 return;
205             } catch (AssertionError e) {
206                 last = e;
207             }
208
209             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
210         }
211
212         throw last;
213     }
214
215     @SuppressWarnings("unchecked")
216     private static <T> T expectMsgClassOrFailure(final Class<T> msgClass, final JavaTestKit kit, final String msg) {
217         Object reply = kit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), msgClass, Failure.class);
218         if (reply instanceof Failure) {
219             throw new AssertionError(msg + " failed", ((Failure)reply).cause());
220         }
221
222         return (T)reply;
223     }
224
225     @Test
226     public void testPerShardDatastoreContext() throws Exception {
227         LOG.info("testPerShardDatastoreContext starting");
228         final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
229                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
230
231         Mockito.doReturn(
232                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(6).build())
233                 .when(mockFactory).getShardDatastoreContext("default");
234
235         Mockito.doReturn(
236                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(7).build())
237                 .when(mockFactory).getShardDatastoreContext("topology");
238
239         final MockConfiguration mockConfig = new MockConfiguration() {
240             @Override
241             public Collection<String> getMemberShardNames(final MemberName memberName) {
242                 return Arrays.asList("default", "topology");
243             }
244
245             @Override
246             public Collection<MemberName> getMembersFromShardName(final String shardName) {
247                 return members("member-1");
248             }
249         };
250
251         final ActorRef defaultShardActor = actorFactory.createActor(
252                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("default"));
253         final ActorRef topologyShardActor = actorFactory.createActor(
254                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("topology"));
255
256         final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
257                 new HashMap<String, Entry<ActorRef, DatastoreContext>>());
258         shardInfoMap.put("default", new AbstractMap.SimpleEntry<>(defaultShardActor, null));
259         shardInfoMap.put("topology", new AbstractMap.SimpleEntry<>(topologyShardActor, null));
260
261         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
262         final CountDownLatch newShardActorLatch = new CountDownLatch(2);
263         class LocalShardManager extends ShardManager {
264             LocalShardManager(final AbstractShardManagerCreator<?> creator) {
265                 super(creator);
266             }
267
268             @Override
269             protected ActorRef newShardActor(final ShardInformation info) {
270                 Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
271                 ActorRef ref = null;
272                 if (entry != null) {
273                     ref = entry.getKey();
274                     entry.setValue(info.getDatastoreContext());
275                 }
276
277                 newShardActorLatch.countDown();
278                 return ref;
279             }
280         }
281
282         final Creator<ShardManager> creator = new Creator<ShardManager>() {
283             private static final long serialVersionUID = 1L;
284             @Override
285             public ShardManager create() throws Exception {
286                 return new LocalShardManager(
287                         new GenericCreator<>(LocalShardManager.class).datastoreContextFactory(mockFactory)
288                                 .primaryShardInfoCache(primaryShardInfoCache).configuration(mockConfig));
289             }
290         };
291
292         JavaTestKit kit = new JavaTestKit(getSystem());
293
294         final ActorRef shardManager = actorFactory.createActor(Props.create(
295                 new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()));
296
297         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), kit.getRef());
298
299         assertEquals("Shard actors created", true, newShardActorLatch.await(5, TimeUnit.SECONDS));
300         assertEquals("getShardElectionTimeoutFactor", 6,
301                 shardInfoMap.get("default").getValue().getShardElectionTimeoutFactor());
302         assertEquals("getShardElectionTimeoutFactor", 7,
303                 shardInfoMap.get("topology").getValue().getShardElectionTimeoutFactor());
304
305         DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
306                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
307         Mockito.doReturn(
308                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(66).build())
309                 .when(newMockFactory).getShardDatastoreContext("default");
310
311         Mockito.doReturn(
312                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(77).build())
313                 .when(newMockFactory).getShardDatastoreContext("topology");
314
315         shardManager.tell(newMockFactory, kit.getRef());
316
317         DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor,
318                 DatastoreContext.class);
319         assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
320
321         newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
322         assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
323
324         LOG.info("testPerShardDatastoreContext ending");
325     }
326
327     @Test
328     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
329         new JavaTestKit(getSystem()) {
330             {
331                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
332
333                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
334
335                 shardManager.tell(new FindPrimary("non-existent", false), getRef());
336
337                 expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
338             }
339         };
340     }
341
342     @Test
343     public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
344         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting");
345         new JavaTestKit(getSystem()) {
346             {
347                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
348
349                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
350
351                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
352                 shardManager.tell(new ActorInitialized(), mockShardActor);
353
354                 DataTree mockDataTree = mock(DataTree.class);
355                 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
356                         DataStoreVersions.CURRENT_VERSION), getRef());
357
358                 MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
359                 shardManager.tell(
360                         new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
361                         mockShardActor);
362
363                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
364
365                 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"),
366                         LocalPrimaryShardFound.class);
367                 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
368                         primaryFound.getPrimaryPath().contains("member-1-shard-default"));
369                 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
370             }
371         };
372
373         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending");
374     }
375
376     @Test
377     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
378         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp starting");
379         new JavaTestKit(getSystem()) {
380             {
381                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
382
383                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
384                 shardManager.tell(new ActorInitialized(), mockShardActor);
385
386                 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
387                 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
388                 shardManager.tell(
389                         new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
390                         mockShardActor);
391                 shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION),
392                         mockShardActor);
393
394                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
395
396                 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
397             }
398         };
399
400         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending");
401     }
402
403     @Test
404     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
405         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard starting");
406         new JavaTestKit(getSystem()) {
407             {
408                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
409
410                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
411                 shardManager.tell(new ActorInitialized(), mockShardActor);
412
413                 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
414                 MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
415
416                 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
417                 shardManager.tell(
418                         new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
419                         mockShardActor);
420                 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
421                 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, leaderVersion), mockShardActor);
422
423                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
424
425                 RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"),
426                         RemotePrimaryShardFound.class);
427                 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
428                         primaryFound.getPrimaryPath().contains("member-2-shard-default"));
429                 assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
430             }
431         };
432
433         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard ending");
434     }
435
436     @Test
437     public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
438         new JavaTestKit(getSystem()) {
439             {
440                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
441
442                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
443
444                 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
445             }
446         };
447     }
448
449     @Test
450     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
451         new JavaTestKit(getSystem()) {
452             {
453                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
454
455                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
456                 shardManager.tell(new ActorInitialized(), mockShardActor);
457
458                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
459
460                 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
461             }
462         };
463     }
464
465     @Test
466     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
467         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
468         new JavaTestKit(getSystem()) {
469             {
470                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
471
472                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
473                 shardManager.tell(new ActorInitialized(), mockShardActor);
474
475                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
476                 shardManager.tell(
477                         new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Follower.name()),
478                         mockShardActor);
479
480                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
481
482                 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
483
484                 DataTree mockDataTree = mock(DataTree.class);
485                 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
486                         DataStoreVersions.CURRENT_VERSION), mockShardActor);
487
488                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
489
490                 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"),
491                         LocalPrimaryShardFound.class);
492                 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
493                         primaryFound.getPrimaryPath().contains("member-1-shard-default"));
494                 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
495             }
496         };
497
498         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
499     }
500
501     @Test
502     public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
503         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting");
504         datastoreContextBuilder.shardInitializationTimeout(10, TimeUnit.SECONDS);
505         new JavaTestKit(getSystem()) {
506             {
507                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
508
509                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
510
511                 // We're passing waitUntilInitialized = true to FindPrimary so
512                 // the response should be
513                 // delayed until we send ActorInitialized and
514                 // RoleChangeNotification.
515                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
516
517                 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
518
519                 shardManager.tell(new ActorInitialized(), mockShardActor);
520
521                 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
522
523                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
524                 shardManager.tell(
525                         new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
526                         mockShardActor);
527
528                 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
529
530                 DataTree mockDataTree = mock(DataTree.class);
531                 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
532                         DataStoreVersions.CURRENT_VERSION), mockShardActor);
533
534                 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"),
535                         LocalPrimaryShardFound.class);
536                 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
537                         primaryFound.getPrimaryPath().contains("member-1-shard-default"));
538                 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
539
540                 expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
541             }
542         };
543
544         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending");
545     }
546
547     @Test
548     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
549         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard starting");
550         new JavaTestKit(getSystem()) {
551             {
552                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
553
554                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
555
556                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
557
558                 expectMsgClass(duration("2 seconds"), NotInitializedException.class);
559
560                 shardManager.tell(new ActorInitialized(), mockShardActor);
561
562                 expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
563             }
564         };
565
566         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending");
567     }
568
569     @Test
570     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
571         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard starting");
572         new JavaTestKit(getSystem()) {
573             {
574                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
575
576                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
577                 shardManager.tell(new ActorInitialized(), mockShardActor);
578                 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
579                         RaftState.Candidate.name()), mockShardActor);
580
581                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
582
583                 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
584             }
585         };
586
587         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending");
588     }
589
590     @Test
591     public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception {
592         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard starting");
593         new JavaTestKit(getSystem()) {
594             {
595                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
596
597                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
598                 shardManager.tell(new ActorInitialized(), mockShardActor);
599                 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
600                         RaftState.IsolatedLeader.name()), mockShardActor);
601
602                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
603
604                 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
605             }
606         };
607
608         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending");
609     }
610
611     @Test
612     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
613         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard starting");
614         new JavaTestKit(getSystem()) {
615             {
616                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
617
618                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
619                 shardManager.tell(new ActorInitialized(), mockShardActor);
620
621                 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
622
623                 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
624             }
625         };
626
627         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending");
628     }
629
630     @Test
631     public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
632         LOG.info("testOnReceiveFindPrimaryForRemoteShard starting");
633         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
634
635         // Create an ActorSystem ShardManager actor for member-1.
636
637         final ActorSystem system1 = newActorSystem("Member1");
638         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
639
640         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
641                 newTestShardMgrBuilderWithMockShardActor().cluster(
642                         new ClusterWrapperImpl(system1)).props().withDispatcher(
643                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
644
645         // Create an ActorSystem ShardManager actor for member-2.
646
647         final ActorSystem system2 = newActorSystem("Member2");
648
649         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
650
651         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
652
653         MockConfiguration mockConfig2 = new MockConfiguration(
654                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
655                         .put("astronauts", Arrays.asList("member-2")).build());
656
657         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
658                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
659                         new ClusterWrapperImpl(system2)).props().withDispatcher(
660                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
661
662         new JavaTestKit(system1) {
663             {
664                 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
665                 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
666
667                 shardManager2.tell(new ActorInitialized(), mockShardActor2);
668
669                 String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
670                 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
671                 shardManager2.tell(
672                         new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
673                         mockShardActor2);
674                 shardManager2.tell(
675                         new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
676                         mockShardActor2);
677
678                 shardManager1.underlyingActor().waitForMemberUp();
679                 shardManager1.tell(new FindPrimary("astronauts", false), getRef());
680
681                 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
682                 String path = found.getPrimaryPath();
683                 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
684                 assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
685
686                 shardManager2.underlyingActor().verifyFindPrimary();
687
688                 Cluster.get(system2).down(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
689
690                 shardManager1.underlyingActor().waitForMemberRemoved();
691
692                 shardManager1.tell(new FindPrimary("astronauts", false), getRef());
693
694                 expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
695             }
696         };
697
698         LOG.info("testOnReceiveFindPrimaryForRemoteShard ending");
699     }
700
701     @Test
702     public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
703         LOG.info("testShardAvailabilityOnChangeOfMemberReachability starting");
704         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
705
706         // Create an ActorSystem ShardManager actor for member-1.
707
708         final ActorSystem system1 = newActorSystem("Member1");
709         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
710
711         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
712
713         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
714                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
715                         new ClusterWrapperImpl(system1)).props().withDispatcher(
716                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
717
718         // Create an ActorSystem ShardManager actor for member-2.
719
720         final ActorSystem system2 = newActorSystem("Member2");
721
722         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
723
724         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
725
726         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
727                 .put("default", Arrays.asList("member-1", "member-2")).build());
728
729         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
730                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
731                         new ClusterWrapperImpl(system2)).props().withDispatcher(
732                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
733
734         new JavaTestKit(system1) {
735             {
736                 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
737                 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
738                 shardManager1.tell(new ActorInitialized(), mockShardActor1);
739                 shardManager2.tell(new ActorInitialized(), mockShardActor2);
740
741                 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
742                 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
743                 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
744                         DataStoreVersions.CURRENT_VERSION), mockShardActor1);
745                 shardManager1.tell(
746                         new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
747                         mockShardActor1);
748                 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
749                         DataStoreVersions.CURRENT_VERSION), mockShardActor2);
750                 shardManager2.tell(
751                         new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
752                         mockShardActor2);
753                 shardManager1.underlyingActor().waitForMemberUp();
754
755                 shardManager1.tell(new FindPrimary("default", true), getRef());
756
757                 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
758                 String path = found.getPrimaryPath();
759                 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
760
761                 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
762                         "akka://cluster-test@127.0.0.1:2558"), getRef());
763
764                 shardManager1.underlyingActor().waitForUnreachableMember();
765
766                 PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
767                 assertEquals("getMemberName", MEMBER_2, peerDown.getMemberName());
768                 MessageCollectorActor.clearMessages(mockShardActor1);
769
770                 shardManager1.tell(
771                         MockClusterWrapper.createMemberRemoved("member-2", "akka://cluster-test@127.0.0.1:2558"),
772                         getRef());
773
774                 MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
775
776                 shardManager1.tell(new FindPrimary("default", true), getRef());
777
778                 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
779
780                 shardManager1.tell(
781                         MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
782                         getRef());
783
784                 shardManager1.underlyingActor().waitForReachableMember();
785
786                 PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
787                 assertEquals("getMemberName", MEMBER_2, peerUp.getMemberName());
788                 MessageCollectorActor.clearMessages(mockShardActor1);
789
790                 shardManager1.tell(new FindPrimary("default", true), getRef());
791
792                 RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
793                 String path1 = found1.getPrimaryPath();
794                 assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
795
796                 shardManager1.tell(
797                         MockClusterWrapper.createMemberUp("member-2", "akka://cluster-test@127.0.0.1:2558"),
798                         getRef());
799
800                 MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
801
802                 // Test FindPrimary wait succeeds after reachable member event.
803
804                 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
805                         "akka://cluster-test@127.0.0.1:2558"), getRef());
806                 shardManager1.underlyingActor().waitForUnreachableMember();
807
808                 shardManager1.tell(new FindPrimary("default", true), getRef());
809
810                 shardManager1.tell(
811                         MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
812                         getRef());
813
814                 RemotePrimaryShardFound found2 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
815                 String path2 = found2.getPrimaryPath();
816                 assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config"));
817             }
818         };
819
820         LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending");
821     }
822
823     @Test
824     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
825         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange starting");
826         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
827
828         // Create an ActorSystem ShardManager actor for member-1.
829
830         final ActorSystem system1 = newActorSystem("Member1");
831         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
832
833         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
834
835         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
836         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
837                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(new ClusterWrapperImpl(system1))
838                         .primaryShardInfoCache(primaryShardInfoCache).props()
839                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
840                 shardManagerID);
841
842         // Create an ActorSystem ShardManager actor for member-2.
843
844         final ActorSystem system2 = newActorSystem("Member2");
845
846         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
847
848         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
849
850         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
851                 .put("default", Arrays.asList("member-1", "member-2")).build());
852
853         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
854                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
855                         new ClusterWrapperImpl(system2)).props().withDispatcher(
856                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
857
858         new JavaTestKit(system1) {
859             {
860                 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
861                 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
862                 shardManager1.tell(new ActorInitialized(), mockShardActor1);
863                 shardManager2.tell(new ActorInitialized(), mockShardActor2);
864
865                 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
866                 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
867                 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
868                         DataStoreVersions.CURRENT_VERSION), mockShardActor1);
869                 shardManager1.tell(
870                         new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
871                         mockShardActor1);
872                 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
873                         DataStoreVersions.CURRENT_VERSION), mockShardActor2);
874                 shardManager2.tell(
875                         new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
876                         mockShardActor2);
877                 shardManager1.underlyingActor().waitForMemberUp();
878
879                 shardManager1.tell(new FindPrimary("default", true), getRef());
880
881                 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
882                 String path = found.getPrimaryPath();
883                 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
884
885                 primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(
886                         system1.actorSelection(mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION));
887
888                 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
889                         "akka://cluster-test@127.0.0.1:2558"), getRef());
890
891                 shardManager1.underlyingActor().waitForUnreachableMember();
892
893                 shardManager1.tell(new FindPrimary("default", true), getRef());
894
895                 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
896
897                 assertNull("Expected primaryShardInfoCache entry removed",
898                         primaryShardInfoCache.getIfPresent("default"));
899
900                 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, mock(DataTree.class),
901                         DataStoreVersions.CURRENT_VERSION), mockShardActor1);
902                 shardManager1.tell(
903                         new RoleChangeNotification(memberId1, RaftState.Follower.name(), RaftState.Leader.name()),
904                         mockShardActor1);
905
906                 shardManager1.tell(new FindPrimary("default", true), getRef());
907
908                 LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
909                 String path1 = found1.getPrimaryPath();
910                 assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
911
912             }
913         };
914
915         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange ending");
916     }
917
918
919     @Test
920     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
921         new JavaTestKit(getSystem()) {
922             {
923                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
924
925                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
926
927                 shardManager.tell(new FindLocalShard("non-existent", false), getRef());
928
929                 LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
930
931                 assertEquals("getShardName", "non-existent", notFound.getShardName());
932             }
933         };
934     }
935
936     @Test
937     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
938         new JavaTestKit(getSystem()) {
939             {
940                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
941
942                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
943                 shardManager.tell(new ActorInitialized(), mockShardActor);
944
945                 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
946
947                 LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
948
949                 assertTrue("Found path contains " + found.getPath().path().toString(),
950                         found.getPath().path().toString().contains("member-1-shard-default-config"));
951             }
952         };
953     }
954
955     @Test
956     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
957         new JavaTestKit(getSystem()) {
958             {
959                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
960
961                 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
962
963                 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
964             }
965         };
966     }
967
968     @Test
969     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
970         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
971         new JavaTestKit(getSystem()) {
972             {
973                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
974
975                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
976
977                 // We're passing waitUntilInitialized = true to FindLocalShard
978                 // so the response should be
979                 // delayed until we send ActorInitialized.
980                 Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
981                         new Timeout(5, TimeUnit.SECONDS));
982
983                 shardManager.tell(new ActorInitialized(), mockShardActor);
984
985                 Object resp = Await.result(future, duration("5 seconds"));
986                 assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
987             }
988         };
989
990         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
991     }
992
993     @Test
994     public void testOnRecoveryJournalIsCleaned() {
995         String persistenceID = "shard-manager-" + shardMrgIDSuffix;
996         InMemoryJournal.addEntry(persistenceID, 1L, new SchemaContextModules(ImmutableSet.of("foo")));
997         InMemoryJournal.addEntry(persistenceID, 2L, new SchemaContextModules(ImmutableSet.of("bar")));
998         InMemoryJournal.addDeleteMessagesCompleteLatch(persistenceID);
999
1000         newTestShardManager();
1001
1002         InMemoryJournal.waitForDeleteMessagesComplete(persistenceID);
1003
1004         // Journal entries up to the last one should've been deleted
1005         Map<Long, Object> journal = InMemoryJournal.get(persistenceID);
1006         synchronized (journal) {
1007             assertEquals("Journal size", 0, journal.size());
1008         }
1009     }
1010
1011     @Test
1012     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
1013         TestShardManager shardManager = newTestShardManager();
1014
1015         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1016         shardManager.onReceiveCommand(new RoleChangeNotification(
1017                 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
1018
1019         verify(ready, never()).countDown();
1020
1021         shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
1022                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1023
1024         verify(ready, times(1)).countDown();
1025     }
1026
1027     @Test
1028     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
1029         new JavaTestKit(getSystem()) {
1030             {
1031                 TestShardManager shardManager = newTestShardManager();
1032
1033                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1034                 shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1035
1036                 verify(ready, never()).countDown();
1037
1038                 shardManager
1039                         .onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
1040
1041                 shardManager.onReceiveCommand(
1042                         new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1043                                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1044
1045                 verify(ready, times(1)).countDown();
1046             }
1047         };
1048     }
1049
1050     @Test
1051     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
1052         new JavaTestKit(getSystem()) {
1053             {
1054                 TestShardManager shardManager = newTestShardManager();
1055
1056                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1057                 shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1058
1059                 verify(ready, never()).countDown();
1060
1061                 shardManager.onReceiveCommand(
1062                         new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1063                                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1064
1065                 shardManager
1066                         .onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
1067
1068                 verify(ready, times(1)).countDown();
1069             }
1070         };
1071     }
1072
1073     @Test
1074     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
1075         TestShardManager shardManager = newTestShardManager();
1076
1077         shardManager.onReceiveCommand(new RoleChangeNotification(
1078                 "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
1079
1080         verify(ready, never()).countDown();
1081     }
1082
1083     @Test
1084     public void testByDefaultSyncStatusIsFalse() throws Exception {
1085         TestShardManager shardManager = newTestShardManager();
1086
1087         assertEquals(false, shardManager.getMBean().getSyncStatus());
1088     }
1089
1090     @Test
1091     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception {
1092         TestShardManager shardManager = newTestShardManager();
1093
1094         shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
1095                 RaftState.Follower.name(), RaftState.Leader.name()));
1096
1097         assertEquals(true, shardManager.getMBean().getSyncStatus());
1098     }
1099
1100     @Test
1101     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception {
1102         TestShardManager shardManager = newTestShardManager();
1103
1104         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1105         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1106                 RaftState.Follower.name(), RaftState.Candidate.name()));
1107
1108         assertEquals(false, shardManager.getMBean().getSyncStatus());
1109
1110         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
1111         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(
1112                 true, shardId));
1113
1114         assertEquals(false, shardManager.getMBean().getSyncStatus());
1115     }
1116
1117     @Test
1118     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception {
1119         TestShardManager shardManager = newTestShardManager();
1120
1121         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1122         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1123                 RaftState.Candidate.name(), RaftState.Follower.name()));
1124
1125         // Initially will be false
1126         assertEquals(false, shardManager.getMBean().getSyncStatus());
1127
1128         // Send status true will make sync status true
1129         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
1130
1131         assertEquals(true, shardManager.getMBean().getSyncStatus());
1132
1133         // Send status false will make sync status false
1134         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
1135
1136         assertEquals(false, shardManager.getMBean().getSyncStatus());
1137
1138     }
1139
1140     @Test
1141     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception {
1142         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
1143         TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
1144             @Override
1145             public List<String> getMemberShardNames(final MemberName memberName) {
1146                 return Arrays.asList("default", "astronauts");
1147             }
1148         }));
1149
1150         // Initially will be false
1151         assertEquals(false, shardManager.getMBean().getSyncStatus());
1152
1153         // Make default shard leader
1154         String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
1155         shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId,
1156                 RaftState.Follower.name(), RaftState.Leader.name()));
1157
1158         // default = Leader, astronauts is unknown so sync status remains false
1159         assertEquals(false, shardManager.getMBean().getSyncStatus());
1160
1161         // Make astronauts shard leader as well
1162         String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1163         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1164                 RaftState.Follower.name(), RaftState.Leader.name()));
1165
1166         // Now sync status should be true
1167         assertEquals(true, shardManager.getMBean().getSyncStatus());
1168
1169         // Make astronauts a Follower
1170         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1171                 RaftState.Leader.name(), RaftState.Follower.name()));
1172
1173         // Sync status is not true
1174         assertEquals(false, shardManager.getMBean().getSyncStatus());
1175
1176         // Make the astronauts follower sync status true
1177         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
1178
1179         // Sync status is now true
1180         assertEquals(true, shardManager.getMBean().getSyncStatus());
1181
1182         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending");
1183     }
1184
1185     @Test
1186     public void testOnReceiveSwitchShardBehavior() throws Exception {
1187         new JavaTestKit(getSystem()) {
1188             {
1189                 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1190
1191                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1192                 shardManager.tell(new ActorInitialized(), mockShardActor);
1193
1194                 shardManager.tell(new SwitchShardBehavior(mockShardName, RaftState.Leader, 1000), getRef());
1195
1196                 SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor,
1197                         SwitchBehavior.class);
1198
1199                 assertEquals(RaftState.Leader, switchBehavior.getNewState());
1200                 assertEquals(1000, switchBehavior.getNewTerm());
1201             }
1202         };
1203     }
1204
1205     private static List<MemberName> members(final String... names) {
1206         return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList());
1207     }
1208
1209     @Test
1210     public void testOnCreateShard() {
1211         LOG.info("testOnCreateShard starting");
1212         new JavaTestKit(getSystem()) {
1213             {
1214                 datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1215
1216                 ActorRef shardManager = actorFactory
1217                         .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1218                                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1219
1220                 SchemaContext schemaContext = TestModel.createTestContext();
1221                 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1222
1223                 DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100)
1224                         .persistent(false).build();
1225                 Shard.Builder shardBuilder = Shard.builder();
1226
1227                 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1228                         "foo", null, members("member-1", "member-5", "member-6"));
1229                 shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef());
1230
1231                 expectMsgClass(duration("5 seconds"), Success.class);
1232
1233                 shardManager.tell(new FindLocalShard("foo", true), getRef());
1234
1235                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1236
1237                 assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent());
1238                 assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig()
1239                         .getPeerAddressResolver() instanceof ShardPeerAddressResolver);
1240                 assertEquals("peerMembers", Sets.newHashSet(
1241                         ShardIdentifier.create("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(),
1242                         ShardIdentifier.create("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()),
1243                         shardBuilder.getPeerAddresses().keySet());
1244                 assertEquals("ShardIdentifier", ShardIdentifier.create("foo", MEMBER_1, shardMrgIDSuffix),
1245                         shardBuilder.getId());
1246                 assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1247
1248                 // Send CreateShard with same name - should return Success with
1249                 // a message.
1250
1251                 shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1252
1253                 Success success = expectMsgClass(duration("5 seconds"), Success.class);
1254                 assertNotNull("Success status is null", success.status());
1255             }
1256         };
1257
1258         LOG.info("testOnCreateShard ending");
1259     }
1260
1261     @Test
1262     public void testOnCreateShardWithLocalMemberNotInShardConfig() {
1263         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting");
1264         new JavaTestKit(getSystem()) {
1265             {
1266                 datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1267
1268                 ActorRef shardManager = actorFactory
1269                         .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1270                                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1271
1272                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1273
1274                 Shard.Builder shardBuilder = Shard.builder();
1275                 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1276                         "foo", null, members("member-5", "member-6"));
1277
1278                 shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1279                 expectMsgClass(duration("5 seconds"), Success.class);
1280
1281                 shardManager.tell(new FindLocalShard("foo", true), getRef());
1282                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1283
1284                 assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
1285                 assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(), shardBuilder
1286                         .getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
1287             }
1288         };
1289
1290         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending");
1291     }
1292
1293     @Test
1294     public void testOnCreateShardWithNoInitialSchemaContext() {
1295         LOG.info("testOnCreateShardWithNoInitialSchemaContext starting");
1296         new JavaTestKit(getSystem()) {
1297             {
1298                 ActorRef shardManager = actorFactory
1299                         .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1300                                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1301
1302                 Shard.Builder shardBuilder = Shard.builder();
1303
1304                 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1305                         "foo", null, members("member-1"));
1306                 shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1307
1308                 expectMsgClass(duration("5 seconds"), Success.class);
1309
1310                 SchemaContext schemaContext = TestModel.createTestContext();
1311                 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1312
1313                 shardManager.tell(new FindLocalShard("foo", true), getRef());
1314
1315                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1316
1317                 assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1318                 assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
1319             }
1320         };
1321
1322         LOG.info("testOnCreateShardWithNoInitialSchemaContext ending");
1323     }
1324
1325     @Test
1326     public void testGetSnapshot() throws Exception {
1327         LOG.info("testGetSnapshot starting");
1328         JavaTestKit kit = new JavaTestKit(getSystem());
1329
1330         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1331                 .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1"))
1332                 .put("astronauts", Collections.<String>emptyList()).build());
1333
1334         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig)
1335                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1336
1337         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1338         Failure failure = kit.expectMsgClass(Failure.class);
1339         assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
1340
1341         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1342
1343         waitForShardInitialized(shardManager, "shard1", kit);
1344         waitForShardInitialized(shardManager, "shard2", kit);
1345
1346         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1347
1348         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1349
1350         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1351         assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1352
1353         Function<ShardSnapshot, String> shardNameTransformer = s -> s.getName();
1354
1355         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
1356                 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
1357
1358         // Add a new replica
1359
1360         JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1361
1362         TestShardManager shardManagerInstance = shardManager.underlyingActor();
1363         shardManagerInstance.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1364
1365         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1366         mockShardLeaderKit.expectMsgClass(AddServer.class);
1367         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.OK, ""));
1368         kit.expectMsgClass(Status.Success.class);
1369         waitForShardInitialized(shardManager, "astronauts", kit);
1370
1371         // Send another GetSnapshot and verify
1372
1373         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1374         datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1375
1376         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
1377                 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
1378
1379         ShardManagerSnapshot snapshot = datastoreSnapshot.getShardManagerSnapshot();
1380         assertNotNull("Expected ShardManagerSnapshot", snapshot);
1381         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1382                 Sets.newHashSet(snapshot.getShardList()));
1383
1384         LOG.info("testGetSnapshot ending");
1385     }
1386
1387     @Test
1388     public void testRestoreFromSnapshot() throws Exception {
1389         LOG.info("testRestoreFromSnapshot starting");
1390
1391         datastoreContextBuilder.shardInitializationTimeout(3, TimeUnit.SECONDS);
1392
1393         JavaTestKit kit = new JavaTestKit(getSystem());
1394
1395         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1396                 .put("shard1", Collections.<String>emptyList()).put("shard2", Collections.<String>emptyList())
1397                 .put("astronauts", Collections.<String>emptyList()).build());
1398
1399         ShardManagerSnapshot snapshot =
1400                 new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"), Collections.emptyMap());
1401         DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix, snapshot,
1402                 Collections.<ShardSnapshot>emptyList());
1403         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig)
1404                 .restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
1405
1406         shardManager.underlyingActor().waitForRecoveryComplete();
1407
1408         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1409
1410         waitForShardInitialized(shardManager, "shard1", kit);
1411         waitForShardInitialized(shardManager, "shard2", kit);
1412         waitForShardInitialized(shardManager, "astronauts", kit);
1413
1414         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1415
1416         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1417
1418         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1419
1420         assertNotNull("Expected ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1421         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1422                 Sets.newHashSet(datastoreSnapshot.getShardManagerSnapshot().getShardList()));
1423
1424         LOG.info("testRestoreFromSnapshot ending");
1425     }
1426
1427     @Test
1428     public void testAddShardReplicaForNonExistentShardConfig() throws Exception {
1429         new JavaTestKit(getSystem()) {
1430             {
1431                 ActorRef shardManager = actorFactory
1432                         .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1433                                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1434
1435                 shardManager.tell(new AddShardReplica("model-inventory"), getRef());
1436                 Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
1437
1438                 assertEquals("Failure obtained", true, resp.cause() instanceof IllegalArgumentException);
1439             }
1440         };
1441     }
1442
1443     @Test
1444     public void testAddShardReplica() throws Exception {
1445         LOG.info("testAddShardReplica starting");
1446         MockConfiguration mockConfig = new MockConfiguration(
1447                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1448                         .put("astronauts", Arrays.asList("member-2")).build());
1449
1450         final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1451         datastoreContextBuilder.shardManagerPersistenceId(shardManagerID);
1452
1453         // Create an ActorSystem ShardManager actor for member-1.
1454         final ActorSystem system1 = newActorSystem("Member1");
1455         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1456         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1457         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1458                 newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor)
1459                         .cluster(new ClusterWrapperImpl(system1)).props()
1460                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1461                 shardManagerID);
1462
1463         // Create an ActorSystem ShardManager actor for member-2.
1464         final ActorSystem system2 = newActorSystem("Member2");
1465         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1466
1467         String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
1468         String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString();
1469         final TestActorRef<MockRespondActor> mockShardLeaderActor = TestActorRef.create(system2,
1470                 Props.create(MockRespondActor.class, AddServer.class,
1471                         new AddServerReply(ServerChangeStatus.OK, memberId2))
1472                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1473                 name);
1474         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1475                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor)
1476                         .cluster(new ClusterWrapperImpl(system2)).props()
1477                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1478                 shardManagerID);
1479
1480         new JavaTestKit(system1) {
1481             {
1482                 newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1483                 leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1484
1485                 leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1486
1487                 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1488                 leaderShardManager.tell(
1489                         new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1490                         mockShardLeaderActor);
1491                 leaderShardManager.tell(
1492                         new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1493                         mockShardLeaderActor);
1494
1495                 newReplicaShardManager.underlyingActor().waitForMemberUp();
1496                 leaderShardManager.underlyingActor().waitForMemberUp();
1497
1498                 // Have a dummy snapshot to be overwritten by the new data
1499                 // persisted.
1500                 String[] restoredShards = { "default", "people" };
1501                 ShardManagerSnapshot snapshot =
1502                         new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
1503                 InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
1504                 Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
1505
1506                 InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID);
1507                 InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
1508
1509                 // construct a mock response message
1510                 newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1511                 AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1512                         AddServer.class);
1513                 String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1514                 assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1515                 expectMsgClass(duration("5 seconds"), Status.Success.class);
1516
1517                 InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
1518                 InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
1519                 List<ShardManagerSnapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(shardManagerID,
1520                         ShardManagerSnapshot.class);
1521                 assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
1522                 ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
1523                 assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
1524                         Sets.newHashSet(shardManagerSnapshot.getShardList()));
1525             }
1526         };
1527         LOG.info("testAddShardReplica ending");
1528     }
1529
1530     @Test
1531     public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() throws Exception {
1532         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting");
1533         new JavaTestKit(getSystem()) {
1534             {
1535                 TestActorRef<TestShardManager> shardManager = actorFactory
1536                         .createTestActor(newPropsShardMgrWithMockShardActor(), shardMgrID);
1537
1538                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1539                 shardManager.tell(new ActorInitialized(), mockShardActor);
1540
1541                 String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
1542                 AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
1543                 ActorRef leaderShardActor = shardManager.underlyingActor().getContext()
1544                         .actorOf(Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId);
1545
1546                 MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
1547
1548                 String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
1549                 shardManager.tell(
1550                         new RoleChangeNotification(newReplicaId, RaftState.Candidate.name(), RaftState.Follower.name()),
1551                         mockShardActor);
1552                 shardManager.tell(
1553                         new ShardLeaderStateChanged(newReplicaId, leaderId, DataStoreVersions.CURRENT_VERSION),
1554                         mockShardActor);
1555
1556                 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1557
1558                 MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
1559
1560                 Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1561                 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1562
1563                 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1564                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1565
1566                 // Send message again to verify previous in progress state is
1567                 // cleared
1568
1569                 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1570                 resp = expectMsgClass(duration("5 seconds"), Failure.class);
1571                 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1572
1573                 // Send message again with an AddServer timeout to verify the
1574                 // pre-existing shard actor isn't terminated.
1575
1576                 shardManager.tell(
1577                         newDatastoreContextFactory(
1578                                 datastoreContextBuilder.shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()),
1579                         getRef());
1580                 leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
1581                 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1582                 expectMsgClass(duration("5 seconds"), Failure.class);
1583
1584                 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1585                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1586             }
1587         };
1588
1589         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
1590     }
1591
1592     @Test
1593     public void testAddShardReplicaWithPreExistingLocalReplicaLeader() throws Exception {
1594         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting");
1595         new JavaTestKit(getSystem()) {
1596             {
1597                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1598                 ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1599
1600                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1601                 shardManager.tell(new ActorInitialized(), mockShardActor);
1602                 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1603                         DataStoreVersions.CURRENT_VERSION), getRef());
1604                 shardManager.tell(
1605                         new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1606                         mockShardActor);
1607
1608                 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1609                 Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1610                 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1611
1612                 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1613                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1614             }
1615         };
1616
1617         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
1618     }
1619
1620     @Test
1621     public void testAddShardReplicaWithAddServerReplyFailure() throws Exception {
1622         LOG.info("testAddShardReplicaWithAddServerReplyFailure starting");
1623         new JavaTestKit(getSystem()) {
1624             {
1625                 JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1626
1627                 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1628                         .put("astronauts", Arrays.asList("member-2")).build());
1629
1630                 ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
1631                 final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1632                         newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props()
1633                             .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1634                 shardManager.underlyingActor()
1635                         .setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1636
1637                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1638
1639                 JavaTestKit terminateWatcher = new JavaTestKit(getSystem());
1640                 terminateWatcher.watch(mockNewReplicaShardActor);
1641
1642                 shardManager.tell(new AddShardReplica("astronauts"), getRef());
1643
1644                 AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
1645                 assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
1646                         addServerMsg.getNewServerId());
1647                 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
1648
1649                 Failure failure = expectMsgClass(duration("5 seconds"), Failure.class);
1650                 assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
1651
1652                 shardManager.tell(new FindLocalShard("astronauts", false), getRef());
1653                 expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1654
1655                 terminateWatcher.expectTerminated(mockNewReplicaShardActor);
1656
1657                 shardManager.tell(new AddShardReplica("astronauts"), getRef());
1658                 mockShardLeaderKit.expectMsgClass(AddServer.class);
1659                 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
1660                 failure = expectMsgClass(duration("5 seconds"), Failure.class);
1661                 assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
1662             }
1663         };
1664
1665         LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
1666     }
1667
1668     @Test
1669     public void testAddShardReplicaWithAlreadyInProgress() throws Exception {
1670         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1671                 AddServer.class, new AddShardReplica("astronauts"));
1672     }
1673
1674     @Test
1675     public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception {
1676         LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting");
1677         datastoreContextBuilder.shardInitializationTimeout(100, TimeUnit.MILLISECONDS);
1678         new JavaTestKit(getSystem()) {
1679             {
1680                 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1681                         .put("astronauts", Arrays.asList("member-2")).build());
1682
1683                 final ActorRef newReplicaShardManager = actorFactory
1684                         .createActor(newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props()
1685                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1686
1687                 newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1688                 MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2",
1689                         AddressFromURIString.parse("akka://non-existent@127.0.0.1:5").toString());
1690
1691                 newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1692                 Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
1693                 assertEquals("Failure obtained", true, resp.cause() instanceof RuntimeException);
1694             }
1695         };
1696
1697         LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending");
1698     }
1699
1700     @Test
1701     public void testRemoveShardReplicaForNonExistentShard() throws Exception {
1702         new JavaTestKit(getSystem()) {
1703             {
1704                 ActorRef shardManager = actorFactory
1705                         .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1706                                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1707
1708                 shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), getRef());
1709                 Status.Failure resp = expectMsgClass(duration("10 seconds"), Status.Failure.class);
1710                 assertEquals("Failure obtained", true, resp.cause() instanceof PrimaryNotFoundException);
1711             }
1712         };
1713     }
1714
1715     @Test
1716     /**
1717      * Primary is Local
1718      */
1719     public void testRemoveShardReplicaLocal() throws Exception {
1720         new JavaTestKit(getSystem()) {
1721             {
1722                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1723
1724                 final ActorRef respondActor = actorFactory.createActor(Props.create(MockRespondActor.class,
1725                         RemoveServer.class, new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
1726
1727                 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
1728
1729                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1730                 shardManager.tell(new ActorInitialized(), respondActor);
1731                 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1732                         DataStoreVersions.CURRENT_VERSION), getRef());
1733                 shardManager.tell(
1734                         new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1735                         respondActor);
1736
1737                 shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), getRef());
1738                 final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor,
1739                         RemoveServer.class);
1740                 assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(),
1741                         removeServer.getServerId());
1742                 expectMsgClass(duration("5 seconds"), Success.class);
1743             }
1744         };
1745     }
1746
1747     @Test
1748     public void testRemoveShardReplicaRemote() throws Exception {
1749         MockConfiguration mockConfig = new MockConfiguration(
1750                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1751                         .put("astronauts", Arrays.asList("member-1")).build());
1752
1753         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1754
1755         // Create an ActorSystem ShardManager actor for member-1.
1756         final ActorSystem system1 = newActorSystem("Member1");
1757         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1758         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1759
1760         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1761                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster(
1762                         new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1763                 shardManagerID);
1764
1765         // Create an ActorSystem ShardManager actor for member-2.
1766         final ActorSystem system2 = newActorSystem("Member2");
1767         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1768
1769         String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString();
1770         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
1771         final TestActorRef<MockRespondActor> mockShardLeaderActor =
1772                 TestActorRef.create(system2, Props.create(MockRespondActor.class, RemoveServer.class,
1773                         new RemoveServerReply(ServerChangeStatus.OK, memberId2)), name);
1774
1775         LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor);
1776
1777         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1778                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster(
1779                         new ClusterWrapperImpl(system2)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1780                 shardManagerID);
1781
1782         // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so,
1783         //    akka://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1
1784         // However when a shard manager has a local shard which is a follower and a leader that is remote it will
1785         // try to compute an address for the remote shard leader using the ShardPeerAddressResolver. This address will
1786         // look like so,
1787         //    akka://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1
1788         // In this specific case if we did a FindPrimary for shard default from member-1 we would come up
1789         // with the address of an actor which does not exist, therefore any message sent to that actor would go to
1790         // dead letters.
1791         // To work around this problem we create a ForwardingActor with the right address and pass to it the
1792         // mockShardLeaderActor. The ForwardingActor simply forwards all messages to the mockShardLeaderActor and every
1793         // thing works as expected
1794         final ActorRef actorRef = leaderShardManager.underlyingActor().context()
1795                 .actorOf(Props.create(ForwardingActor.class, mockShardLeaderActor),
1796                         "member-2-shard-default-" + shardMrgIDSuffix);
1797
1798         LOG.error("Forwarding actor : {}", actorRef);
1799
1800         new JavaTestKit(system1) {
1801             {
1802                 newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1803                 leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1804
1805                 leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1806                 newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1807
1808                 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1809                 leaderShardManager.tell(
1810                         new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1811                         mockShardLeaderActor);
1812                 leaderShardManager.tell(
1813                         new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1814                         mockShardLeaderActor);
1815
1816                 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
1817                 newReplicaShardManager.tell(
1818                         new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class), leaderVersion),
1819                         mockShardActor);
1820                 newReplicaShardManager.tell(
1821                         new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
1822                         mockShardActor);
1823
1824                 newReplicaShardManager.underlyingActor().waitForMemberUp();
1825                 leaderShardManager.underlyingActor().waitForMemberUp();
1826
1827                 // construct a mock response message
1828                 newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), getRef());
1829                 RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1830                         RemoveServer.class);
1831                 String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1832                 assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
1833                 expectMsgClass(duration("5 seconds"), Status.Success.class);
1834             }
1835         };
1836     }
1837
1838     @Test
1839     public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() throws Exception {
1840         testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", MEMBER_2),
1841                 RemoveServer.class, new RemoveShardReplica("astronauts", MEMBER_3));
1842     }
1843
1844     @Test
1845     public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() throws Exception {
1846         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1847                 AddServer.class, new RemoveShardReplica("astronauts", MEMBER_2));
1848     }
1849
1850
1851     public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange,
1852                                                       final Class<?> firstForwardedServerChangeClass,
1853                                                       final Object secondServerChange) throws Exception {
1854         new JavaTestKit(getSystem()) {
1855             {
1856                 JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1857                 final JavaTestKit secondRequestKit = new JavaTestKit(getSystem());
1858
1859                 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1860                         .put(shardName, Arrays.asList("member-2")).build());
1861
1862                 final TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
1863                         newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor)
1864                                 .cluster(new MockClusterWrapper()).props()
1865                                 .withDispatcher(Dispatchers.DefaultDispatcherId()),
1866                         shardMgrID);
1867
1868                 shardManager.underlyingActor()
1869                         .setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1870
1871                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1872
1873                 shardManager.tell(firstServerChange, getRef());
1874
1875                 mockShardLeaderKit.expectMsgClass(firstForwardedServerChangeClass);
1876
1877                 shardManager.tell(secondServerChange, secondRequestKit.getRef());
1878
1879                 secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class);
1880             }
1881         };
1882     }
1883
1884     @Test
1885     public void testServerRemovedShardActorNotRunning() throws Exception {
1886         LOG.info("testServerRemovedShardActorNotRunning starting");
1887         new JavaTestKit(getSystem()) {
1888             {
1889                 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1890                         .put("default", Arrays.asList("member-1", "member-2"))
1891                         .put("astronauts", Arrays.asList("member-2"))
1892                         .put("people", Arrays.asList("member-1", "member-2")).build());
1893
1894                 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1895                         newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1896
1897                 shardManager.underlyingActor().waitForRecoveryComplete();
1898                 shardManager.tell(new FindLocalShard("people", false), getRef());
1899                 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1900
1901                 shardManager.tell(new FindLocalShard("default", false), getRef());
1902                 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1903
1904                 // Removed the default shard replica from member-1
1905                 ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
1906                 ShardIdentifier shardId = builder.shardName("default").memberName(MEMBER_1).type(shardMrgIDSuffix)
1907                         .build();
1908                 shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
1909
1910                 shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1911             }
1912         };
1913
1914         LOG.info("testServerRemovedShardActorNotRunning ending");
1915     }
1916
1917     @Test
1918     public void testServerRemovedShardActorRunning() throws Exception {
1919         LOG.info("testServerRemovedShardActorRunning starting");
1920         new JavaTestKit(getSystem()) {
1921             {
1922                 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1923                         .put("default", Arrays.asList("member-1", "member-2"))
1924                         .put("astronauts", Arrays.asList("member-2"))
1925                         .put("people", Arrays.asList("member-1", "member-2")).build());
1926
1927                 String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1928                 ActorRef shard = actorFactory.createActor(MessageCollectorActor.props(), shardId);
1929
1930                 TestActorRef<TestShardManager> shardManager = actorFactory
1931                         .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props()
1932                                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1933
1934                 shardManager.underlyingActor().waitForRecoveryComplete();
1935
1936                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1937                 shardManager.tell(new ActorInitialized(), shard);
1938
1939                 waitForShardInitialized(shardManager, "people", this);
1940                 waitForShardInitialized(shardManager, "default", this);
1941
1942                 // Removed the default shard replica from member-1
1943                 shardManager.tell(new ServerRemoved(shardId), getRef());
1944
1945                 shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1946
1947                 MessageCollectorActor.expectFirstMatching(shard, Shutdown.class);
1948             }
1949         };
1950
1951         LOG.info("testServerRemovedShardActorRunning ending");
1952     }
1953
1954     @Test
1955     public void testShardPersistenceWithRestoredData() throws Exception {
1956         LOG.info("testShardPersistenceWithRestoredData starting");
1957         new JavaTestKit(getSystem()) {
1958             {
1959                 MockConfiguration mockConfig =
1960                     new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1961                             .put("default", Arrays.asList("member-1", "member-2"))
1962                             .put("astronauts", Arrays.asList("member-2"))
1963                             .put("people", Arrays.asList("member-1", "member-2")).build());
1964                 String[] restoredShards = {"default", "astronauts"};
1965                 ShardManagerSnapshot snapshot =
1966                         new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
1967                 InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
1968
1969                 // create shardManager to come up with restored data
1970                 TestActorRef<TestShardManager> newRestoredShardManager = actorFactory.createTestActor(
1971                         newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1972
1973                 newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
1974
1975                 newRestoredShardManager.tell(new FindLocalShard("people", false), getRef());
1976                 LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1977                 assertEquals("for uninitialized shard", "people", notFound.getShardName());
1978
1979                 // Verify a local shard is created for the restored shards,
1980                 // although we expect a NotInitializedException for the shards
1981                 // as the actor initialization
1982                 // message is not sent for them
1983                 newRestoredShardManager.tell(new FindLocalShard("default", false), getRef());
1984                 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1985
1986                 newRestoredShardManager.tell(new FindLocalShard("astronauts", false), getRef());
1987                 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1988             }
1989         };
1990
1991         LOG.info("testShardPersistenceWithRestoredData ending");
1992     }
1993
1994     @Test
1995     public void testShutDown() throws Exception {
1996         LOG.info("testShutDown starting");
1997         new JavaTestKit(getSystem()) {
1998             {
1999                 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
2000                         .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1")).build());
2001
2002                 String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString();
2003                 ActorRef shard1 = actorFactory.createActor(MessageCollectorActor.props(), shardId1);
2004
2005                 String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString();
2006                 ActorRef shard2 = actorFactory.createActor(MessageCollectorActor.props(), shardId2);
2007
2008                 ActorRef shardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig)
2009                         .addShardActor("shard1", shard1).addShardActor("shard2", shard2).props());
2010
2011                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
2012                 shardManager.tell(new ActorInitialized(), shard1);
2013                 shardManager.tell(new ActorInitialized(), shard2);
2014
2015                 FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
2016                 Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE);
2017
2018                 MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class);
2019                 MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class);
2020
2021                 try {
2022                     Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS));
2023                     fail("ShardManager actor stopped without waiting for the Shards to be stopped");
2024                 } catch (TimeoutException e) {
2025                     // expected
2026                 }
2027
2028                 actorFactory.killActor(shard1, this);
2029                 actorFactory.killActor(shard2, this);
2030
2031                 Boolean stopped = Await.result(stopFuture, duration);
2032                 assertEquals("Stopped", Boolean.TRUE, stopped);
2033             }
2034         };
2035
2036         LOG.info("testShutDown ending");
2037     }
2038
2039     @Test
2040     public void testChangeServersVotingStatus() throws Exception {
2041         new JavaTestKit(getSystem()) {
2042             {
2043                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2044
2045                 ActorRef respondActor = actorFactory
2046                         .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2047                                 new ServerChangeReply(ServerChangeStatus.OK, null)), memberId);
2048
2049                 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2050
2051                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
2052                 shardManager.tell(new ActorInitialized(), respondActor);
2053                 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
2054                         DataStoreVersions.CURRENT_VERSION), getRef());
2055                 shardManager.tell(
2056                         new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
2057                         respondActor);
2058
2059                 shardManager.tell(
2060                         new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)),
2061                         getRef());
2062
2063                 ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor
2064                         .expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2065                 assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(),
2066                         ImmutableMap.of(ShardIdentifier
2067                                 .create("default", MemberName.forName("member-2"), shardMrgIDSuffix).toString(),
2068                                 Boolean.TRUE));
2069
2070                 expectMsgClass(duration("5 seconds"), Success.class);
2071             }
2072         };
2073     }
2074
2075     @Test
2076     public void testChangeServersVotingStatusWithNoLeader() throws Exception {
2077         new JavaTestKit(getSystem()) {
2078             {
2079                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2080
2081                 ActorRef respondActor = actorFactory
2082                         .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2083                                 new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);
2084
2085                 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2086
2087                 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
2088                 shardManager.tell(new ActorInitialized(), respondActor);
2089                 shardManager.tell(new RoleChangeNotification(memberId, null, RaftState.Follower.name()), respondActor);
2090
2091                 shardManager.tell(
2092                         new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)),
2093                         getRef());
2094
2095                 MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2096
2097                 Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
2098                 assertEquals("Failure resposnse", true, resp.cause() instanceof NoShardLeaderException);
2099             }
2100         };
2101     }
2102
2103     public static class TestShardManager extends ShardManager {
2104         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
2105         private final CountDownLatch snapshotPersist = new CountDownLatch(1);
2106         private ShardManagerSnapshot snapshot;
2107         private final Map<String, ActorRef> shardActors;
2108         private final ActorRef shardActor;
2109         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
2110         private CountDownLatch memberUpReceived = new CountDownLatch(1);
2111         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
2112         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
2113         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
2114         private volatile MessageInterceptor messageInterceptor;
2115
2116         private TestShardManager(final Builder builder) {
2117             super(builder);
2118             shardActor = builder.shardActor;
2119             shardActors = builder.shardActors;
2120         }
2121
2122         @Override
2123         protected void handleRecover(final Object message) throws Exception {
2124             try {
2125                 super.handleRecover(message);
2126             } finally {
2127                 if (message instanceof RecoveryCompleted) {
2128                     recoveryComplete.countDown();
2129                 }
2130             }
2131         }
2132
2133         private void countDownIfOther(final Member member, final CountDownLatch latch) {
2134             if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
2135                 latch.countDown();
2136             }
2137         }
2138
2139         @Override
2140         public void handleCommand(final Object message) throws Exception {
2141             try {
2142                 if (messageInterceptor != null && messageInterceptor.canIntercept(message)) {
2143                     getSender().tell(messageInterceptor.apply(message), getSelf());
2144                 } else {
2145                     super.handleCommand(message);
2146                 }
2147             } finally {
2148                 if (message instanceof FindPrimary) {
2149                     findPrimaryMessageReceived.countDown();
2150                 } else if (message instanceof ClusterEvent.MemberUp) {
2151                     countDownIfOther(((ClusterEvent.MemberUp) message).member(), memberUpReceived);
2152                 } else if (message instanceof ClusterEvent.MemberRemoved) {
2153                     countDownIfOther(((ClusterEvent.MemberRemoved) message).member(), memberRemovedReceived);
2154                 } else if (message instanceof ClusterEvent.UnreachableMember) {
2155                     countDownIfOther(((ClusterEvent.UnreachableMember) message).member(), memberUnreachableReceived);
2156                 } else if (message instanceof ClusterEvent.ReachableMember) {
2157                     countDownIfOther(((ClusterEvent.ReachableMember) message).member(), memberReachableReceived);
2158                 }
2159             }
2160         }
2161
2162         void setMessageInterceptor(final MessageInterceptor messageInterceptor) {
2163             this.messageInterceptor = messageInterceptor;
2164         }
2165
2166         void waitForRecoveryComplete() {
2167             assertEquals("Recovery complete", true,
2168                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
2169         }
2170
2171         public void waitForMemberUp() {
2172             assertEquals("MemberUp received", true,
2173                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
2174             memberUpReceived = new CountDownLatch(1);
2175         }
2176
2177         void waitForMemberRemoved() {
2178             assertEquals("MemberRemoved received", true,
2179                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
2180             memberRemovedReceived = new CountDownLatch(1);
2181         }
2182
2183         void waitForUnreachableMember() {
2184             assertEquals("UnreachableMember received", true,
2185                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
2186                 ));
2187             memberUnreachableReceived = new CountDownLatch(1);
2188         }
2189
2190         void waitForReachableMember() {
2191             assertEquals("ReachableMember received", true,
2192                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
2193             memberReachableReceived = new CountDownLatch(1);
2194         }
2195
2196         void verifyFindPrimary() {
2197             assertEquals("FindPrimary received", true,
2198                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
2199             findPrimaryMessageReceived = new CountDownLatch(1);
2200         }
2201
2202         public static Builder builder(final DatastoreContext.Builder datastoreContextBuilder) {
2203             return new Builder(datastoreContextBuilder);
2204         }
2205
2206         public static class Builder extends AbstractGenericCreator<Builder, TestShardManager> {
2207             private ActorRef shardActor;
2208             private final Map<String, ActorRef> shardActors = new HashMap<>();
2209
2210             Builder(final DatastoreContext.Builder datastoreContextBuilder) {
2211                 super(TestShardManager.class);
2212                 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
2213             }
2214
2215             Builder shardActor(final ActorRef newShardActor) {
2216                 this.shardActor = newShardActor;
2217                 return this;
2218             }
2219
2220             Builder addShardActor(final String shardName, final ActorRef actorRef) {
2221                 shardActors.put(shardName, actorRef);
2222                 return this;
2223             }
2224         }
2225
2226         @Override
2227         public void saveSnapshot(final Object obj) {
2228             snapshot = (ShardManagerSnapshot) obj;
2229             snapshotPersist.countDown();
2230             super.saveSnapshot(obj);
2231         }
2232
2233         void verifySnapshotPersisted(final Set<String> shardList) {
2234             assertEquals("saveSnapshot invoked", true,
2235                     Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
2236             assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
2237         }
2238
2239         @Override
2240         protected ActorRef newShardActor(final ShardInformation info) {
2241             if (shardActors.get(info.getShardName()) != null) {
2242                 return shardActors.get(info.getShardName());
2243             }
2244
2245             if (shardActor != null) {
2246                 return shardActor;
2247             }
2248
2249             return super.newShardActor(info);
2250         }
2251     }
2252
2253     private abstract static class AbstractGenericCreator<T extends AbstractGenericCreator<T, ?>, C extends ShardManager>
2254                                                      extends AbstractShardManagerCreator<T> {
2255         private final Class<C> shardManagerClass;
2256
2257         AbstractGenericCreator(final Class<C> shardManagerClass) {
2258             this.shardManagerClass = shardManagerClass;
2259             cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).waitTillReadyCountDownLatch(ready)
2260                     .primaryShardInfoCache(new PrimaryShardInfoFutureCache());
2261         }
2262
2263         @Override
2264         public Props props() {
2265             verify();
2266             return Props.create(shardManagerClass, this);
2267         }
2268     }
2269
2270     private static class GenericCreator<C extends ShardManager> extends AbstractGenericCreator<GenericCreator<C>, C> {
2271         GenericCreator(final Class<C> shardManagerClass) {
2272             super(shardManagerClass);
2273         }
2274     }
2275
2276     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
2277         private static final long serialVersionUID = 1L;
2278         private final Creator<ShardManager> delegate;
2279
2280         DelegatingShardManagerCreator(final Creator<ShardManager> delegate) {
2281             this.delegate = delegate;
2282         }
2283
2284         @Override
2285         public ShardManager create() throws Exception {
2286             return delegate.create();
2287         }
2288     }
2289
2290     interface MessageInterceptor extends Function<Object, Object> {
2291         boolean canIntercept(Object message);
2292     }
2293
2294     private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
2295         return new MessageInterceptor() {
2296             @Override
2297             public Object apply(final Object message) {
2298                 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
2299             }
2300
2301             @Override
2302             public boolean canIntercept(final Object message) {
2303                 return message instanceof FindPrimary;
2304             }
2305         };
2306     }
2307
2308     private static class MockRespondActor extends MessageCollectorActor {
2309         static final String CLEAR_RESPONSE = "clear-response";
2310
2311         private Object responseMsg;
2312         private final Class<?> requestClass;
2313
2314         @SuppressWarnings("unused")
2315         MockRespondActor(final Class<?> requestClass, final Object responseMsg) {
2316             this.requestClass = requestClass;
2317             this.responseMsg = responseMsg;
2318         }
2319
2320         @Override
2321         public void onReceive(final Object message) throws Exception {
2322             if (message.equals(CLEAR_RESPONSE)) {
2323                 responseMsg = null;
2324             } else {
2325                 super.onReceive(message);
2326                 if (message.getClass().equals(requestClass) && responseMsg != null) {
2327                     getSender().tell(responseMsg, getSelf());
2328                 }
2329             }
2330         }
2331     }
2332 }