2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.shardmanager;
10 import static org.awaitility.Awaitility.await;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.junit.Assert.fail;
18 import static org.mockito.ArgumentMatchers.anyString;
19 import static org.mockito.Mockito.doReturn;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.reset;
22 import static org.mockito.Mockito.timeout;
23 import static org.mockito.Mockito.verify;
24 import static org.mockito.Mockito.verifyNoMoreInteractions;
26 import akka.actor.ActorRef;
27 import akka.actor.ActorSystem;
28 import akka.actor.AddressFromURIString;
29 import akka.actor.PoisonPill;
30 import akka.actor.Props;
31 import akka.actor.Status;
32 import akka.actor.Status.Failure;
33 import akka.actor.Status.Success;
34 import akka.cluster.Cluster;
35 import akka.cluster.ClusterEvent;
36 import akka.cluster.Member;
37 import akka.dispatch.Dispatchers;
38 import akka.dispatch.OnComplete;
39 import akka.japi.Creator;
40 import akka.pattern.Patterns;
41 import akka.persistence.RecoveryCompleted;
42 import akka.serialization.Serialization;
43 import akka.testkit.TestActorRef;
44 import akka.testkit.javadsl.TestKit;
45 import akka.util.Timeout;
46 import com.google.common.base.Stopwatch;
47 import com.google.common.collect.ImmutableMap;
48 import com.google.common.collect.Lists;
49 import com.google.common.collect.Sets;
50 import com.google.common.util.concurrent.SettableFuture;
51 import com.google.common.util.concurrent.Uninterruptibles;
52 import java.time.Duration;
53 import java.util.AbstractMap;
54 import java.util.Arrays;
55 import java.util.Collection;
56 import java.util.Collections;
57 import java.util.HashMap;
58 import java.util.List;
60 import java.util.Map.Entry;
62 import java.util.concurrent.CountDownLatch;
63 import java.util.concurrent.TimeUnit;
64 import java.util.concurrent.TimeoutException;
65 import java.util.function.Consumer;
66 import java.util.function.Function;
67 import java.util.stream.Collectors;
68 import org.junit.After;
69 import org.junit.AfterClass;
70 import org.junit.Before;
71 import org.junit.BeforeClass;
72 import org.junit.Test;
73 import org.junit.runner.RunWith;
74 import org.mockito.junit.MockitoJUnitRunner;
75 import org.opendaylight.controller.cluster.access.concepts.MemberName;
76 import org.opendaylight.controller.cluster.datastore.AbstractClusterRefActorTest;
77 import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
78 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
79 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
80 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
81 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
82 import org.opendaylight.controller.cluster.datastore.Shard;
83 import org.opendaylight.controller.cluster.datastore.config.Configuration;
84 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
85 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
86 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
87 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
88 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
89 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
90 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
91 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
92 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
93 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
94 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
95 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
96 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
97 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
98 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
99 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
100 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
101 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
102 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
103 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
104 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
105 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
106 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
107 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
108 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
109 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
110 import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor;
111 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
112 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
113 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
114 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
115 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
116 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
117 import org.opendaylight.controller.cluster.raft.RaftState;
118 import org.opendaylight.controller.cluster.raft.TestActorFactory;
119 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
120 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
121 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
122 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
123 import org.opendaylight.controller.cluster.raft.messages.AddServer;
124 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
125 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
126 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
127 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
128 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
129 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
130 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
131 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
132 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
133 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
134 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
135 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
136 import org.opendaylight.yangtools.concepts.Registration;
137 import org.opendaylight.yangtools.yang.common.Empty;
138 import org.opendaylight.yangtools.yang.common.XMLNamespace;
139 import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
140 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
141 import org.slf4j.Logger;
142 import org.slf4j.LoggerFactory;
143 import scala.concurrent.Await;
144 import scala.concurrent.Future;
145 import scala.concurrent.duration.FiniteDuration;
147 @RunWith(MockitoJUnitRunner.StrictStubs.class)
148 public class ShardManagerTest extends AbstractClusterRefActorTest {
149 private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
150 private static final MemberName MEMBER_1 = MemberName.forName("member-1");
151 private static final MemberName MEMBER_2 = MemberName.forName("member-2");
152 private static final MemberName MEMBER_3 = MemberName.forName("member-3");
154 private static int ID_COUNTER = 1;
155 private static ActorRef mockShardActor;
156 private static ShardIdentifier mockShardName;
157 private static SettableFuture<Empty> ready;
158 private static EffectiveModelContext TEST_SCHEMA_CONTEXT;
160 private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
161 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
162 private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
163 .dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
164 .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
166 private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
169 public static void beforeClass() {
170 TEST_SCHEMA_CONTEXT = TestModel.createTestContext();
174 public static void afterClass() {
175 TEST_SCHEMA_CONTEXT = null;
179 public void setUp() {
180 ready = SettableFuture.create();
182 InMemoryJournal.clear();
183 InMemorySnapshotStore.clear();
185 if (mockShardActor == null) {
186 mockShardName = ShardIdentifier.create(Shard.DEFAULT_NAME, MEMBER_1, "config");
187 mockShardActor = getSystem().actorOf(MessageCollectorActor.props(), mockShardName.toString());
190 MessageCollectorActor.clearMessages(mockShardActor);
194 public void tearDown() {
195 InMemoryJournal.clear();
196 InMemorySnapshotStore.clear();
198 mockShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
199 await().atMost(Duration.ofSeconds(10)).until(mockShardActor::isTerminated);
200 mockShardActor = null;
202 actorFactory.close();
205 private TestShardManager.Builder newTestShardMgrBuilder() {
206 return TestShardManager.builder(datastoreContextBuilder).distributedDataStore(mock(DistributedDataStore.class));
209 private TestShardManager.Builder newTestShardMgrBuilder(final Configuration config) {
210 return TestShardManager.builder(datastoreContextBuilder).configuration(config)
211 .distributedDataStore(mock(DistributedDataStore.class));
214 private Props newShardMgrProps() {
215 return newShardMgrProps(new MockConfiguration());
218 private Props newShardMgrProps(final Configuration config) {
219 return newTestShardMgrBuilder(config).readinessFuture(ready).props();
222 private ActorSystem newActorSystem(final String config) {
223 return newActorSystem("cluster-test", config);
226 private ActorRef newMockShardActor(final ActorSystem system, final String shardName, final String memberName) {
227 String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
228 if (system == getSystem()) {
229 return actorFactory.createActor(MessageCollectorActor.props(), name);
232 return system.actorOf(MessageCollectorActor.props(), name);
235 private static DatastoreContextFactory newDatastoreContextFactory(final DatastoreContext datastoreContext) {
236 DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
237 doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
238 doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(anyString());
242 private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() {
243 return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
246 private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(final ActorRef shardActor) {
247 return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor)
248 .distributedDataStore(mock(DistributedDataStore.class));
252 private Props newPropsShardMgrWithMockShardActor() {
253 return newTestShardMgrBuilderWithMockShardActor().props().withDispatcher(
254 Dispatchers.DefaultDispatcherId());
257 private Props newPropsShardMgrWithMockShardActor(final ActorRef shardActor) {
258 return newTestShardMgrBuilderWithMockShardActor(shardActor).props()
259 .withDispatcher(Dispatchers.DefaultDispatcherId());
263 private TestShardManager newTestShardManager() {
264 return newTestShardManager(newShardMgrProps());
267 private TestShardManager newTestShardManager(final Props props) {
268 TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
269 TestShardManager shardManager = shardManagerActor.underlyingActor();
270 shardManager.waitForRecoveryComplete();
274 private static void waitForShardInitialized(final ActorRef shardManager, final String shardName,
276 AssertionError last = null;
277 Stopwatch sw = Stopwatch.createStarted();
278 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
280 shardManager.tell(new FindLocalShard(shardName, true), kit.getRef());
281 kit.expectMsgClass(LocalShardFound.class);
283 } catch (AssertionError e) {
287 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
293 @SuppressWarnings("unchecked")
294 private static <T> T expectMsgClassOrFailure(final Class<T> msgClass, final TestKit kit, final String msg) {
295 Object reply = kit.expectMsgAnyClassOf(kit.duration("5 sec"), msgClass, Failure.class);
296 if (reply instanceof Failure) {
297 throw new AssertionError(msg + " failed", ((Failure)reply).cause());
304 public void testPerShardDatastoreContext() throws Exception {
305 LOG.info("testPerShardDatastoreContext starting");
306 final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
307 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
310 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(6).build())
311 .when(mockFactory).getShardDatastoreContext("default");
314 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(7).build())
315 .when(mockFactory).getShardDatastoreContext("topology");
317 final MockConfiguration mockConfig = new MockConfiguration() {
319 public Collection<String> getMemberShardNames(final MemberName memberName) {
320 return Arrays.asList("default", "topology");
324 public Collection<MemberName> getMembersFromShardName(final String shardName) {
325 return members("member-1");
329 final ActorRef defaultShardActor = actorFactory.createActor(
330 MessageCollectorActor.props(), actorFactory.generateActorId("default"));
331 final ActorRef topologyShardActor = actorFactory.createActor(
332 MessageCollectorActor.props(), actorFactory.generateActorId("topology"));
334 final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
335 new HashMap<String, Entry<ActorRef, DatastoreContext>>());
336 shardInfoMap.put("default", new AbstractMap.SimpleEntry<>(defaultShardActor, null));
337 shardInfoMap.put("topology", new AbstractMap.SimpleEntry<>(topologyShardActor, null));
339 final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
340 final CountDownLatch newShardActorLatch = new CountDownLatch(2);
341 class LocalShardManager extends ShardManager {
342 LocalShardManager(final AbstractShardManagerCreator<?> creator) {
347 protected ActorRef newShardActor(final ShardInformation info) {
348 Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
351 ref = entry.getKey();
352 entry.setValue(info.getDatastoreContext());
355 newShardActorLatch.countDown();
360 final Creator<ShardManager> creator = new Creator<>() {
361 private static final long serialVersionUID = 1L;
363 public ShardManager create() {
364 return new LocalShardManager(
365 new GenericCreator<>(LocalShardManager.class).datastoreContextFactory(mockFactory)
366 .primaryShardInfoCache(primaryShardInfoCache).configuration(mockConfig));
370 final TestKit kit = new TestKit(getSystem());
372 final ActorRef shardManager = actorFactory.createActor(Props.create(ShardManager.class,
373 new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()));
375 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
377 assertTrue("Shard actors created", newShardActorLatch.await(5, TimeUnit.SECONDS));
378 assertEquals("getShardElectionTimeoutFactor", 6,
379 shardInfoMap.get("default").getValue().getShardElectionTimeoutFactor());
380 assertEquals("getShardElectionTimeoutFactor", 7,
381 shardInfoMap.get("topology").getValue().getShardElectionTimeoutFactor());
383 DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
384 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
386 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(66).build())
387 .when(newMockFactory).getShardDatastoreContext("default");
390 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(77).build())
391 .when(newMockFactory).getShardDatastoreContext("topology");
393 shardManager.tell(newMockFactory, kit.getRef());
395 DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor,
396 DatastoreContext.class);
397 assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
399 newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
400 assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
402 LOG.info("testPerShardDatastoreContext ending");
406 public void testOnReceiveFindPrimaryForNonExistentShard() {
407 final TestKit kit = new TestKit(getSystem());
408 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
410 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
412 shardManager.tell(new FindPrimary("non-existent", false), kit.getRef());
414 kit.expectMsgClass(Duration.ofSeconds(5), PrimaryNotFoundException.class);
418 public void testOnReceiveFindPrimaryForLocalLeaderShard() {
419 LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting");
420 final TestKit kit = new TestKit(getSystem());
421 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
423 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
425 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
426 shardManager.tell(new ActorInitialized(), mockShardActor);
428 DataTree mockDataTree = mock(DataTree.class);
429 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
430 DataStoreVersions.CURRENT_VERSION), kit.getRef());
432 MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
434 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
437 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
439 LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5),
440 LocalPrimaryShardFound.class);
441 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
442 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
443 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
445 LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending");
449 public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() {
450 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp starting");
451 final TestKit kit = new TestKit(getSystem());
452 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
454 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
455 shardManager.tell(new ActorInitialized(), mockShardActor);
457 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
458 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
460 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
462 shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION),
465 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
467 kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
469 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending");
473 public void testOnReceiveFindPrimaryForNonLocalLeaderShard() {
474 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard starting");
475 final TestKit kit = new TestKit(getSystem());
476 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
478 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
479 shardManager.tell(new ActorInitialized(), mockShardActor);
481 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
482 MockClusterWrapper.sendMemberUp(shardManager, "member-2", kit.getRef().path().toString());
484 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
486 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
488 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
489 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, leaderVersion), mockShardActor);
491 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
493 RemotePrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
494 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
495 primaryFound.getPrimaryPath().contains("member-2-shard-default"));
496 assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
498 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard ending");
502 public void testOnReceiveFindPrimaryForUninitializedShard() {
503 final TestKit kit = new TestKit(getSystem());
504 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
506 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
508 kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
512 public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() {
513 final TestKit kit = new TestKit(getSystem());
514 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
516 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
517 shardManager.tell(new ActorInitialized(), mockShardActor);
519 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
521 kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
525 public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() {
526 LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
527 final TestKit kit = new TestKit(getSystem());
528 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
530 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
531 shardManager.tell(new ActorInitialized(), mockShardActor);
533 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
535 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Follower.name()),
538 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
540 kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
542 DataTree mockDataTree = mock(DataTree.class);
543 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
544 DataStoreVersions.CURRENT_VERSION), mockShardActor);
546 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
548 LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5),
549 LocalPrimaryShardFound.class);
550 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
551 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
552 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
554 LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
558 public void testOnReceiveFindPrimaryWaitForShardLeader() {
559 LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting");
560 datastoreContextBuilder.shardInitializationTimeout(10, TimeUnit.SECONDS);
561 final TestKit kit = new TestKit(getSystem());
562 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
564 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
566 // We're passing waitUntilInitialized = true to FindPrimary so
567 // the response should be
568 // delayed until we send ActorInitialized and
569 // RoleChangeNotification.
570 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
572 kit.expectNoMessage(Duration.ofMillis(150));
574 shardManager.tell(new ActorInitialized(), mockShardActor);
576 kit.expectNoMessage(Duration.ofMillis(150));
578 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
580 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
583 kit.expectNoMessage(Duration.ofMillis(150));
585 DataTree mockDataTree = mock(DataTree.class);
586 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
587 DataStoreVersions.CURRENT_VERSION), mockShardActor);
589 LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
590 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
591 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
592 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
594 kit.expectNoMessage(Duration.ofMillis(200));
596 LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending");
600 public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() {
601 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard starting");
602 final TestKit kit = new TestKit(getSystem());
603 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
605 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
607 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
609 kit.expectMsgClass(Duration.ofSeconds(2), NotInitializedException.class);
611 shardManager.tell(new ActorInitialized(), mockShardActor);
613 kit.expectNoMessage(Duration.ofMillis(200));
615 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending");
619 public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() {
620 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard starting");
621 final TestKit kit = new TestKit(getSystem());
622 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
624 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
625 shardManager.tell(new ActorInitialized(), mockShardActor);
626 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
627 RaftState.Candidate.name()), mockShardActor);
629 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
631 kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
633 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending");
637 public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() {
638 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard starting");
639 final TestKit kit = new TestKit(getSystem());
640 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
642 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
643 shardManager.tell(new ActorInitialized(), mockShardActor);
644 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
645 RaftState.IsolatedLeader.name()), mockShardActor);
647 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true),kit. getRef());
649 kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
651 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending");
655 public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() {
656 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard starting");
657 final TestKit kit = new TestKit(getSystem());
658 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
660 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
661 shardManager.tell(new ActorInitialized(), mockShardActor);
663 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
665 kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
667 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending");
671 public void testOnReceiveFindPrimaryForRemoteShard() {
672 LOG.info("testOnReceiveFindPrimaryForRemoteShard starting");
673 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
675 // Create an ActorSystem ShardManager actor for member-1.
677 final ActorSystem system1 = newActorSystem("Member1");
678 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
680 final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
681 newTestShardMgrBuilderWithMockShardActor().cluster(
682 new ClusterWrapperImpl(system1)).props().withDispatcher(
683 Dispatchers.DefaultDispatcherId()), shardManagerID);
685 // Create an ActorSystem ShardManager actor for member-2.
687 final ActorSystem system2 = newActorSystem("Member2");
689 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
691 final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
693 MockConfiguration mockConfig2 = new MockConfiguration(
694 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
695 .put("astronauts", Arrays.asList("member-2")).build());
697 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
698 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
699 new ClusterWrapperImpl(system2)).props().withDispatcher(
700 Dispatchers.DefaultDispatcherId()), shardManagerID);
702 final TestKit kit = new TestKit(system1);
703 shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
704 shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
706 shardManager2.tell(new ActorInitialized(), mockShardActor2);
708 String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
709 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
710 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
712 shardManager2.tell(new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
715 shardManager1.underlyingActor().waitForMemberUp();
716 shardManager1.tell(new FindPrimary("astronauts", false), kit.getRef());
718 RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
719 String path = found.getPrimaryPath();
720 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
721 assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
723 shardManager2.underlyingActor().verifyFindPrimary();
725 // This part times out quite a bit on jenkins for some reason
727 // Cluster.get(system2).down(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
729 // shardManager1.underlyingActor().waitForMemberRemoved();
731 // shardManager1.tell(new FindPrimary("astronauts", false), getRef());
733 // expectMsgClass(Duration.ofSeconds(5), PrimaryNotFoundException.class);
735 LOG.info("testOnReceiveFindPrimaryForRemoteShard ending");
739 public void testShardAvailabilityOnChangeOfMemberReachability() {
740 LOG.info("testShardAvailabilityOnChangeOfMemberReachability starting");
741 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
743 // Create an ActorSystem ShardManager actor for member-1.
745 final ActorSystem system1 = newActorSystem("Member1");
746 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
748 final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
750 final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
751 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
752 new ClusterWrapperImpl(system1)).props().withDispatcher(
753 Dispatchers.DefaultDispatcherId()), shardManagerID);
755 // Create an ActorSystem ShardManager actor for member-2.
757 final ActorSystem system2 = newActorSystem("Member2");
759 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
761 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
763 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
764 .put("default", Arrays.asList("member-1", "member-2")).build());
766 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
767 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
768 new ClusterWrapperImpl(system2)).props().withDispatcher(
769 Dispatchers.DefaultDispatcherId()), shardManagerID);
771 final TestKit kit = new TestKit(system1);
772 shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
773 shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
774 shardManager1.tell(new ActorInitialized(), mockShardActor1);
775 shardManager2.tell(new ActorInitialized(), mockShardActor2);
777 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
778 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
779 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
780 DataStoreVersions.CURRENT_VERSION), mockShardActor1);
782 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
784 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
785 DataStoreVersions.CURRENT_VERSION), mockShardActor2);
787 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
789 shardManager1.underlyingActor().waitForMemberUp();
791 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
793 RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
794 String path = found.getPrimaryPath();
795 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
797 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
800 shardManager1.underlyingActor().waitForUnreachableMember();
801 MessageCollectorActor.clearMessages(mockShardActor1);
803 shardManager1.tell(MockClusterWrapper.createMemberRemoved("member-2", "akka://cluster-test@127.0.0.1:2558"),
806 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
808 kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
810 shardManager1.tell(MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
813 shardManager1.underlyingActor().waitForReachableMember();
815 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
817 RemotePrimaryShardFound found1 = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
818 String path1 = found1.getPrimaryPath();
819 assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
821 shardManager1.tell(MockClusterWrapper.createMemberUp("member-2", "akka://cluster-test@127.0.0.1:2558"),
824 // Test FindPrimary wait succeeds after reachable member event.
826 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
827 "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
828 shardManager1.underlyingActor().waitForUnreachableMember();
830 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
833 MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
835 RemotePrimaryShardFound found2 = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
836 String path2 = found2.getPrimaryPath();
837 assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config"));
839 LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending");
843 public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() {
844 LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange starting");
845 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
847 // Create an ActorSystem ShardManager actor for member-1.
849 final ActorSystem system1 = newActorSystem("Member1");
850 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
852 final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
854 final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
855 final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
856 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(new ClusterWrapperImpl(system1))
857 .primaryShardInfoCache(primaryShardInfoCache).props()
858 .withDispatcher(Dispatchers.DefaultDispatcherId()),
861 // Create an ActorSystem ShardManager actor for member-2.
863 final ActorSystem system2 = newActorSystem("Member2");
865 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
867 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
869 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
870 .put("default", Arrays.asList("member-1", "member-2")).build());
872 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
873 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
874 new ClusterWrapperImpl(system2)).props().withDispatcher(
875 Dispatchers.DefaultDispatcherId()), shardManagerID);
877 final TestKit kit = new TestKit(system1);
878 shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
879 shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
880 shardManager1.tell(new ActorInitialized(), mockShardActor1);
881 shardManager2.tell(new ActorInitialized(), mockShardActor2);
883 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
884 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
885 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
886 DataStoreVersions.CURRENT_VERSION), mockShardActor1);
888 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
890 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
891 DataStoreVersions.CURRENT_VERSION), mockShardActor2);
893 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
895 shardManager1.underlyingActor().waitForMemberUp();
897 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
899 RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
900 String path = found.getPrimaryPath();
901 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
903 primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(
904 system1.actorSelection(mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION));
906 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
907 "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
909 shardManager1.underlyingActor().waitForUnreachableMember();
911 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
913 kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
915 assertNull("Expected primaryShardInfoCache entry removed",
916 primaryShardInfoCache.getIfPresent("default"));
918 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, mock(DataTree.class),
919 DataStoreVersions.CURRENT_VERSION), mockShardActor1);
921 new RoleChangeNotification(memberId1, RaftState.Follower.name(), RaftState.Leader.name()),
924 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
926 LocalPrimaryShardFound found1 = kit.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
927 String path1 = found1.getPrimaryPath();
928 assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
930 LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange ending");
934 public void testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable() {
935 LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable starting");
936 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
938 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
939 .put("default", Arrays.asList("member-256", "member-2")).build());
941 // Create an ActorSystem, ShardManager and actor for member-256.
943 final ActorSystem system256 = newActorSystem("Member256");
944 // 2562 is the tcp port of Member256 in src/test/resources/application.conf.
945 Cluster.get(system256).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2562"));
947 final ActorRef mockShardActor256 = newMockShardActor(system256, Shard.DEFAULT_NAME, "member-256");
949 final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
951 // ShardManager must be created with shard configuration to let its localShards has shards.
952 final TestActorRef<TestShardManager> shardManager256 = TestActorRef.create(system256,
953 newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor256)
954 .cluster(new ClusterWrapperImpl(system256))
955 .primaryShardInfoCache(primaryShardInfoCache).props()
956 .withDispatcher(Dispatchers.DefaultDispatcherId()),
959 // Create an ActorSystem, ShardManager and actor for member-2 whose name is contained in member-256.
961 final ActorSystem system2 = newActorSystem("Member2");
963 // Join member-2 into the cluster of member-256.
964 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2562"));
966 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
968 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
969 newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor2).cluster(
970 new ClusterWrapperImpl(system2)).props().withDispatcher(
971 Dispatchers.DefaultDispatcherId()), shardManagerID);
973 final TestKit kit256 = new TestKit(system256);
974 shardManager256.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef());
975 shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef());
976 shardManager256.tell(new ActorInitialized(), mockShardActor256);
977 shardManager2.tell(new ActorInitialized(), mockShardActor2);
979 String memberId256 = "member-256-shard-default-" + shardMrgIDSuffix;
980 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
981 shardManager256.tell(new ShardLeaderStateChanged(memberId256, memberId256, mock(DataTree.class),
982 DataStoreVersions.CURRENT_VERSION), mockShardActor256);
983 shardManager256.tell(
984 new RoleChangeNotification(memberId256, RaftState.Candidate.name(), RaftState.Leader.name()),
986 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId256, mock(DataTree.class),
987 DataStoreVersions.CURRENT_VERSION), mockShardActor2);
989 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Follower.name()),
991 shardManager256.underlyingActor().waitForMemberUp();
993 shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
995 LocalPrimaryShardFound found = kit256.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
996 String path = found.getPrimaryPath();
997 assertTrue("Unexpected primary path " + path + " which must on member-256",
998 path.contains("member-256-shard-default-config"));
1000 PrimaryShardInfo primaryShardInfo = new PrimaryShardInfo(
1001 system256.actorSelection(mockShardActor256.path()), DataStoreVersions.CURRENT_VERSION);
1002 primaryShardInfoCache.putSuccessful("default", primaryShardInfo);
1004 // Simulate member-2 become unreachable.
1005 shardManager256.tell(MockClusterWrapper.createUnreachableMember("member-2",
1006 "akka://cluster-test@127.0.0.1:2558"), kit256.getRef());
1007 shardManager256.underlyingActor().waitForUnreachableMember();
1009 // Make sure leader shard on member-256 is still leader and still in the cache.
1010 shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
1011 found = kit256.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
1012 path = found.getPrimaryPath();
1013 assertTrue("Unexpected primary path " + path + " which must still not on member-256",
1014 path.contains("member-256-shard-default-config"));
1015 Future<PrimaryShardInfo> futurePrimaryShard = primaryShardInfoCache.getIfPresent("default");
1016 futurePrimaryShard.onComplete(new OnComplete<PrimaryShardInfo>() {
1018 public void onComplete(final Throwable failure, final PrimaryShardInfo futurePrimaryShardInfo) {
1019 if (failure != null) {
1020 assertTrue("Primary shard info is unexpectedly removed from primaryShardInfoCache", false);
1022 assertEquals("Expected primaryShardInfoCache entry",
1023 primaryShardInfo, futurePrimaryShardInfo);
1026 }, system256.dispatchers().defaultGlobalDispatcher());
1028 LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable ending");
public void testOnReceiveFindLocalShardForNonExistentShard() {
    final TestKit testKit = new TestKit(getSystem());
    final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
    final ActorRef replyTo = testKit.getRef();

    shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), replyTo);

    // Ask for a shard name that is not part of the configuration.
    shardManager.tell(new FindLocalShard("non-existent", false), replyTo);

    final LocalShardNotFound reply = testKit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
    assertEquals("getShardName", "non-existent", reply.getShardName());
public void testOnReceiveFindLocalShardForExistentShard() {
    final TestKit testKit = new TestKit(getSystem());
    final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());

    // Give the manager a schema and mark the default shard's actor as initialized.
    shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), testKit.getRef());
    shardManager.tell(new ActorInitialized(), mockShardActor);

    shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), testKit.getRef());

    final String foundPath = testKit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class)
        .getPath().path().toString();
    assertTrue("Found path contains " + foundPath, foundPath.contains("member-1-shard-default-config"));
public void testOnReceiveFindLocalShardForNotInitializedShard() {
    final TestKit testKit = new TestKit(getSystem());
    final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());

    // Neither UpdateSchemaContext nor ActorInitialized is sent before the lookup, and the
    // request does not ask to wait, so a NotInitializedException reply is expected.
    shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), testKit.getRef());
    testKit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
    LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
    final TestKit kit = new TestKit(getSystem());
    final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());

    shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());

    // FindLocalShard is sent with waitUntilInitialized = true, so the reply is expected
    // to be delayed until ActorInitialized is sent for the shard.
    Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
        new Timeout(5, TimeUnit.SECONDS));

    shardManager.tell(new ActorInitialized(), mockShardActor);

    Object resp = Await.result(future, kit.duration("5 seconds"));
    assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);

    // Previously logged "starting" here by mistake.
    LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized ending");
public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
    final TestShardManager manager = newTestShardManager();
    final String localMemberId = "member-1-shard-default-" + shardMrgIDSuffix;

    // Becoming leader on its own must not complete the ready future...
    manager.handleCommand(new RoleChangeNotification(localMemberId, RaftState.Candidate.name(),
        RaftState.Leader.name()));
    assertFalse(ready.isDone());

    // ...the ShardLeaderStateChanged naming this member as leader is what completes it.
    manager.handleCommand(new ShardLeaderStateChanged(localMemberId, localMemberId, mock(DataTree.class),
        DataStoreVersions.CURRENT_VERSION));
    assertTrue(ready.isDone());
public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
    final TestKit testKit = new TestKit(getSystem());
    final TestShardManager manager = newTestShardManager();
    final String localMemberId = "member-1-shard-default-" + shardMrgIDSuffix;
    final String remoteLeaderId = "member-2-shard-default-" + shardMrgIDSuffix;

    manager.handleCommand(new RoleChangeNotification(localMemberId, null, RaftState.Follower.name()));
    assertFalse(ready.isDone());

    // member-2 is reported up first; the leader notification that follows completes ready.
    manager.handleCommand(MockClusterWrapper.createMemberUp("member-2", testKit.getRef().path().toString()));
    manager.handleCommand(new ShardLeaderStateChanged(localMemberId, remoteLeaderId, mock(DataTree.class),
        DataStoreVersions.CURRENT_VERSION));
    assertTrue(ready.isDone());
public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
    final TestKit testKit = new TestKit(getSystem());
    final TestShardManager manager = newTestShardManager();
    final String localMemberId = "member-1-shard-default-" + shardMrgIDSuffix;
    final String remoteLeaderId = "member-2-shard-default-" + shardMrgIDSuffix;

    manager.handleCommand(new RoleChangeNotification(localMemberId, null, RaftState.Follower.name()));
    assertFalse(ready.isDone());

    // Reverse order of the previous scenario: the leader is announced before member-2 is reported up.
    manager.handleCommand(new ShardLeaderStateChanged(localMemberId, remoteLeaderId, mock(DataTree.class),
        DataStoreVersions.CURRENT_VERSION));
    manager.handleCommand(MockClusterWrapper.createMemberUp("member-2", testKit.getRef().path().toString()));
    assertTrue(ready.isDone());
public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
    final TestShardManager manager = newTestShardManager();

    // A role change for an id that matches no local shard must not complete the ready future.
    manager.handleCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(),
        RaftState.Leader.name()));
    assertFalse(ready.isDone());
public void testByDefaultSyncStatusIsFalse() {
    // A freshly created shard manager reports that it is not in sync.
    assertFalse(newTestShardManager().getMBean().getSyncStatus());
public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception {
    final TestShardManager manager = newTestShardManager();
    final String leaderShardId = "member-1-shard-default-" + shardMrgIDSuffix;

    // Once the local default shard turns leader, the reported sync status becomes true.
    manager.handleCommand(new RoleChangeNotification(leaderShardId, RaftState.Follower.name(),
        RaftState.Leader.name()));
    assertTrue(manager.getMBean().getSyncStatus());
public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception {
    final TestShardManager manager = newTestShardManager();
    final String candidateShardId = "member-1-shard-default-" + shardMrgIDSuffix;

    manager.handleCommand(new RoleChangeNotification(candidateShardId, RaftState.Follower.name(),
        RaftState.Candidate.name()));
    assertFalse(manager.getMBean().getSyncStatus());

    // A FollowerInitialSyncUpStatus(true) for a replica that is currently a candidate
    // must not flip the sync status.
    manager.handleCommand(new FollowerInitialSyncUpStatus(true, candidateShardId));
    assertFalse(manager.getMBean().getSyncStatus());
public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception {
    final TestShardManager manager = newTestShardManager();
    final String followerShardId = "member-1-shard-default-" + shardMrgIDSuffix;

    manager.handleCommand(new RoleChangeNotification(followerShardId, RaftState.Candidate.name(),
        RaftState.Follower.name()));
    // A freshly elected follower starts out not in sync.
    assertFalse(manager.getMBean().getSyncStatus());

    // From then on, the sync status tracks each FollowerInitialSyncUpStatus received.
    manager.handleCommand(new FollowerInitialSyncUpStatus(true, followerShardId));
    assertTrue(manager.getMBean().getSyncStatus());

    manager.handleCommand(new FollowerInitialSyncUpStatus(false, followerShardId));
    assertFalse(manager.getMBean().getSyncStatus());
public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception {
    LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
    // The local member hosts two shards: "default" and "astronauts".
    final TestShardManager manager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
        @Override
        public List<String> getMemberShardNames(final MemberName memberName) {
            return Arrays.asList("default", "astronauts");
        }
    }));
    final String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
    final String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;

    // Nothing has reported a role yet.
    assertFalse(manager.getMBean().getSyncStatus());

    // Only "default" is leader; "astronauts" has no role yet, so overall status stays false.
    manager.handleCommand(new RoleChangeNotification(defaultShardId, RaftState.Follower.name(),
        RaftState.Leader.name()));
    assertFalse(manager.getMBean().getSyncStatus());

    // Both shards are leaders: in sync.
    manager.handleCommand(new RoleChangeNotification(astronautsShardId, RaftState.Follower.name(),
        RaftState.Leader.name()));
    assertTrue(manager.getMBean().getSyncStatus());

    // "astronauts" drops to follower without a sync report: no longer in sync.
    manager.handleCommand(new RoleChangeNotification(astronautsShardId, RaftState.Leader.name(),
        RaftState.Follower.name()));
    assertFalse(manager.getMBean().getSyncStatus());

    // The follower reports it has caught up: in sync again.
    manager.handleCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
    assertTrue(manager.getMBean().getSyncStatus());

    LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending");
public void testOnReceiveSwitchShardBehavior() {
    final TestKit testKit = new TestKit(getSystem());
    final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());

    shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), testKit.getRef());
    shardManager.tell(new ActorInitialized(), mockShardActor);

    // The request must reach the mock shard actor as a SwitchBehavior carrying the same state and term.
    shardManager.tell(new SwitchShardBehavior(mockShardName, RaftState.Leader, 1000), testKit.getRef());
    final SwitchBehavior delivered = MessageCollectorActor.expectFirstMatching(mockShardActor,
        SwitchBehavior.class);

    assertEquals(RaftState.Leader, delivered.getNewState());
    assertEquals(1000, delivered.getNewTerm());
1268 private static List<MemberName> members(final String... names) {
1269 return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList());
1273 public void testOnCreateShard() {
1274 LOG.info("testOnCreateShard starting");
1275 final TestKit kit = new TestKit(getSystem());
1276 datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1278 ActorRef shardManager = actorFactory
1279 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1280 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1282 EffectiveModelContext schemaContext = TEST_SCHEMA_CONTEXT;
1283 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1285 DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100)
1286 .persistent(false).build();
1287 Shard.Builder shardBuilder = Shard.builder();
1289 ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module",
1290 "foo", null, members("member-1", "member-5", "member-6"));
1291 shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), kit.getRef());
1293 kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1295 shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1297 kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1299 assertFalse("isRecoveryApplicable", shardBuilder.getDatastoreContext().isPersistent());
1300 assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig()
1301 .getPeerAddressResolver() instanceof ShardPeerAddressResolver);
1302 assertEquals("peerMembers", Sets.newHashSet(
1303 ShardIdentifier.create("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(),
1304 ShardIdentifier.create("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()),
1305 shardBuilder.getPeerAddresses().keySet());
1306 assertEquals("ShardIdentifier", ShardIdentifier.create("foo", MEMBER_1, shardMrgIDSuffix),
1307 shardBuilder.getId());
1308 assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1310 // Send CreateShard with same name - should return Success with
1313 shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1315 Success success = kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1316 assertNotNull("Success status is null", success.status());
1318 LOG.info("testOnCreateShard ending");
public void testOnCreateShardWithLocalMemberNotInShardConfig() {
    LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting");
    final TestKit kit = new TestKit(getSystem());
    datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);

    ActorRef shardManager = actorFactory
        .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
            .withDispatcher(Dispatchers.DefaultDispatcherId()));

    shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());

    // The shard's member list does not include the local member (member-1).
    Shard.Builder shardBuilder = Shard.builder();
    ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module",
        "foo", null, members("member-5", "member-6"));

    shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
    kit.expectMsgClass(Duration.ofSeconds(5), Success.class);

    shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
    kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);

    // The shard is created with no peers and with elections disabled.
    assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
    assertEquals("getCustomRaftPolicyImplementationClass", DisableElectionsRaftPolicy.class.getName(),
        shardBuilder.getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());

    LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending");
public void testOnCreateShardWithNoInitialSchemaContext() {
    LOG.info("testOnCreateShardWithNoInitialSchemaContext starting");
    final TestKit kit = new TestKit(getSystem());
    ActorRef shardManager = actorFactory
        .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
            .withDispatcher(Dispatchers.DefaultDispatcherId()));

    Shard.Builder shardBuilder = Shard.builder();

    // CreateShard arrives before any UpdateSchemaContext has been sent.
    ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module",
        "foo", null, members("member-1"));
    shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());

    kit.expectMsgClass(Duration.ofSeconds(5), Success.class);

    // The schema context sent afterwards is expected to end up on the shard builder.
    EffectiveModelContext schemaContext = TEST_SCHEMA_CONTEXT;
    shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());

    shardManager.tell(new FindLocalShard("foo", true), kit.getRef());

    kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);

    assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
    assertNotNull("datastoreContext is null", shardBuilder.getDatastoreContext());

    LOG.info("testOnCreateShardWithNoInitialSchemaContext ending");
1380 public void testGetSnapshot() {
1381 LOG.info("testGetSnapshot starting");
1382 TestKit kit = new TestKit(getSystem());
1384 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1385 .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1"))
1386 .put("astronauts", Collections.<String>emptyList()).build());
1388 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig)
1389 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1391 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1392 Failure failure = kit.expectMsgClass(Failure.class);
1393 assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
1395 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1397 waitForShardInitialized(shardManager, "shard1", kit);
1398 waitForShardInitialized(shardManager, "shard2", kit);
1400 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1402 DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1404 assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1405 assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1407 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
1408 datastoreSnapshot.getShardSnapshots().stream().map(ShardSnapshot::getName).collect(Collectors.toSet())));
1410 // Add a new replica
1412 TestKit mockShardLeaderKit = new TestKit(getSystem());
1414 TestShardManager shardManagerInstance = shardManager.underlyingActor();
1415 shardManagerInstance.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1417 shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1418 mockShardLeaderKit.expectMsgClass(AddServer.class);
1419 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.OK, ""));
1420 kit.expectMsgClass(Status.Success.class);
1421 waitForShardInitialized(shardManager, "astronauts", kit);
1423 // Send another GetSnapshot and verify
1425 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1426 datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1428 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
1429 Lists.transform(datastoreSnapshot.getShardSnapshots(), ShardSnapshot::getName)));
1431 ShardManagerSnapshot snapshot = datastoreSnapshot.getShardManagerSnapshot();
1432 assertNotNull("Expected ShardManagerSnapshot", snapshot);
1433 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1434 Sets.newHashSet(snapshot.getShardList()));
1436 LOG.info("testGetSnapshot ending");
1440 public void testRestoreFromSnapshot() {
// Verifies that a ShardManager started with restoreFromSnapshot(...) brings up the shards named in the
// restored ShardManagerSnapshot ("shard1", "shard2", "astronauts"), even though the MockConfiguration
// below lists no members for any of them, and that a later GetSnapshot reports that same shard list.
1441 LOG.info("testRestoreFromSnapshot starting");
1443 datastoreContextBuilder.shardInitializationTimeout(3, TimeUnit.SECONDS);
1445 TestKit kit = new TestKit(getSystem());
1447 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1448 .put("shard1", Collections.<String>emptyList()).put("shard2", Collections.<String>emptyList())
1449 .put("astronauts", Collections.<String>emptyList()).build());
// The snapshot to restore carries a ShardManagerSnapshot but an empty list of ShardSnapshots.
1451 ShardManagerSnapshot snapshot =
1452 new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"));
1453 DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix, snapshot,
1454 Collections.<ShardSnapshot>emptyList());
1455 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig)
1456 .restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
// Wait for recovery before driving the actor; the restored snapshot is presumably applied during
// recovery (TODO confirm against ShardManager).
1458 shardManager.underlyingActor().waitForRecoveryComplete();
1460 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
// All three restored shards must initialize, although the configuration lists no members for them.
1462 waitForShardInitialized(shardManager, "shard1", kit);
1463 waitForShardInitialized(shardManager, "shard2", kit);
1464 waitForShardInitialized(shardManager, "astronauts", kit);
1466 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1468 DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1470 assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
// The restored ShardManagerSnapshot must be reported back with the same shard list.
1472 assertNotNull("Expected ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1473 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1474 Sets.newHashSet(datastoreSnapshot.getShardManagerSnapshot().getShardList()));
1476 LOG.info("testRestoreFromSnapshot ending");
1480 public void testAddShardReplicaForNonExistentShardConfig() {
// Verifies that AddShardReplica for "model-inventory", presumably a shard not defined by the
// EmptyModuleShardConfigProvider-backed configuration, is answered within 2 seconds with a
// Status.Failure whose cause is an IllegalArgumentException.
1481 final TestKit kit = new TestKit(getSystem());
1482 ActorRef shardManager = actorFactory
1483 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1484 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1486 shardManager.tell(new AddShardReplica("model-inventory"), kit.getRef());
1487 Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(2), Status.Failure.class);
1489 assertTrue("Failure obtained", resp.cause() instanceof IllegalArgumentException);
1493 public void testAddShardReplica() {
// End-to-end AddShardReplica across two ActorSystems ("Member1" and "Member2"), both joining
// akka://cluster-test@127.0.0.1:2558. member-1's ShardManager requests a local replica of "astronauts"
// (configured only on member-2). The mock leader on member-2 receives the AddServer and the caller gets
// Status.Success. The test then checks that exactly one ShardManagerSnapshot, listing "default" and
// "astronauts", remains persisted for member-1, replacing the dummy snapshot seeded below.
1494 LOG.info("testAddShardReplica starting");
1495 MockConfiguration mockConfig = new MockConfiguration(
1496 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1497 .put("astronauts", Arrays.asList("member-2")).build());
// Pin the ShardManager persistence id so the InMemorySnapshotStore calls below can address it.
1499 final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1500 datastoreContextBuilder.shardManagerPersistenceId(shardManagerID);
1502 // Create an ActorSystem ShardManager actor for member-1.
1503 final ActorSystem system1 = newActorSystem("Member1");
1504 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1505 ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1506 final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1507 newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor)
1508 .cluster(new ClusterWrapperImpl(system1)).props()
1509 .withDispatcher(Dispatchers.DefaultDispatcherId()),
1512 // Create an ActorSystem ShardManager actor for member-2.
1513 final ActorSystem system2 = newActorSystem("Member2");
1514 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1516 String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
1517 String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString();
// Mock shard leader on member-2; presumably it answers AddServer with AddServerReply(OK, memberId2)
// (TODO confirm MockRespondActor semantics).
1518 final TestActorRef<MockRespondActor> mockShardLeaderActor = TestActorRef.create(system2,
1519 Props.create(MockRespondActor.class, AddServer.class,
1520 new AddServerReply(ServerChangeStatus.OK, memberId2))
1521 .withDispatcher(Dispatchers.DefaultDispatcherId()),
1523 final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1524 newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor)
1525 .cluster(new ClusterWrapperImpl(system2)).props()
1526 .withDispatcher(Dispatchers.DefaultDispatcherId()),
1529 final TestKit kit = new TestKit(getSystem());
1530 newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1531 leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1533 leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
// Make member-2's "astronauts" shard the leader, reporting a data-store version one below CURRENT_VERSION.
1535 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1536 leaderShardManager.tell(
1537 new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1538 mockShardLeaderActor);
1539 leaderShardManager.tell(
1540 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1541 mockShardLeaderActor);
// Wait until each ShardManager has observed cluster membership (presumably the peer's MemberUp)
// before issuing the request.
1543 newReplicaShardManager.underlyingActor().waitForMemberUp();
1544 leaderShardManager.underlyingActor().waitForMemberUp();
1546 // Have a dummy snapshot to be overwritten by the new data
1548 String[] restoredShards = { "default", "people" };
1549 ShardManagerSnapshot snapshot =
1550 new ShardManagerSnapshot(Arrays.asList(restoredShards));
1551 InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
// NOTE(review): the purpose of this 2 ms pause is not visible here; presumably it keeps the new
// snapshot's timestamp distinct from the dummy one's. Confirm.
1552 Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
// Register latches so the snapshot save and deletion can be awaited below.
1554 InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID);
1555 InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
1557 // Ask member-1's ShardManager for a local "astronauts" replica; it must send AddServer to the leader.
1558 newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1559 AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1561 String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1562 assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1563 kit.expectMsgClass(Duration.ofSeconds(5), Status.Success.class);
// Wait for one snapshot save and one deletion; exactly one ShardManagerSnapshot must then remain.
1565 InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
1566 InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
1567 List<ShardManagerSnapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(shardManagerID,
1568 ShardManagerSnapshot.class);
1569 assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
1570 ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
1571 assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
1572 Sets.newHashSet(shardManagerSnapshot.getShardList()));
1573 LOG.info("testAddShardReplica ending");
1577 public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() {
// Verifies AddShardReplica when the remote leader already has this member's replica. The leader answers
// AddServer with ALREADY_EXISTS, so the request fails with AlreadyExistsException, while the existing
// local "default" shard is still found via FindLocalShard. The request is repeated once, and then sent
// again with a 100 ms leader-election timeout and the mock leader's canned reply cleared. Each time the
// local shard must survive.
1578 LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting");
1579 final TestKit kit = new TestKit(getSystem());
1580 TestActorRef<TestShardManager> shardManager = actorFactory
1581 .createTestActor(newPropsShardMgrWithMockShardActor(), shardMgrID);
1583 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1584 shardManager.tell(new ActorInitialized(), mockShardActor);
// The mock remote leader is created as a child of the ShardManager, named with the leader's shard id,
// and is configured with an ALREADY_EXISTS AddServerReply.
1586 String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
1587 AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
1588 ActorRef leaderShardActor = shardManager.underlyingActor().getContext()
1589 .actorOf(Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId);
1591 MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
1593 String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
1595 new RoleChangeNotification(newReplicaId, RaftState.Candidate.name(), RaftState.Follower.name()),
1598 new ShardLeaderStateChanged(newReplicaId, leaderId, DataStoreVersions.CURRENT_VERSION),
1601 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1603 MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
1605 Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1606 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1608 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1609 kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1611 // Send message again to verify the previous in-progress state was cleared.
1614 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1615 resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1616 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1618 // Send message again with an AddServer timeout to verify the
1619 // pre-existing shard actor isn't terminated.
1622 newDatastoreContextFactory(
1623 datastoreContextBuilder.shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), kit.getRef());
// Presumably stops the mock leader from replying, so this AddServer goes unanswered and the request
// fails (TODO confirm MockRespondActor.CLEAR_RESPONSE semantics).
1624 leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
1625 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1626 kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1628 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1629 kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1631 LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
1635 public void testAddShardReplicaWithPreExistingLocalReplicaLeader() {
// Verifies that AddShardReplica for "default" fails with AlreadyExistsException when member-1's local
// "default" replica already exists and is the leader, and that the local shard is still found afterwards.
1636 LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting");
1637 final TestKit kit = new TestKit(getSystem());
1638 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1639 ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1641 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1642 shardManager.tell(new ActorInitialized(), mockShardActor);
// Report the local shard as its own leader (leader id == member id).
1643 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1644 DataStoreVersions.CURRENT_VERSION), kit.getRef());
1646 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1649 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1650 Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1651 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1653 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1654 kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1656 LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
1660 public void testAddShardReplicaWithAddServerReplyFailure() {
// Verifies AddShardReplica error mapping for non-OK AddServerReply statuses:
//  - TIMEOUT -> Failure(TimeoutException); the new local replica actor is terminated and FindLocalShard
//    answers LocalShardNotFound;
//  - NO_LEADER (on a retry) -> Failure(NoShardLeaderException).
1661 LOG.info("testAddShardReplicaWithAddServerReplyFailure starting");
1662 final TestKit kit = new TestKit(getSystem());
1663 final TestKit mockShardLeaderKit = new TestKit(getSystem());
1665 MockConfiguration mockConfig = new MockConfiguration(
1666 ImmutableMap.of("astronauts", Arrays.asList("member-2")));
1668 ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
1669 final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1670 newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props()
1671 .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
// Presumably makes FindPrimary resolve to mockShardLeaderKit, so the test plays the shard leader and
// receives the AddServer below.
1672 shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1674 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
// Watch the replica actor so its termination after the TIMEOUT reply can be asserted.
1676 TestKit terminateWatcher = new TestKit(getSystem());
1677 terminateWatcher.watch(mockNewReplicaShardActor);
1679 shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1681 AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
1682 assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
1683 addServerMsg.getNewServerId());
1684 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
1686 Failure failure = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1687 assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
1689 shardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
1690 kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
1692 terminateWatcher.expectTerminated(mockNewReplicaShardActor);
// Retry; this time the leader reports NO_LEADER.
1694 shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1695 mockShardLeaderKit.expectMsgClass(AddServer.class);
1696 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
1697 failure = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1698 assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
1700 LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
1704 public void testAddShardReplicaWithAlreadyInProgress() {
// A second AddShardReplica("astronauts") must be answered with a Failure while the first one,
// forwarded to the leader as AddServer, is still unanswered.
1705 testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1706 AddServer.class, new AddShardReplica("astronauts"));
1710 public void testAddShardReplicaWithFindPrimaryTimeout() {
// Verifies that AddShardReplica("astronauts") fails when the shard's only configured member, member-2,
// is reported up at akka://non-existent@127.0.0.1:5 (presumably unreachable). The 100 ms
// shardInitializationTimeout presumably bounds the primary lookup (TODO confirm). The caller gets a
// Status.Failure within 5 seconds.
1711 LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting");
1712 datastoreContextBuilder.shardInitializationTimeout(100, TimeUnit.MILLISECONDS);
1713 final TestKit kit = new TestKit(getSystem());
1714 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.of("astronauts", Arrays.asList("member-2")));
1716 final ActorRef newReplicaShardManager = actorFactory
1717 .createActor(newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props()
1718 .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1720 newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1721 MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2",
1722 AddressFromURIString.parse("akka://non-existent@127.0.0.1:5").toString());
1724 newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1725 Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Status.Failure.class);
// NOTE(review): only checks that the cause is some RuntimeException; a narrower type would pin the
// timeout path more precisely.
1726 assertTrue("Failure obtained", resp.cause() instanceof RuntimeException);
1728 LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending");
1732 public void testRemoveShardReplicaForNonExistentShard() {
// Verifies that RemoveShardReplica for "model-inventory", presumably a shard not defined by the
// EmptyModuleShardConfigProvider-backed configuration, fails with a PrimaryNotFoundException cause.
1733 final TestKit kit = new TestKit(getSystem());
1734 ActorRef shardManager = actorFactory
1735 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1736 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1738 shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), kit.getRef());
1739 Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(10), Status.Failure.class);
1740 assertTrue("Failure obtained", resp.cause() instanceof PrimaryNotFoundException);
1747 public void testRemoveShardReplicaLocal() {
// Verifies RemoveShardReplica when member-1's local "default" replica is the leader. The local shard
// actor receives a RemoveServer for member-1's "default" shard id, and the caller gets Success.
1748 final TestKit kit = new TestKit(getSystem());
1749 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
// Local shard stand-in; presumably it answers RemoveServer with RemoveServerReply(OK) (TODO confirm
// MockRespondActor semantics).
1751 final ActorRef respondActor = actorFactory.createActor(Props.create(MockRespondActor.class,
1752 RemoveServer.class, new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
// NOTE(review): created with getSystem().actorOf rather than actorFactory, so it is presumably not
// stopped by the factory's cleanup. Confirm.
1754 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
1756 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1757 shardManager.tell(new ActorInitialized(), respondActor);
1758 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1759 DataStoreVersions.CURRENT_VERSION), kit.getRef());
1761 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1764 shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), kit.getRef());
1765 final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor,
1766 RemoveServer.class);
1767 assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(),
1768 removeServer.getServerId());
1769 kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1773 public void testRemoveShardReplicaRemote() {
// End-to-end RemoveShardReplica across two ActorSystems. member-1's ShardManager, whose "default" replica
// is a follower, must send RemoveServer for member-1's "default" shard id to the remote leader on
// member-2 (reached through a ForwardingActor, see below), and the caller must get Status.Success.
1774 MockConfiguration mockConfig = new MockConfiguration(
1775 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1776 .put("astronauts", Arrays.asList("member-1")).build());
1778 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1780 // Create an ActorSystem ShardManager actor for member-1.
1781 final ActorSystem system1 = newActorSystem("Member1");
1782 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1783 ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1785 final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1786 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster(
1787 new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1790 // Create an ActorSystem ShardManager actor for member-2.
1791 final ActorSystem system2 = newActorSystem("Member2");
1792 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1794 String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString();
1795 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
1796 final TestActorRef<MockRespondActor> mockShardLeaderActor =
1797 TestActorRef.create(system2, Props.create(MockRespondActor.class, RemoveServer.class,
1798 new RemoveServerReply(ServerChangeStatus.OK, memberId2)), name);
// NOTE(review): diagnostic output logged at ERROR level; INFO/DEBUG would be more appropriate.
1800 LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor);
1802 final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1803 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster(
1804 new ClusterWrapperImpl(system2)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1807 // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so,
1808 // akka://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1
1809 // However when a shard manager has a local shard which is a follower and a leader that is remote it will
1810 // try to compute an address for the remote shard leader using the ShardPeerAddressResolver. This address will
// look like so,
1812 // akka://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1
1813 // In this specific case if we did a FindPrimary for shard default from member-1 we would come up
1814 // with the address of an actor which does not exist, therefore any message sent to that actor would go to
// dead letters.
1816 // To work around this problem we create a ForwardingActor with the right address and pass to it the
1817 // mockShardLeaderActor. The ForwardingActor simply forwards all messages to the mockShardLeaderActor and every
1818 // thing works as expected
1819 final ActorRef actorRef = leaderShardManager.underlyingActor().context()
1820 .actorOf(Props.create(ForwardingActor.class, mockShardLeaderActor),
1821 "member-2-shard-default-" + shardMrgIDSuffix);
// NOTE(review): diagnostic output logged at ERROR level; INFO/DEBUG would be more appropriate.
1823 LOG.error("Forwarding actor : {}", actorRef);
1825 final TestKit kit = new TestKit(getSystem());
1826 newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1827 leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1829 leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1830 newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
// member-2's "default" shard becomes leader; member-1's "default" shard becomes its follower.
1832 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1833 leaderShardManager.tell(
1834 new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1835 mockShardLeaderActor);
1836 leaderShardManager.tell(
1837 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1838 mockShardLeaderActor);
1840 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
1841 newReplicaShardManager.tell(
1842 new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class), leaderVersion),
1844 newReplicaShardManager.tell(
1845 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
1848 newReplicaShardManager.underlyingActor().waitForMemberUp();
1849 leaderShardManager.underlyingActor().waitForMemberUp();
1851 // Ask member-1 to remove its own "default" replica; it must send RemoveServer to the member-2 leader.
1852 newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), kit.getRef());
1853 RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1854 RemoveServer.class);
1855 String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1856 assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
1857 kit.expectMsgClass(Duration.ofSeconds(5), Status.Success.class);
1861 public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() {
// A RemoveShardReplica for MEMBER_3 must be answered with a Failure while a RemoveShardReplica for
// MEMBER_2 on the same shard, forwarded as RemoveServer, is still unanswered.
1862 testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", MEMBER_2),
1863 RemoveServer.class, new RemoveShardReplica("astronauts", MEMBER_3));
1867 public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() {
// A RemoveShardReplica must be answered with a Failure while an AddShardReplica for the same shard,
// forwarded as AddServer, is still unanswered.
1868 testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1869 AddServer.class, new RemoveShardReplica("astronauts", MEMBER_2));
1873 public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange,
1874 final Class<?> firstForwardedServerChangeClass,
1875 final Object secondServerChange) {
// Parameterized driver shared by the "already in progress" tests above.
// Sends firstServerChange and waits until the mock leader (mockShardLeaderKit) receives the forwarded
// message of type firstForwardedServerChangeClass, without replying to it. It then sends
// secondServerChange from a separate TestKit and expects that request to be answered with a Failure.
// NOTE(review): public with a test* name although it takes parameters; a private helper name would
// make its role clearer.
1876 final TestKit kit = new TestKit(getSystem());
1877 final TestKit mockShardLeaderKit = new TestKit(getSystem());
1878 final TestKit secondRequestKit = new TestKit(getSystem());
1880 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1881 .put(shardName, Arrays.asList("member-2")).build());
1883 final TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
1884 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor)
1885 .cluster(new MockClusterWrapper()).props()
1886 .withDispatcher(Dispatchers.DefaultDispatcherId()),
1889 shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1891 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1893 shardManager.tell(firstServerChange, kit.getRef());
1895 mockShardLeaderKit.expectMsgClass(firstForwardedServerChangeClass);
// The first change is still outstanding (the mock leader never replied), so this one must fail.
1897 shardManager.tell(secondServerChange, secondRequestKit.getRef());
1899 secondRequestKit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1903 public void testServerRemovedShardActorNotRunning() {
// Verifies ServerRemoved handling for a member-1 replica whose shard actor has not been initialized.
// After the "default" replica is removed, the persisted ShardManagerSnapshot lists only "people".
1904 LOG.info("testServerRemovedShardActorNotRunning starting");
1905 final TestKit kit = new TestKit(getSystem());
1906 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1907 .put("default", Arrays.asList("member-1", "member-2"))
1908 .put("astronauts", Arrays.asList("member-2"))
1909 .put("people", Arrays.asList("member-1", "member-2")).build());
1911 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1912 newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1914 shardManager.underlyingActor().waitForRecoveryComplete();
// Both "people" and "default" answer NotInitializedException, presumably because no
// UpdateSchemaContext or ActorInitialized has been sent.
1915 shardManager.tell(new FindLocalShard("people", false), kit.getRef());
1916 kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1918 shardManager.tell(new FindLocalShard("default", false), kit.getRef());
1919 kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1921 // Remove the default shard replica from member-1
1922 ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
1923 ShardIdentifier shardId = builder.shardName("default").memberName(MEMBER_1).type(shardMrgIDSuffix)
1925 shardManager.tell(new ServerRemoved(shardId.toString()), kit.getRef());
1927 shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1929 LOG.info("testServerRemovedShardActorNotRunning ending");
1933 public void testServerRemovedShardActorRunning() {
// Verifies ServerRemoved handling for a running, initialized member-1 "default" replica. The persisted
// ShardManagerSnapshot then lists only "people", and the removed shard actor is sent Shutdown.
1934 LOG.info("testServerRemovedShardActorRunning starting");
1935 final TestKit kit = new TestKit(getSystem());
1936 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1937 .put("default", Arrays.asList("member-1", "member-2"))
1938 .put("astronauts", Arrays.asList("member-2"))
1939 .put("people", Arrays.asList("member-1", "member-2")).build());
// The "default" shard is a MessageCollectorActor so the Shutdown it receives can be asserted.
1941 String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1942 ActorRef shard = actorFactory.createActor(MessageCollectorActor.props(), shardId);
1944 TestActorRef<TestShardManager> shardManager = actorFactory
1945 .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props()
1946 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1948 shardManager.underlyingActor().waitForRecoveryComplete();
1950 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1951 shardManager.tell(new ActorInitialized(), shard);
1953 waitForShardInitialized(shardManager, "people", kit);
1954 waitForShardInitialized(shardManager, "default", kit);
1956 // Remove the default shard replica from member-1
1957 shardManager.tell(new ServerRemoved(shardId), kit.getRef());
1959 shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1961 MessageCollectorActor.expectFirstMatching(shard, Shutdown.class);
1963 LOG.info("testServerRemovedShardActorRunning ending");
1967 public void testShardPersistenceWithRestoredData() {
// Verifies that a ShardManagerSnapshot pre-seeded in the InMemorySnapshotStore ("default", "astronauts")
// decides the local shards on recovery. "people", which is in the configuration but not in the snapshot,
// yields LocalShardNotFound. Both restored shards, including "astronauts" (configured only for member-2),
// exist locally but answer NotInitializedException.
1968 LOG.info("testShardPersistenceWithRestoredData starting");
1969 final TestKit kit = new TestKit(getSystem());
1970 MockConfiguration mockConfig =
1971 new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1972 .put("default", Arrays.asList("member-1", "member-2"))
1973 .put("astronauts", Arrays.asList("member-2"))
1974 .put("people", Arrays.asList("member-1", "member-2")).build());
1975 String[] restoredShards = {"default", "astronauts"};
1976 ShardManagerSnapshot snapshot =
1977 new ShardManagerSnapshot(Arrays.asList(restoredShards));
1978 InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
1980 // create shardManager to come up with restored data
1981 TestActorRef<TestShardManager> newRestoredShardManager = actorFactory.createTestActor(
1982 newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1984 newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
1986 newRestoredShardManager.tell(new FindLocalShard("people", false), kit.getRef());
1987 LocalShardNotFound notFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
1988 assertEquals("for uninitialized shard", "people", notFound.getShardName());
1990 // Verify a local shard is created for the restored shards,
1991 // although we expect a NotInitializedException for the shards
1992 // as the actor initialization
1993 // message is not sent for them
1994 newRestoredShardManager.tell(new FindLocalShard("default", false), kit.getRef());
1995 kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1997 newRestoredShardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
1998 kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
2000 LOG.info("testShardPersistenceWithRestoredData ending");
2004 public void testShutDown() throws Exception {
// Verifies graceful shutdown. Patterns.gracefulStop(..., Shutdown.INSTANCE) makes the ShardManager send
// Shutdown to each local shard, and the ShardManager does not stop while those shard actors are alive:
// the stop future must not complete within 500 ms, and must complete with TRUE once both shards are killed.
2005 LOG.info("testShutDown starting");
2006 final TestKit kit = new TestKit(getSystem());
2007 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
2008 .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1")).build());
2010 String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString();
2011 ActorRef shard1 = actorFactory.createActor(MessageCollectorActor.props(), shardId1);
2013 String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString();
2014 ActorRef shard2 = actorFactory.createActor(MessageCollectorActor.props(), shardId2);
2016 ActorRef shardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig)
2017 .addShardActor("shard1", shard1).addShardActor("shard2", shard2).props());
2019 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2020 shardManager.tell(new ActorInitialized(), shard1);
2021 shardManager.tell(new ActorInitialized(), shard2);
// Overall bound for the graceful stop; also reused for the final Await.result.
2023 FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
2024 Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE);
// Each shard must receive the forwarded Shutdown.
2026 MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class);
2027 MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class);
2030 Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS));
2031 fail("ShardManager actor stopped without waiting for the Shards to be stopped");
2032 } catch (TimeoutException e) {
2036 actorFactory.killActor(shard1, kit);
2037 actorFactory.killActor(shard2, kit);
2039 Boolean stopped = Await.result(stopFuture, duration);
2040 assertEquals("Stopped", Boolean.TRUE, stopped);
2042 LOG.info("testShutDown ending");
2046 public void testChangeServersVotingStatus() {
// Verifies that ChangeShardMembersVotingStatus("default", {member-2: true}) reaches the local leader shard
// as a ChangeServersVotingStatus keyed by member-2's full "default" shard id. The caller gets Success
// (the shard stand-in is configured with ServerChangeReply(OK)).
2047 final TestKit kit = new TestKit(getSystem());
2048 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2050 ActorRef respondActor = actorFactory
2051 .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2052 new ServerChangeReply(ServerChangeStatus.OK, null)), memberId);
// NOTE(review): created with getSystem().actorOf rather than actorFactory, so it is presumably not
// stopped by the factory's cleanup. Confirm.
2054 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2056 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2057 shardManager.tell(new ActorInitialized(), respondActor);
2058 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
2059 DataStoreVersions.CURRENT_VERSION), kit.getRef());
2061 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
2065 new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), kit.getRef());
2067 ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor
2068 .expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
// NOTE(review): JUnit's assertEquals takes (message, expected, actual); the actual map is passed as
// "expected" here, which only affects the wording of a failure message.
2069 assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(),
2070 ImmutableMap.of(ShardIdentifier
2071 .create("default", MemberName.forName("member-2"), shardMrgIDSuffix).toString(),
2074 kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
2078 public void testChangeServersVotingStatusWithNoLeader() {
// Verifies that when the local "default" shard is only a follower and its stand-in is configured to answer
// ChangeServersVotingStatus with NO_LEADER, the ChangeShardMembersVotingStatus request fails with a
// NoShardLeaderException cause.
2079 final TestKit kit = new TestKit(getSystem());
2080 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2082 ActorRef respondActor = actorFactory
2083 .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2084 new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);
// NOTE(review): created with getSystem().actorOf rather than actorFactory, so it is presumably not
// stopped by the factory's cleanup. Confirm.
2086 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2088 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2089 shardManager.tell(new ActorInitialized(), respondActor);
2090 shardManager.tell(new RoleChangeNotification(memberId, null, RaftState.Follower.name()), respondActor);
2093 new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), kit.getRef());
2095 MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2097 Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Status.Failure.class);
// NOTE(review): the assertion message below misspells "response" as "resposnse".
2098 assertTrue("Failure resposnse", resp.cause() instanceof NoShardLeaderException);
2101 @SuppressWarnings("unchecked")
2103 public void testRegisterForShardLeaderChanges() {
2104 LOG.info("testRegisterForShardLeaderChanges starting");
2106 final String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
2107 final String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
2108 final TestKit kit = new TestKit(getSystem());
2109 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
2111 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2112 shardManager.tell(new ActorInitialized(), mockShardActor);
2114 final Consumer<String> mockCallback = mock(Consumer.class);
2115 shardManager.tell(new RegisterForShardAvailabilityChanges(mockCallback), kit.getRef());
2117 final Success reply = kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
2118 final Registration reg = (Registration) reply.status();
2120 final DataTree mockDataTree = mock(DataTree.class);
2121 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2122 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2124 verify(mockCallback, timeout(5000)).accept("default");
2126 reset(mockCallback);
2127 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2128 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2130 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
2131 verifyNoMoreInteractions(mockCallback);
2133 shardManager.tell(new ShardLeaderStateChanged(memberId1, null, mockDataTree,
2134 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2136 verify(mockCallback, timeout(5000)).accept("default");
2138 reset(mockCallback);
2139 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, mockDataTree,
2140 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2142 verify(mockCallback, timeout(5000)).accept("default");
2144 reset(mockCallback);
2147 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2148 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2150 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
2151 verifyNoMoreInteractions(mockCallback);
2153 LOG.info("testRegisterForShardLeaderChanges ending");
2156 public static class TestShardManager extends ShardManager {
2157 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
2158 private final CountDownLatch snapshotPersist = new CountDownLatch(1);
2159 private ShardManagerSnapshot snapshot;
2160 private final Map<String, ActorRef> shardActors;
2161 private final ActorRef shardActor;
2162 private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
2163 private CountDownLatch memberUpReceived = new CountDownLatch(1);
2164 private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
2165 private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
2166 private CountDownLatch memberReachableReceived = new CountDownLatch(1);
2167 private volatile MessageInterceptor messageInterceptor;
2169 TestShardManager(final Builder builder) {
2171 shardActor = builder.shardActor;
2172 shardActors = builder.shardActors;
2176 protected void handleRecover(final Object message) throws Exception {
2178 super.handleRecover(message);
2180 if (message instanceof RecoveryCompleted) {
2181 recoveryComplete.countDown();
2186 private void countDownIfOther(final Member member, final CountDownLatch latch) {
2187 if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
2193 public void handleCommand(final Object message) throws Exception {
2195 if (messageInterceptor != null && messageInterceptor.canIntercept(message)) {
2196 getSender().tell(messageInterceptor.apply(message), getSelf());
2198 super.handleCommand(message);
2201 if (message instanceof FindPrimary) {
2202 findPrimaryMessageReceived.countDown();
2203 } else if (message instanceof ClusterEvent.MemberUp) {
2204 countDownIfOther(((ClusterEvent.MemberUp) message).member(), memberUpReceived);
2205 } else if (message instanceof ClusterEvent.MemberRemoved) {
2206 countDownIfOther(((ClusterEvent.MemberRemoved) message).member(), memberRemovedReceived);
2207 } else if (message instanceof ClusterEvent.UnreachableMember) {
2208 countDownIfOther(((ClusterEvent.UnreachableMember) message).member(), memberUnreachableReceived);
2209 } else if (message instanceof ClusterEvent.ReachableMember) {
2210 countDownIfOther(((ClusterEvent.ReachableMember) message).member(), memberReachableReceived);
2215 void setMessageInterceptor(final MessageInterceptor messageInterceptor) {
2216 this.messageInterceptor = messageInterceptor;
2219 void waitForRecoveryComplete() {
2220 assertTrue("Recovery complete",
2221 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
2224 public void waitForMemberUp() {
2225 assertTrue("MemberUp received",
2226 Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
2227 memberUpReceived = new CountDownLatch(1);
2230 void waitForMemberRemoved() {
2231 assertTrue("MemberRemoved received",
2232 Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
2233 memberRemovedReceived = new CountDownLatch(1);
2236 void waitForUnreachableMember() {
2237 assertTrue("UnreachableMember received",
2238 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS));
2239 memberUnreachableReceived = new CountDownLatch(1);
2242 void waitForReachableMember() {
2243 assertTrue("ReachableMember received",
2244 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
2245 memberReachableReceived = new CountDownLatch(1);
2248 void verifyFindPrimary() {
2249 assertTrue("FindPrimary received",
2250 Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
2251 findPrimaryMessageReceived = new CountDownLatch(1);
2254 public static Builder builder(final DatastoreContext.Builder datastoreContextBuilder) {
2255 return new Builder(datastoreContextBuilder);
2258 public static class Builder extends AbstractGenericCreator<Builder, TestShardManager> {
2259 private ActorRef shardActor;
2260 private final Map<String, ActorRef> shardActors = new HashMap<>();
2262 Builder(final DatastoreContext.Builder datastoreContextBuilder) {
2263 super(TestShardManager.class);
2264 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
2267 Builder shardActor(final ActorRef newShardActor) {
2268 shardActor = newShardActor;
2272 Builder addShardActor(final String shardName, final ActorRef actorRef) {
2273 shardActors.put(shardName, actorRef);
2279 public void saveSnapshot(final Object obj) {
2280 snapshot = (ShardManagerSnapshot) obj;
2281 snapshotPersist.countDown();
2282 super.saveSnapshot(obj);
2285 void verifySnapshotPersisted(final Set<String> shardList) {
2286 assertTrue("saveSnapshot invoked",
2287 Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
2288 assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
2292 protected ActorRef newShardActor(final ShardInformation info) {
2293 if (shardActors.get(info.getShardName()) != null) {
2294 return shardActors.get(info.getShardName());
2297 if (shardActor != null) {
2301 return super.newShardActor(info);
2305 private abstract static class AbstractGenericCreator<T extends AbstractGenericCreator<T, ?>, C extends ShardManager>
2306 extends AbstractShardManagerCreator<T> {
2307 private final Class<C> shardManagerClass;
2309 AbstractGenericCreator(final Class<C> shardManagerClass) {
2310 this.shardManagerClass = shardManagerClass;
2311 cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).readinessFuture(ready)
2312 .primaryShardInfoCache(new PrimaryShardInfoFutureCache());
2316 public Props props() {
2318 return Props.create(shardManagerClass, this);
2322 private static class GenericCreator<C extends ShardManager> extends AbstractGenericCreator<GenericCreator<C>, C> {
2323 GenericCreator(final Class<C> shardManagerClass) {
2324 super(shardManagerClass);
2328 private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
2329 private static final long serialVersionUID = 1L;
2330 private final Creator<ShardManager> delegate;
2332 DelegatingShardManagerCreator(final Creator<ShardManager> delegate) {
2333 this.delegate = delegate;
2337 public ShardManager create() throws Exception {
2338 return delegate.create();
/**
 * Hook that lets a test short-circuit selected commands sent to {@code TestShardManager}. For a message for which
 * {@link #canIntercept(Object)} is true, the manager replies to the sender with {@code apply(message)} instead of
 * processing it.
 */
interface MessageInterceptor extends Function<Object, Object> {
    /**
     * Decides whether this interceptor should answer the given message.
     *
     * @param message the incoming command
     * @return {@code true} if {@code apply(message)} should be used as the reply
     */
    boolean canIntercept(Object message);
}
2346 private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
2347 return new MessageInterceptor() {
2349 public Object apply(final Object message) {
2350 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
2354 public boolean canIntercept(final Object message) {
2355 return message instanceof FindPrimary;
2360 private static class MockRespondActor extends MessageCollectorActor {
2361 static final String CLEAR_RESPONSE = "clear-response";
2363 private Object responseMsg;
2364 private final Class<?> requestClass;
2366 @SuppressWarnings("unused")
2367 MockRespondActor(final Class<?> requestClass, final Object responseMsg) {
2368 this.requestClass = requestClass;
2369 this.responseMsg = responseMsg;
2373 public void onReceive(final Object message) throws Exception {
2374 if (message.equals(CLEAR_RESPONSE)) {
2377 super.onReceive(message);
2378 if (message.getClass().equals(requestClass) && responseMsg != null) {
2379 getSender().tell(responseMsg, getSelf());