2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.shardmanager;
10 import static org.awaitility.Awaitility.await;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.junit.Assert.fail;
18 import static org.mockito.ArgumentMatchers.anyString;
19 import static org.mockito.Mockito.doReturn;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.reset;
22 import static org.mockito.Mockito.timeout;
23 import static org.mockito.Mockito.verify;
24 import static org.mockito.Mockito.verifyNoMoreInteractions;
26 import akka.actor.ActorRef;
27 import akka.actor.ActorSystem;
28 import akka.actor.AddressFromURIString;
29 import akka.actor.PoisonPill;
30 import akka.actor.Props;
31 import akka.actor.Status;
32 import akka.actor.Status.Failure;
33 import akka.actor.Status.Success;
34 import akka.cluster.Cluster;
35 import akka.cluster.ClusterEvent;
36 import akka.cluster.Member;
37 import akka.dispatch.Dispatchers;
38 import akka.dispatch.OnComplete;
39 import akka.japi.Creator;
40 import akka.pattern.Patterns;
41 import akka.persistence.RecoveryCompleted;
42 import akka.serialization.Serialization;
43 import akka.testkit.TestActorRef;
44 import akka.testkit.javadsl.TestKit;
45 import akka.util.Timeout;
46 import com.google.common.base.Stopwatch;
47 import com.google.common.collect.ImmutableMap;
48 import com.google.common.collect.Lists;
49 import com.google.common.collect.Sets;
50 import com.google.common.util.concurrent.SettableFuture;
51 import com.google.common.util.concurrent.Uninterruptibles;
52 import java.time.Duration;
53 import java.util.AbstractMap;
54 import java.util.Arrays;
55 import java.util.Collection;
56 import java.util.Collections;
57 import java.util.HashMap;
58 import java.util.List;
60 import java.util.Map.Entry;
62 import java.util.concurrent.CountDownLatch;
63 import java.util.concurrent.TimeUnit;
64 import java.util.concurrent.TimeoutException;
65 import java.util.function.Consumer;
66 import java.util.function.Function;
67 import java.util.stream.Collectors;
68 import org.junit.After;
69 import org.junit.AfterClass;
70 import org.junit.Before;
71 import org.junit.BeforeClass;
72 import org.junit.Test;
73 import org.junit.runner.RunWith;
74 import org.mockito.junit.MockitoJUnitRunner;
75 import org.opendaylight.controller.cluster.access.concepts.MemberName;
76 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
77 import org.opendaylight.controller.cluster.datastore.AbstractClusterRefActorTest;
78 import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
79 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
80 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
81 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
82 import org.opendaylight.controller.cluster.datastore.Shard;
83 import org.opendaylight.controller.cluster.datastore.config.Configuration;
84 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
85 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
86 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
87 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
88 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
89 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
90 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
91 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
92 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
93 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
94 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
95 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
96 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
97 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
98 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
99 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
100 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
101 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
102 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
103 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
104 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
105 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
106 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
107 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
108 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
109 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
110 import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor;
111 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
112 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
113 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
114 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
115 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
116 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
117 import org.opendaylight.controller.cluster.raft.RaftState;
118 import org.opendaylight.controller.cluster.raft.TestActorFactory;
119 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
120 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
121 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
122 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
123 import org.opendaylight.controller.cluster.raft.messages.AddServer;
124 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
125 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
126 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
127 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
128 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
129 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
130 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
131 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
132 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
133 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
134 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
135 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
136 import org.opendaylight.yangtools.concepts.Registration;
137 import org.opendaylight.yangtools.yang.common.Empty;
138 import org.opendaylight.yangtools.yang.common.XMLNamespace;
139 import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
140 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
141 import org.slf4j.Logger;
142 import org.slf4j.LoggerFactory;
143 import scala.concurrent.Await;
144 import scala.concurrent.Future;
145 import scala.concurrent.duration.FiniteDuration;
147 @RunWith(MockitoJUnitRunner.StrictStubs.class)
148 public class ShardManagerTest extends AbstractClusterRefActorTest {
149 private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
150 private static final MemberName MEMBER_1 = MemberName.forName("member-1");
151 private static final MemberName MEMBER_2 = MemberName.forName("member-2");
152 private static final MemberName MEMBER_3 = MemberName.forName("member-3");
154 private static int ID_COUNTER = 1;
155 private static ActorRef mockShardActor;
156 private static ShardIdentifier mockShardName;
157 private static SettableFuture<Empty> ready;
158 private static EffectiveModelContext TEST_SCHEMA_CONTEXT;
160 private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
161 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
162 private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
163 .dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
164 .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
166 private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
169 public static void beforeClass() {
170 TEST_SCHEMA_CONTEXT = TestModel.createTestContext();
174 public static void afterClass() {
175 TEST_SCHEMA_CONTEXT = null;
179 public void setUp() {
180 ready = SettableFuture.create();
182 InMemoryJournal.clear();
183 InMemorySnapshotStore.clear();
185 if (mockShardActor == null) {
186 mockShardName = ShardIdentifier.create(Shard.DEFAULT_NAME, MEMBER_1, "config");
187 mockShardActor = getSystem().actorOf(MessageCollectorActor.props(), mockShardName.toString());
190 MessageCollectorActor.clearMessages(mockShardActor);
194 public void tearDown() {
195 InMemoryJournal.clear();
196 InMemorySnapshotStore.clear();
198 mockShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
199 await().atMost(Duration.ofSeconds(10)).until(mockShardActor::isTerminated);
200 mockShardActor = null;
202 actorFactory.close();
205 private TestShardManager.Builder newTestShardMgrBuilder() {
206 return TestShardManager.builder(datastoreContextBuilder)
207 .distributedDataStore(mock(ClientBackedDataStore.class));
210 private TestShardManager.Builder newTestShardMgrBuilder(final Configuration config) {
211 return newTestShardMgrBuilder().configuration(config);
214 private Props newShardMgrProps() {
215 return newShardMgrProps(new MockConfiguration());
218 private Props newShardMgrProps(final Configuration config) {
219 return newTestShardMgrBuilder(config).readinessFuture(ready).props();
222 private ActorSystem newActorSystem(final String config) {
223 return newActorSystem("cluster-test", config);
226 private ActorRef newMockShardActor(final ActorSystem system, final String shardName, final String memberName) {
227 String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
228 if (system == getSystem()) {
229 return actorFactory.createActor(MessageCollectorActor.props(), name);
232 return system.actorOf(MessageCollectorActor.props(), name);
235 private static DatastoreContextFactory newDatastoreContextFactory(final DatastoreContext datastoreContext) {
236 DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
237 doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
238 doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(anyString());
242 private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() {
243 return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
246 private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(final ActorRef shardActor) {
247 return TestShardManager.builder(datastoreContextBuilder)
248 .shardActor(shardActor)
249 .distributedDataStore(mock(ClientBackedDataStore.class));
253 private Props newPropsShardMgrWithMockShardActor() {
254 return newTestShardMgrBuilderWithMockShardActor().props().withDispatcher(
255 Dispatchers.DefaultDispatcherId());
258 private Props newPropsShardMgrWithMockShardActor(final ActorRef shardActor) {
259 return newTestShardMgrBuilderWithMockShardActor(shardActor).props()
260 .withDispatcher(Dispatchers.DefaultDispatcherId());
264 private TestShardManager newTestShardManager() {
265 return newTestShardManager(newShardMgrProps());
268 private TestShardManager newTestShardManager(final Props props) {
269 TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
270 TestShardManager shardManager = shardManagerActor.underlyingActor();
271 shardManager.waitForRecoveryComplete();
275 private static void waitForShardInitialized(final ActorRef shardManager, final String shardName,
277 AssertionError last = null;
278 Stopwatch sw = Stopwatch.createStarted();
279 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
281 shardManager.tell(new FindLocalShard(shardName, true), kit.getRef());
282 kit.expectMsgClass(LocalShardFound.class);
284 } catch (AssertionError e) {
288 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
294 @SuppressWarnings("unchecked")
295 private static <T> T expectMsgClassOrFailure(final Class<T> msgClass, final TestKit kit, final String msg) {
296 Object reply = kit.expectMsgAnyClassOf(kit.duration("5 sec"), msgClass, Failure.class);
297 if (reply instanceof Failure) {
298 throw new AssertionError(msg + " failed", ((Failure)reply).cause());
305 public void testPerShardDatastoreContext() throws Exception {
306 LOG.info("testPerShardDatastoreContext starting");
307 final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
308 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
311 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(6).build())
312 .when(mockFactory).getShardDatastoreContext("default");
315 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(7).build())
316 .when(mockFactory).getShardDatastoreContext("topology");
318 final MockConfiguration mockConfig = new MockConfiguration() {
320 public Collection<String> getMemberShardNames(final MemberName memberName) {
321 return Arrays.asList("default", "topology");
325 public Collection<MemberName> getMembersFromShardName(final String shardName) {
326 return members("member-1");
330 final ActorRef defaultShardActor = actorFactory.createActor(
331 MessageCollectorActor.props(), actorFactory.generateActorId("default"));
332 final ActorRef topologyShardActor = actorFactory.createActor(
333 MessageCollectorActor.props(), actorFactory.generateActorId("topology"));
335 final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
336 new HashMap<String, Entry<ActorRef, DatastoreContext>>());
337 shardInfoMap.put("default", new AbstractMap.SimpleEntry<>(defaultShardActor, null));
338 shardInfoMap.put("topology", new AbstractMap.SimpleEntry<>(topologyShardActor, null));
340 final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
341 final CountDownLatch newShardActorLatch = new CountDownLatch(2);
342 class LocalShardManager extends ShardManager {
343 LocalShardManager(final AbstractShardManagerCreator<?> creator) {
348 protected ActorRef newShardActor(final ShardInformation info) {
349 Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
352 ref = entry.getKey();
353 entry.setValue(info.getDatastoreContext());
356 newShardActorLatch.countDown();
361 final Creator<ShardManager> creator = new Creator<>() {
362 private static final long serialVersionUID = 1L;
364 public ShardManager create() {
365 return new LocalShardManager(
366 new GenericCreator<>(LocalShardManager.class).datastoreContextFactory(mockFactory)
367 .primaryShardInfoCache(primaryShardInfoCache).configuration(mockConfig));
371 final TestKit kit = new TestKit(getSystem());
373 final ActorRef shardManager = actorFactory.createActor(Props.create(ShardManager.class,
374 new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()));
376 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
378 assertTrue("Shard actors created", newShardActorLatch.await(5, TimeUnit.SECONDS));
379 assertEquals("getShardElectionTimeoutFactor", 6,
380 shardInfoMap.get("default").getValue().getShardElectionTimeoutFactor());
381 assertEquals("getShardElectionTimeoutFactor", 7,
382 shardInfoMap.get("topology").getValue().getShardElectionTimeoutFactor());
384 DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
385 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
387 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(66).build())
388 .when(newMockFactory).getShardDatastoreContext("default");
391 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(77).build())
392 .when(newMockFactory).getShardDatastoreContext("topology");
394 shardManager.tell(newMockFactory, kit.getRef());
396 DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor,
397 DatastoreContext.class);
398 assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
400 newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
401 assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
403 LOG.info("testPerShardDatastoreContext ending");
407 public void testOnReceiveFindPrimaryForNonExistentShard() {
408 final TestKit kit = new TestKit(getSystem());
409 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
411 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
413 shardManager.tell(new FindPrimary("non-existent", false), kit.getRef());
415 kit.expectMsgClass(Duration.ofSeconds(5), PrimaryNotFoundException.class);
419 public void testOnReceiveFindPrimaryForLocalLeaderShard() {
420 LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting");
421 final TestKit kit = new TestKit(getSystem());
422 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
424 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
426 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
427 shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
429 DataTree mockDataTree = mock(DataTree.class);
430 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
431 DataStoreVersions.CURRENT_VERSION), kit.getRef());
433 MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
435 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
438 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
440 LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5),
441 LocalPrimaryShardFound.class);
442 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
443 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
444 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
446 LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending");
450 public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() {
451 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp starting");
452 final TestKit kit = new TestKit(getSystem());
453 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
455 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
456 shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
458 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
459 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
461 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
463 shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION),
466 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
468 kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
470 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending");
474 public void testOnReceiveFindPrimaryForNonLocalLeaderShard() {
475 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard starting");
476 final TestKit kit = new TestKit(getSystem());
477 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
479 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
480 shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
482 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
483 MockClusterWrapper.sendMemberUp(shardManager, "member-2", kit.getRef().path().toString());
485 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
487 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
489 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
490 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, leaderVersion), mockShardActor);
492 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
494 RemotePrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
495 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
496 primaryFound.getPrimaryPath().contains("member-2-shard-default"));
497 assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
499 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard ending");
503 public void testOnReceiveFindPrimaryForUninitializedShard() {
504 final TestKit kit = new TestKit(getSystem());
505 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
507 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
509 kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
513 public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() {
514 final TestKit kit = new TestKit(getSystem());
515 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
517 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
518 shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
520 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
522 kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
526 public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() {
527 LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
528 final TestKit kit = new TestKit(getSystem());
529 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
531 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
532 shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
534 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
536 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Follower.name()),
539 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
541 kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
543 DataTree mockDataTree = mock(DataTree.class);
544 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
545 DataStoreVersions.CURRENT_VERSION), mockShardActor);
547 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
549 LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5),
550 LocalPrimaryShardFound.class);
551 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
552 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
553 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
555 LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
559 public void testOnReceiveFindPrimaryWaitForShardLeader() {
560 LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting");
561 datastoreContextBuilder.shardInitializationTimeout(10, TimeUnit.SECONDS);
562 final TestKit kit = new TestKit(getSystem());
563 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
565 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
567 // We're passing waitUntilInitialized = true to FindPrimary so
568 // the response should be
569 // delayed until we send ActorInitialized and
570 // RoleChangeNotification.
571 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
573 kit.expectNoMessage(Duration.ofMillis(150));
575 shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
577 kit.expectNoMessage(Duration.ofMillis(150));
579 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
581 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
584 kit.expectNoMessage(Duration.ofMillis(150));
586 DataTree mockDataTree = mock(DataTree.class);
587 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
588 DataStoreVersions.CURRENT_VERSION), mockShardActor);
590 LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
591 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
592 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
593 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
595 kit.expectNoMessage(Duration.ofMillis(200));
597 LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending");
601 public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() {
602 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard starting");
603 final TestKit kit = new TestKit(getSystem());
604 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
606 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
608 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
610 kit.expectMsgClass(Duration.ofSeconds(2), NotInitializedException.class);
612 shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
614 kit.expectNoMessage(Duration.ofMillis(200));
616 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending");
620 public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() {
621 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard starting");
622 final TestKit kit = new TestKit(getSystem());
623 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
625 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
626 shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
627 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
628 RaftState.Candidate.name()), mockShardActor);
630 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
632 kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
634 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending");
638 public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() {
639 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard starting");
640 final TestKit kit = new TestKit(getSystem());
641 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
643 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
644 shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
645 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
646 RaftState.IsolatedLeader.name()), mockShardActor);
648 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true),kit. getRef());
650 kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
652 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending");
656 public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() {
657 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard starting");
658 final TestKit kit = new TestKit(getSystem());
659 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
661 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
662 shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
664 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
666 kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
668 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending");
672 public void testOnReceiveFindPrimaryForRemoteShard() {
673 LOG.info("testOnReceiveFindPrimaryForRemoteShard starting");
674 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
676 // Create an ActorSystem ShardManager actor for member-1.
678 final ActorSystem system1 = newActorSystem("Member1");
679 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
681 final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
682 newTestShardMgrBuilderWithMockShardActor().cluster(
683 new ClusterWrapperImpl(system1)).props().withDispatcher(
684 Dispatchers.DefaultDispatcherId()), shardManagerID);
686 // Create an ActorSystem ShardManager actor for member-2.
688 final ActorSystem system2 = newActorSystem("Member2");
690 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
692 final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
694 MockConfiguration mockConfig2 = new MockConfiguration(
695 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
696 .put("astronauts", Arrays.asList("member-2")).build());
698 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
699 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
700 new ClusterWrapperImpl(system2)).props().withDispatcher(
701 Dispatchers.DefaultDispatcherId()), shardManagerID);
703 final TestKit kit = new TestKit(system1);
704 shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
705 shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
707 shardManager2.tell(new ActorInitialized(mockShardActor2), ActorRef.noSender());
709 String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
710 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
711 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
713 shardManager2.tell(new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
716 shardManager1.underlyingActor().waitForMemberUp();
717 shardManager1.tell(new FindPrimary("astronauts", false), kit.getRef());
719 RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
720 String path = found.getPrimaryPath();
721 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
722 assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
724 shardManager2.underlyingActor().verifyFindPrimary();
726 // This part times out quite a bit on jenkins for some reason
728 // Cluster.get(system2).down(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
730 // shardManager1.underlyingActor().waitForMemberRemoved();
732 // shardManager1.tell(new FindPrimary("astronauts", false), getRef());
734 // expectMsgClass(Duration.ofSeconds(5), PrimaryNotFoundException.class);
736 LOG.info("testOnReceiveFindPrimaryForRemoteShard ending");
740 public void testShardAvailabilityOnChangeOfMemberReachability() {
741 LOG.info("testShardAvailabilityOnChangeOfMemberReachability starting");
742 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
744 // Create an ActorSystem ShardManager actor for member-1.
746 final ActorSystem system1 = newActorSystem("Member1");
747 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
749 final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
751 final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
752 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
753 new ClusterWrapperImpl(system1)).props().withDispatcher(
754 Dispatchers.DefaultDispatcherId()), shardManagerID);
756 // Create an ActorSystem ShardManager actor for member-2.
758 final ActorSystem system2 = newActorSystem("Member2");
760 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
762 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
764 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
765 .put("default", Arrays.asList("member-1", "member-2")).build());
767 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
768 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
769 new ClusterWrapperImpl(system2)).props().withDispatcher(
770 Dispatchers.DefaultDispatcherId()), shardManagerID);
772 final TestKit kit = new TestKit(system1);
773 shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
774 shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
775 shardManager1.tell(new ActorInitialized(mockShardActor1), ActorRef.noSender());
776 shardManager2.tell(new ActorInitialized(mockShardActor1), ActorRef.noSender());
778 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
779 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
780 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
781 DataStoreVersions.CURRENT_VERSION), mockShardActor1);
783 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
785 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
786 DataStoreVersions.CURRENT_VERSION), mockShardActor2);
788 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
790 shardManager1.underlyingActor().waitForMemberUp();
792 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
794 RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
795 String path = found.getPrimaryPath();
796 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
798 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
801 shardManager1.underlyingActor().waitForUnreachableMember();
802 MessageCollectorActor.clearMessages(mockShardActor1);
804 shardManager1.tell(MockClusterWrapper.createMemberRemoved("member-2", "akka://cluster-test@127.0.0.1:2558"),
807 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
809 kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
811 shardManager1.tell(MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
814 shardManager1.underlyingActor().waitForReachableMember();
816 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
818 RemotePrimaryShardFound found1 = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
819 String path1 = found1.getPrimaryPath();
820 assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
822 shardManager1.tell(MockClusterWrapper.createMemberUp("member-2", "akka://cluster-test@127.0.0.1:2558"),
825 // Test FindPrimary wait succeeds after reachable member event.
827 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
828 "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
829 shardManager1.underlyingActor().waitForUnreachableMember();
831 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
834 MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
836 RemotePrimaryShardFound found2 = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
837 String path2 = found2.getPrimaryPath();
838 assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config"));
840 LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending");
844 public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() {
845 LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange starting");
846 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
848 // Create an ActorSystem ShardManager actor for member-1.
850 final ActorSystem system1 = newActorSystem("Member1");
851 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
853 final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
855 final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
856 final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
857 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(new ClusterWrapperImpl(system1))
858 .primaryShardInfoCache(primaryShardInfoCache).props()
859 .withDispatcher(Dispatchers.DefaultDispatcherId()),
862 // Create an ActorSystem ShardManager actor for member-2.
864 final ActorSystem system2 = newActorSystem("Member2");
866 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
868 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
870 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
871 .put("default", Arrays.asList("member-1", "member-2")).build());
873 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
874 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
875 new ClusterWrapperImpl(system2)).props().withDispatcher(
876 Dispatchers.DefaultDispatcherId()), shardManagerID);
878 final TestKit kit = new TestKit(system1);
879 shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
880 shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
881 shardManager1.tell(new ActorInitialized(mockShardActor1), ActorRef.noSender());
882 shardManager2.tell(new ActorInitialized(mockShardActor2), ActorRef.noSender());
884 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
885 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
886 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
887 DataStoreVersions.CURRENT_VERSION), mockShardActor1);
889 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
891 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
892 DataStoreVersions.CURRENT_VERSION), mockShardActor2);
894 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
896 shardManager1.underlyingActor().waitForMemberUp();
898 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
900 RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
901 String path = found.getPrimaryPath();
902 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
904 primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(
905 system1.actorSelection(mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION));
907 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
908 "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
910 shardManager1.underlyingActor().waitForUnreachableMember();
912 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
914 kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
916 assertNull("Expected primaryShardInfoCache entry removed",
917 primaryShardInfoCache.getIfPresent("default"));
919 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, mock(DataTree.class),
920 DataStoreVersions.CURRENT_VERSION), mockShardActor1);
922 new RoleChangeNotification(memberId1, RaftState.Follower.name(), RaftState.Leader.name()),
925 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
927 LocalPrimaryShardFound found1 = kit.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
928 String path1 = found1.getPrimaryPath();
929 assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
931 LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange ending");
935 public void testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable() {
936 LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable starting");
937 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
939 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
940 .put("default", Arrays.asList("member-256", "member-2")).build());
942 // Create an ActorSystem, ShardManager and actor for member-256.
944 final ActorSystem system256 = newActorSystem("Member256");
945 // 2562 is the tcp port of Member256 in src/test/resources/application.conf.
946 Cluster.get(system256).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2562"));
948 final ActorRef mockShardActor256 = newMockShardActor(system256, Shard.DEFAULT_NAME, "member-256");
950 final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
952 // ShardManager must be created with shard configuration to let its localShards has shards.
953 final TestActorRef<TestShardManager> shardManager256 = TestActorRef.create(system256,
954 newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor256)
955 .cluster(new ClusterWrapperImpl(system256))
956 .primaryShardInfoCache(primaryShardInfoCache).props()
957 .withDispatcher(Dispatchers.DefaultDispatcherId()),
960 // Create an ActorSystem, ShardManager and actor for member-2 whose name is contained in member-256.
962 final ActorSystem system2 = newActorSystem("Member2");
964 // Join member-2 into the cluster of member-256.
965 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2562"));
967 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
969 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
970 newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor2).cluster(
971 new ClusterWrapperImpl(system2)).props().withDispatcher(
972 Dispatchers.DefaultDispatcherId()), shardManagerID);
974 final TestKit kit256 = new TestKit(system256);
975 shardManager256.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef());
976 shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef());
977 shardManager256.tell(new ActorInitialized(mockShardActor256), ActorRef.noSender());
978 shardManager2.tell(new ActorInitialized(mockShardActor2), ActorRef.noSender());
980 String memberId256 = "member-256-shard-default-" + shardMrgIDSuffix;
981 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
982 shardManager256.tell(new ShardLeaderStateChanged(memberId256, memberId256, mock(DataTree.class),
983 DataStoreVersions.CURRENT_VERSION), mockShardActor256);
984 shardManager256.tell(
985 new RoleChangeNotification(memberId256, RaftState.Candidate.name(), RaftState.Leader.name()),
987 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId256, mock(DataTree.class),
988 DataStoreVersions.CURRENT_VERSION), mockShardActor2);
990 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Follower.name()),
992 shardManager256.underlyingActor().waitForMemberUp();
994 shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
996 LocalPrimaryShardFound found = kit256.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
997 String path = found.getPrimaryPath();
998 assertTrue("Unexpected primary path " + path + " which must on member-256",
999 path.contains("member-256-shard-default-config"));
1001 PrimaryShardInfo primaryShardInfo = new PrimaryShardInfo(
1002 system256.actorSelection(mockShardActor256.path()), DataStoreVersions.CURRENT_VERSION);
1003 primaryShardInfoCache.putSuccessful("default", primaryShardInfo);
1005 // Simulate member-2 become unreachable.
1006 shardManager256.tell(MockClusterWrapper.createUnreachableMember("member-2",
1007 "akka://cluster-test@127.0.0.1:2558"), kit256.getRef());
1008 shardManager256.underlyingActor().waitForUnreachableMember();
1010 // Make sure leader shard on member-256 is still leader and still in the cache.
1011 shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
1012 found = kit256.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
1013 path = found.getPrimaryPath();
1014 assertTrue("Unexpected primary path " + path + " which must still not on member-256",
1015 path.contains("member-256-shard-default-config"));
1016 Future<PrimaryShardInfo> futurePrimaryShard = primaryShardInfoCache.getIfPresent("default");
1017 futurePrimaryShard.onComplete(new OnComplete<PrimaryShardInfo>() {
1019 public void onComplete(final Throwable failure, final PrimaryShardInfo futurePrimaryShardInfo) {
1020 if (failure != null) {
1021 assertTrue("Primary shard info is unexpectedly removed from primaryShardInfoCache", false);
1023 assertEquals("Expected primaryShardInfoCache entry",
1024 primaryShardInfo, futurePrimaryShardInfo);
1027 }, system256.dispatchers().defaultGlobalDispatcher());
1029 LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable ending");
1033 public void testOnReceiveFindLocalShardForNonExistentShard() {
1034 final TestKit kit = new TestKit(getSystem());
1035 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1037 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1039 shardManager.tell(new FindLocalShard("non-existent", false), kit.getRef());
1041 LocalShardNotFound notFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
1043 assertEquals("getShardName", "non-existent", notFound.getShardName());
1047 public void testOnReceiveFindLocalShardForExistentShard() {
1048 final TestKit kit = new TestKit(getSystem());
1049 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1051 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1052 shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
1054 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1056 LocalShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1058 assertTrue("Found path contains " + found.getPath().path().toString(),
1059 found.getPath().path().toString().contains("member-1-shard-default-config"));
1063 public void testOnReceiveFindLocalShardForNotInitializedShard() {
1064 final TestKit kit = new TestKit(getSystem());
1065 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1067 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1069 kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1073 public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
1074 LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
1075 final TestKit kit = new TestKit(getSystem());
1076 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1078 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1080 // We're passing waitUntilInitialized = true to FindLocalShard
1081 // so the response should be
1082 // delayed until we send ActorInitialized.
1083 Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
1084 new Timeout(5, TimeUnit.SECONDS));
1086 shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
1088 Object resp = Await.result(future, kit.duration("5 seconds"));
1089 assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
1091 LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
1095 public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
1096 TestShardManager shardManager = newTestShardManager();
1098 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1099 shardManager.handleCommand(new RoleChangeNotification(
1100 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
1101 assertFalse(ready.isDone());
1103 shardManager.handleCommand(new ShardLeaderStateChanged(memberId, memberId,
1104 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1105 assertTrue(ready.isDone());
1109 public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
1110 final TestKit kit = new TestKit(getSystem());
1111 TestShardManager shardManager = newTestShardManager();
1113 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1114 shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1115 assertFalse(ready.isDone());
1117 shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
1119 shardManager.handleCommand(
1120 new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1121 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1122 assertTrue(ready.isDone());
1126 public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
1127 final TestKit kit = new TestKit(getSystem());
1128 TestShardManager shardManager = newTestShardManager();
1130 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1131 shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1132 assertFalse(ready.isDone());
1134 shardManager.handleCommand(
1135 new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1136 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1138 shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
1139 assertTrue(ready.isDone());
1143 public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
1144 TestShardManager shardManager = newTestShardManager();
1146 shardManager.handleCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(),
1147 RaftState.Leader.name()));
1148 assertFalse(ready.isDone());
1152 public void testByDefaultSyncStatusIsFalse() {
1153 TestShardManager shardManager = newTestShardManager();
1155 assertFalse(shardManager.getMBean().getSyncStatus());
1159 public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception {
1160 TestShardManager shardManager = newTestShardManager();
1162 shardManager.handleCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
1163 RaftState.Follower.name(), RaftState.Leader.name()));
1165 assertTrue(shardManager.getMBean().getSyncStatus());
1169 public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception {
1170 TestShardManager shardManager = newTestShardManager();
1172 String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1173 shardManager.handleCommand(new RoleChangeNotification(shardId,
1174 RaftState.Follower.name(), RaftState.Candidate.name()));
1176 assertFalse(shardManager.getMBean().getSyncStatus());
1178 // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
1179 shardManager.handleCommand(new FollowerInitialSyncUpStatus(
1182 assertFalse(shardManager.getMBean().getSyncStatus());
1186 public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception {
1187 TestShardManager shardManager = newTestShardManager();
1189 String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1190 shardManager.handleCommand(new RoleChangeNotification(shardId,
1191 RaftState.Candidate.name(), RaftState.Follower.name()));
1193 // Initially will be false
1194 assertFalse(shardManager.getMBean().getSyncStatus());
1196 // Send status true will make sync status true
1197 shardManager.handleCommand(new FollowerInitialSyncUpStatus(true, shardId));
1199 assertTrue(shardManager.getMBean().getSyncStatus());
1201 // Send status false will make sync status false
1202 shardManager.handleCommand(new FollowerInitialSyncUpStatus(false, shardId));
1204 assertFalse(shardManager.getMBean().getSyncStatus());
1208 public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception {
1209 LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
1210 TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
1212 public List<String> getMemberShardNames(final MemberName memberName) {
1213 return Arrays.asList("default", "astronauts");
1217 // Initially will be false
1218 assertFalse(shardManager.getMBean().getSyncStatus());
1220 // Make default shard leader
1221 String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
1222 shardManager.handleCommand(new RoleChangeNotification(defaultShardId,
1223 RaftState.Follower.name(), RaftState.Leader.name()));
1225 // default = Leader, astronauts is unknown so sync status remains false
1226 assertFalse(shardManager.getMBean().getSyncStatus());
1228 // Make astronauts shard leader as well
1229 String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1230 shardManager.handleCommand(new RoleChangeNotification(astronautsShardId,
1231 RaftState.Follower.name(), RaftState.Leader.name()));
1233 // Now sync status should be true
1234 assertTrue(shardManager.getMBean().getSyncStatus());
1236 // Make astronauts a Follower
1237 shardManager.handleCommand(new RoleChangeNotification(astronautsShardId,
1238 RaftState.Leader.name(), RaftState.Follower.name()));
1240 // Sync status is not true
1241 assertFalse(shardManager.getMBean().getSyncStatus());
1243 // Make the astronauts follower sync status true
1244 shardManager.handleCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
1246 // Sync status is now true
1247 assertTrue(shardManager.getMBean().getSyncStatus());
1249 LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending");
1253 public void testOnReceiveSwitchShardBehavior() {
1254 final TestKit kit = new TestKit(getSystem());
1255 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1257 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1258 shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
1260 shardManager.tell(new SwitchShardBehavior(mockShardName, RaftState.Leader, 1000), kit.getRef());
1262 SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor,
1263 SwitchBehavior.class);
1265 assertEquals(RaftState.Leader, switchBehavior.getNewState());
1266 assertEquals(1000, switchBehavior.getNewTerm());
1269 private static List<MemberName> members(final String... names) {
1270 return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList());
1274 public void testOnCreateShard() {
1275 LOG.info("testOnCreateShard starting");
1276 final TestKit kit = new TestKit(getSystem());
1277 datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1279 ActorRef shardManager = actorFactory
1280 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1281 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1283 EffectiveModelContext schemaContext = TEST_SCHEMA_CONTEXT;
1284 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1286 DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100)
1287 .persistent(false).build();
1288 Shard.Builder shardBuilder = Shard.builder();
1290 ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module",
1291 "foo", null, members("member-1", "member-5", "member-6"));
1292 shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), kit.getRef());
1294 kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1296 shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1298 kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1300 assertFalse("isRecoveryApplicable", shardBuilder.getDatastoreContext().isPersistent());
1301 assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig()
1302 .getPeerAddressResolver() instanceof ShardPeerAddressResolver);
1303 assertEquals("peerMembers", Sets.newHashSet(
1304 ShardIdentifier.create("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(),
1305 ShardIdentifier.create("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()),
1306 shardBuilder.getPeerAddresses().keySet());
1307 assertEquals("ShardIdentifier", ShardIdentifier.create("foo", MEMBER_1, shardMrgIDSuffix),
1308 shardBuilder.getId());
1309 assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1311 // Send CreateShard with same name - should return Success with
1314 shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1316 Success success = kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1317 assertNotNull("Success status is null", success.status());
1319 LOG.info("testOnCreateShard ending");
1323 public void testOnCreateShardWithLocalMemberNotInShardConfig() {
1324 LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting");
1325 final TestKit kit = new TestKit(getSystem());
1326 datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1328 ActorRef shardManager = actorFactory
1329 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1330 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1332 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1334 Shard.Builder shardBuilder = Shard.builder();
1335 ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module",
1336 "foo", null, members("member-5", "member-6"));
1338 shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1339 kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1341 shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1342 kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1344 assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
1345 assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(), shardBuilder
1346 .getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
1348 LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending");
1352 public void testOnCreateShardWithNoInitialSchemaContext() {
1353 LOG.info("testOnCreateShardWithNoInitialSchemaContext starting");
1354 final TestKit kit = new TestKit(getSystem());
1355 ActorRef shardManager = actorFactory
1356 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1357 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1359 Shard.Builder shardBuilder = Shard.builder();
1361 ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module",
1362 "foo", null, members("member-1"));
1363 shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1365 kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1367 EffectiveModelContext schemaContext = TEST_SCHEMA_CONTEXT;
1368 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1370 shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1372 kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1374 assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1375 assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
1377 LOG.info("testOnCreateShardWithNoInitialSchemaContext ending");
1381 public void testGetSnapshot() {
1382 LOG.info("testGetSnapshot starting");
1383 TestKit kit = new TestKit(getSystem());
1385 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1386 .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1"))
1387 .put("astronauts", Collections.<String>emptyList()).build());
1389 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig)
1390 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1392 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1393 Failure failure = kit.expectMsgClass(Failure.class);
1394 assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
1396 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1398 waitForShardInitialized(shardManager, "shard1", kit);
1399 waitForShardInitialized(shardManager, "shard2", kit);
1401 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1403 DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1405 assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1406 assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1408 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
1409 datastoreSnapshot.getShardSnapshots().stream().map(ShardSnapshot::getName).collect(Collectors.toSet())));
1411 // Add a new replica
1413 TestKit mockShardLeaderKit = new TestKit(getSystem());
1415 TestShardManager shardManagerInstance = shardManager.underlyingActor();
1416 shardManagerInstance.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1418 shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1419 mockShardLeaderKit.expectMsgClass(AddServer.class);
1420 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.OK, ""));
1421 kit.expectMsgClass(Status.Success.class);
1422 waitForShardInitialized(shardManager, "astronauts", kit);
1424 // Send another GetSnapshot and verify
1426 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1427 datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1429 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
1430 Lists.transform(datastoreSnapshot.getShardSnapshots(), ShardSnapshot::getName)));
1432 ShardManagerSnapshot snapshot = datastoreSnapshot.getShardManagerSnapshot();
1433 assertNotNull("Expected ShardManagerSnapshot", snapshot);
1434 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1435 Sets.newHashSet(snapshot.getShardList()));
1437 LOG.info("testGetSnapshot ending");
1441 public void testRestoreFromSnapshot() {
1442 LOG.info("testRestoreFromSnapshot starting");
1444 datastoreContextBuilder.shardInitializationTimeout(3, TimeUnit.SECONDS);
1446 TestKit kit = new TestKit(getSystem());
1448 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1449 .put("shard1", Collections.<String>emptyList()).put("shard2", Collections.<String>emptyList())
1450 .put("astronauts", Collections.<String>emptyList()).build());
1452 ShardManagerSnapshot snapshot =
1453 new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"));
1454 DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix, snapshot,
1455 Collections.<ShardSnapshot>emptyList());
1456 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig)
1457 .restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
1459 shardManager.underlyingActor().waitForRecoveryComplete();
1461 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1463 waitForShardInitialized(shardManager, "shard1", kit);
1464 waitForShardInitialized(shardManager, "shard2", kit);
1465 waitForShardInitialized(shardManager, "astronauts", kit);
1467 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1469 DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1471 assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1473 assertNotNull("Expected ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1474 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1475 Sets.newHashSet(datastoreSnapshot.getShardManagerSnapshot().getShardList()));
1477 LOG.info("testRestoreFromSnapshot ending");
1481 public void testAddShardReplicaForNonExistentShardConfig() {
1482 final TestKit kit = new TestKit(getSystem());
1483 ActorRef shardManager = actorFactory
1484 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1485 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1487 shardManager.tell(new AddShardReplica("model-inventory"), kit.getRef());
1488 Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(2), Status.Failure.class);
1490 assertTrue("Failure obtained", resp.cause() instanceof IllegalArgumentException);
1494 public void testAddShardReplica() {
1495 LOG.info("testAddShardReplica starting");
1496 MockConfiguration mockConfig = new MockConfiguration(
1497 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1498 .put("astronauts", Arrays.asList("member-2")).build());
1500 final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1501 datastoreContextBuilder.shardManagerPersistenceId(shardManagerID);
1503 // Create an ActorSystem ShardManager actor for member-1.
1504 final ActorSystem system1 = newActorSystem("Member1");
1505 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1506 ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1507 final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1508 newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor)
1509 .cluster(new ClusterWrapperImpl(system1)).props()
1510 .withDispatcher(Dispatchers.DefaultDispatcherId()),
1513 // Create an ActorSystem ShardManager actor for member-2.
1514 final ActorSystem system2 = newActorSystem("Member2");
1515 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1517 String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
1518 String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString();
1519 final TestActorRef<MockRespondActor> mockShardLeaderActor = TestActorRef.create(system2,
1520 Props.create(MockRespondActor.class, AddServer.class,
1521 new AddServerReply(ServerChangeStatus.OK, memberId2))
1522 .withDispatcher(Dispatchers.DefaultDispatcherId()),
1524 final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1525 newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor)
1526 .cluster(new ClusterWrapperImpl(system2)).props()
1527 .withDispatcher(Dispatchers.DefaultDispatcherId()),
1530 final TestKit kit = new TestKit(getSystem());
1531 newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1532 leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1534 leaderShardManager.tell(new ActorInitialized(mockShardLeaderActor), ActorRef.noSender());
1536 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1537 leaderShardManager.tell(
1538 new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1539 mockShardLeaderActor);
1540 leaderShardManager.tell(
1541 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1542 mockShardLeaderActor);
1544 newReplicaShardManager.underlyingActor().waitForMemberUp();
1545 leaderShardManager.underlyingActor().waitForMemberUp();
1547 // Have a dummy snapshot to be overwritten by the new data
1549 String[] restoredShards = { "default", "people" };
1550 ShardManagerSnapshot snapshot =
1551 new ShardManagerSnapshot(Arrays.asList(restoredShards));
1552 InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
1553 Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
1555 InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID);
1556 InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
1558 // construct a mock response message
1559 newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1560 AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1562 String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1563 assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1564 kit.expectMsgClass(Duration.ofSeconds(5), Status.Success.class);
1566 InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
1567 InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
1568 List<ShardManagerSnapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(shardManagerID,
1569 ShardManagerSnapshot.class);
1570 assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
1571 ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
1572 assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
1573 Sets.newHashSet(shardManagerSnapshot.getShardList()));
1574 LOG.info("testAddShardReplica ending");
1578 public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() {
1579 LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting");
1580 final TestKit kit = new TestKit(getSystem());
1581 TestActorRef<TestShardManager> shardManager = actorFactory
1582 .createTestActor(newPropsShardMgrWithMockShardActor(), shardMgrID);
1584 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1585 shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
1587 String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
1588 AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
1589 ActorRef leaderShardActor = shardManager.underlyingActor().getContext()
1590 .actorOf(Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId);
1592 MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
1594 String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
1596 new RoleChangeNotification(newReplicaId, RaftState.Candidate.name(), RaftState.Follower.name()),
1599 new ShardLeaderStateChanged(newReplicaId, leaderId, DataStoreVersions.CURRENT_VERSION),
1602 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1604 MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
1606 Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1607 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1609 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1610 kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1612 // Send message again to verify previous in progress state is
1615 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1616 resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1617 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1619 // Send message again with an AddServer timeout to verify the
1620 // pre-existing shard actor isn't terminated.
1623 newDatastoreContextFactory(
1624 datastoreContextBuilder.shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), kit.getRef());
1625 leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
1626 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1627 kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1629 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1630 kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1632 LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
1636 public void testAddShardReplicaWithPreExistingLocalReplicaLeader() {
1637 LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting");
1638 final TestKit kit = new TestKit(getSystem());
1639 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1640 ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1642 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1643 shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
1644 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1645 DataStoreVersions.CURRENT_VERSION), kit.getRef());
1647 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1650 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1651 Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1652 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1654 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1655 kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1657 LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
1661 public void testAddShardReplicaWithAddServerReplyFailure() {
1662 LOG.info("testAddShardReplicaWithAddServerReplyFailure starting");
1663 final TestKit kit = new TestKit(getSystem());
1664 final TestKit mockShardLeaderKit = new TestKit(getSystem());
1666 MockConfiguration mockConfig = new MockConfiguration(
1667 ImmutableMap.of("astronauts", Arrays.asList("member-2")));
1669 ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
1670 final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1671 newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props()
1672 .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1673 shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1675 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1677 TestKit terminateWatcher = new TestKit(getSystem());
1678 terminateWatcher.watch(mockNewReplicaShardActor);
1680 shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1682 AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
1683 assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
1684 addServerMsg.getNewServerId());
1685 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
1687 Failure failure = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1688 assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
1690 shardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
1691 kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
1693 terminateWatcher.expectTerminated(mockNewReplicaShardActor);
1695 shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1696 mockShardLeaderKit.expectMsgClass(AddServer.class);
1697 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
1698 failure = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1699 assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
1701 LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
1705 public void testAddShardReplicaWithAlreadyInProgress() {
1706 testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1707 AddServer.class, new AddShardReplica("astronauts"));
1711 public void testAddShardReplicaWithFindPrimaryTimeout() {
1712 LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting");
1713 datastoreContextBuilder.shardInitializationTimeout(100, TimeUnit.MILLISECONDS);
1714 final TestKit kit = new TestKit(getSystem());
1715 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.of("astronauts", Arrays.asList("member-2")));
1717 final ActorRef newReplicaShardManager = actorFactory
1718 .createActor(newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props()
1719 .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1721 newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1722 MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2",
1723 AddressFromURIString.parse("akka://non-existent@127.0.0.1:5").toString());
1725 newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1726 Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Status.Failure.class);
1727 assertTrue("Failure obtained", resp.cause() instanceof RuntimeException);
1729 LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending");
1733 public void testRemoveShardReplicaForNonExistentShard() {
1734 final TestKit kit = new TestKit(getSystem());
1735 ActorRef shardManager = actorFactory
1736 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1737 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1739 shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), kit.getRef());
1740 Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(10), Status.Failure.class);
1741 assertTrue("Failure obtained", resp.cause() instanceof PrimaryNotFoundException);
1748 public void testRemoveShardReplicaLocal() {
1749 final TestKit kit = new TestKit(getSystem());
1750 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1752 final ActorRef respondActor = actorFactory.createActor(Props.create(MockRespondActor.class,
1753 RemoveServer.class, new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
1755 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
1757 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1758 shardManager.tell(new ActorInitialized(respondActor), ActorRef.noSender());
1759 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1760 DataStoreVersions.CURRENT_VERSION), kit.getRef());
1762 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1765 shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), kit.getRef());
1766 final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor,
1767 RemoveServer.class);
1768 assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(),
1769 removeServer.getServerId());
1770 kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1774 public void testRemoveShardReplicaRemote() {
1775 MockConfiguration mockConfig = new MockConfiguration(
1776 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1777 .put("astronauts", Arrays.asList("member-1")).build());
1779 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1781 // Create an ActorSystem ShardManager actor for member-1.
1782 final ActorSystem system1 = newActorSystem("Member1");
1783 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1784 ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1786 final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1787 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster(
1788 new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1791 // Create an ActorSystem ShardManager actor for member-2.
1792 final ActorSystem system2 = newActorSystem("Member2");
1793 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1795 String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString();
1796 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
1797 final TestActorRef<MockRespondActor> mockShardLeaderActor =
1798 TestActorRef.create(system2, Props.create(MockRespondActor.class, RemoveServer.class,
1799 new RemoveServerReply(ServerChangeStatus.OK, memberId2)), name);
1801 LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor);
1803 final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1804 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster(
1805 new ClusterWrapperImpl(system2)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1808 // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so,
1809 // akka://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1
1810 // However when a shard manager has a local shard which is a follower and a leader that is remote it will
1811 // try to compute an address for the remote shard leader using the ShardPeerAddressResolver. This address will
1813 // akka://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1
1814 // In this specific case if we did a FindPrimary for shard default from member-1 we would come up
1815 // with the address of an actor which does not exist, therefore any message sent to that actor would go to
1817 // To work around this problem we create a ForwardingActor with the right address and pass to it the
1818 // mockShardLeaderActor. The ForwardingActor simply forwards all messages to the mockShardLeaderActor and every
1819 // thing works as expected
1820 final ActorRef actorRef = leaderShardManager.underlyingActor().context()
1821 .actorOf(Props.create(ForwardingActor.class, mockShardLeaderActor),
1822 "member-2-shard-default-" + shardMrgIDSuffix);
1824 LOG.error("Forwarding actor : {}", actorRef);
1826 final TestKit kit = new TestKit(getSystem());
1827 newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1828 leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1830 leaderShardManager.tell(new ActorInitialized(mockShardLeaderActor), ActorRef.noSender());
1831 newReplicaShardManager.tell(new ActorInitialized(mockShardLeaderActor), ActorRef.noSender());
1833 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1834 leaderShardManager.tell(
1835 new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1836 mockShardLeaderActor);
1837 leaderShardManager.tell(
1838 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1839 mockShardLeaderActor);
1841 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
1842 newReplicaShardManager.tell(
1843 new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class), leaderVersion),
1845 newReplicaShardManager.tell(
1846 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
1849 newReplicaShardManager.underlyingActor().waitForMemberUp();
1850 leaderShardManager.underlyingActor().waitForMemberUp();
1852 // construct a mock response message
1853 newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), kit.getRef());
1854 RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1855 RemoveServer.class);
1856 String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1857 assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
1858 kit.expectMsgClass(Duration.ofSeconds(5), Status.Success.class);
1862 public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() {
1863 testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", MEMBER_2),
1864 RemoveServer.class, new RemoveShardReplica("astronauts", MEMBER_3));
1868 public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() {
1869 testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1870 AddServer.class, new RemoveShardReplica("astronauts", MEMBER_2));
1874 public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange,
1875 final Class<?> firstForwardedServerChangeClass,
1876 final Object secondServerChange) {
1877 final TestKit kit = new TestKit(getSystem());
1878 final TestKit mockShardLeaderKit = new TestKit(getSystem());
1879 final TestKit secondRequestKit = new TestKit(getSystem());
1881 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1882 .put(shardName, Arrays.asList("member-2")).build());
1884 final TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
1885 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor)
1886 .cluster(new MockClusterWrapper()).props()
1887 .withDispatcher(Dispatchers.DefaultDispatcherId()),
1890 shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1892 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1894 shardManager.tell(firstServerChange, kit.getRef());
1896 mockShardLeaderKit.expectMsgClass(firstForwardedServerChangeClass);
1898 shardManager.tell(secondServerChange, secondRequestKit.getRef());
1900 secondRequestKit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1904 public void testServerRemovedShardActorNotRunning() {
1905 LOG.info("testServerRemovedShardActorNotRunning starting");
1906 final TestKit kit = new TestKit(getSystem());
1907 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1908 .put("default", Arrays.asList("member-1", "member-2"))
1909 .put("astronauts", Arrays.asList("member-2"))
1910 .put("people", Arrays.asList("member-1", "member-2")).build());
1912 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1913 newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1915 shardManager.underlyingActor().waitForRecoveryComplete();
1916 shardManager.tell(new FindLocalShard("people", false), kit.getRef());
1917 kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1919 shardManager.tell(new FindLocalShard("default", false), kit.getRef());
1920 kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1922 // Removed the default shard replica from member-1
1923 ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
1924 ShardIdentifier shardId = builder.shardName("default").memberName(MEMBER_1).type(shardMrgIDSuffix)
1926 shardManager.tell(new ServerRemoved(shardId.toString()), kit.getRef());
1928 shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1930 LOG.info("testServerRemovedShardActorNotRunning ending");
1934 public void testServerRemovedShardActorRunning() {
1935 LOG.info("testServerRemovedShardActorRunning starting");
1936 final TestKit kit = new TestKit(getSystem());
1937 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1938 .put("default", Arrays.asList("member-1", "member-2"))
1939 .put("astronauts", Arrays.asList("member-2"))
1940 .put("people", Arrays.asList("member-1", "member-2")).build());
1942 String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1943 ActorRef shard = actorFactory.createActor(MessageCollectorActor.props(), shardId);
1945 TestActorRef<TestShardManager> shardManager = actorFactory
1946 .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props()
1947 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1949 shardManager.underlyingActor().waitForRecoveryComplete();
1951 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1952 shardManager.tell(new ActorInitialized(shard), ActorRef.noSender());
1954 waitForShardInitialized(shardManager, "people", kit);
1955 waitForShardInitialized(shardManager, "default", kit);
1957 // Removed the default shard replica from member-1
1958 shardManager.tell(new ServerRemoved(shardId), kit.getRef());
1960 shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1962 MessageCollectorActor.expectFirstMatching(shard, Shutdown.class);
1964 LOG.info("testServerRemovedShardActorRunning ending");
1968 public void testShardPersistenceWithRestoredData() {
1969 LOG.info("testShardPersistenceWithRestoredData starting");
1970 final TestKit kit = new TestKit(getSystem());
1971 MockConfiguration mockConfig =
1972 new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1973 .put("default", Arrays.asList("member-1", "member-2"))
1974 .put("astronauts", Arrays.asList("member-2"))
1975 .put("people", Arrays.asList("member-1", "member-2")).build());
1976 String[] restoredShards = {"default", "astronauts"};
1977 ShardManagerSnapshot snapshot =
1978 new ShardManagerSnapshot(Arrays.asList(restoredShards));
1979 InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
1981 // create shardManager to come up with restored data
1982 TestActorRef<TestShardManager> newRestoredShardManager = actorFactory.createTestActor(
1983 newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1985 newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
1987 newRestoredShardManager.tell(new FindLocalShard("people", false), kit.getRef());
1988 LocalShardNotFound notFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
1989 assertEquals("for uninitialized shard", "people", notFound.getShardName());
1991 // Verify a local shard is created for the restored shards,
1992 // although we expect a NotInitializedException for the shards
1993 // as the actor initialization
1994 // message is not sent for them
1995 newRestoredShardManager.tell(new FindLocalShard("default", false), kit.getRef());
1996 kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1998 newRestoredShardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
1999 kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
2001 LOG.info("testShardPersistenceWithRestoredData ending");
2005 public void testShutDown() throws Exception {
2006 LOG.info("testShutDown starting");
2007 final TestKit kit = new TestKit(getSystem());
2008 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
2009 .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1")).build());
2011 String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString();
2012 ActorRef shard1 = actorFactory.createActor(MessageCollectorActor.props(), shardId1);
2014 String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString();
2015 ActorRef shard2 = actorFactory.createActor(MessageCollectorActor.props(), shardId2);
2017 ActorRef shardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig)
2018 .addShardActor("shard1", shard1).addShardActor("shard2", shard2).props());
2020 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2021 shardManager.tell(new ActorInitialized(shard1), ActorRef.noSender());
2022 shardManager.tell(new ActorInitialized(shard2), ActorRef.noSender());
2024 FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
2025 Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE);
2027 MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class);
2028 MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class);
2031 Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS));
2032 fail("ShardManager actor stopped without waiting for the Shards to be stopped");
2033 } catch (TimeoutException e) {
2037 actorFactory.killActor(shard1, kit);
2038 actorFactory.killActor(shard2, kit);
2040 Boolean stopped = Await.result(stopFuture, duration);
2041 assertEquals("Stopped", Boolean.TRUE, stopped);
2043 LOG.info("testShutDown ending");
2047 public void testChangeServersVotingStatus() {
2048 final TestKit kit = new TestKit(getSystem());
2049 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2051 ActorRef respondActor = actorFactory
2052 .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2053 new ServerChangeReply(ServerChangeStatus.OK, null)), memberId);
2055 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2057 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2058 shardManager.tell(new ActorInitialized(respondActor), ActorRef.noSender());
2059 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
2060 DataStoreVersions.CURRENT_VERSION), kit.getRef());
2062 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
2066 new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), kit.getRef());
2068 ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor
2069 .expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2070 assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(),
2071 ImmutableMap.of(ShardIdentifier
2072 .create("default", MemberName.forName("member-2"), shardMrgIDSuffix).toString(),
2075 kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
2079 public void testChangeServersVotingStatusWithNoLeader() {
2080 final TestKit kit = new TestKit(getSystem());
2081 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2083 ActorRef respondActor = actorFactory
2084 .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2085 new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);
2087 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2089 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2090 shardManager.tell(new ActorInitialized(respondActor), ActorRef.noSender());
2091 shardManager.tell(new RoleChangeNotification(memberId, null, RaftState.Follower.name()), respondActor);
2094 new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), kit.getRef());
2096 MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2098 Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Status.Failure.class);
2099 assertTrue("Failure resposnse", resp.cause() instanceof NoShardLeaderException);
2102 @SuppressWarnings("unchecked")
2104 public void testRegisterForShardLeaderChanges() {
2105 LOG.info("testRegisterForShardLeaderChanges starting");
2107 final String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
2108 final String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
2109 final TestKit kit = new TestKit(getSystem());
2110 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
2112 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2113 shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
2115 final Consumer<String> mockCallback = mock(Consumer.class);
2116 shardManager.tell(new RegisterForShardAvailabilityChanges(mockCallback), kit.getRef());
2118 final Success reply = kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
2119 final Registration reg = (Registration) reply.status();
2121 final DataTree mockDataTree = mock(DataTree.class);
2122 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2123 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2125 verify(mockCallback, timeout(5000)).accept("default");
2127 reset(mockCallback);
2128 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2129 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2131 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
2132 verifyNoMoreInteractions(mockCallback);
2134 shardManager.tell(new ShardLeaderStateChanged(memberId1, null, mockDataTree,
2135 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2137 verify(mockCallback, timeout(5000)).accept("default");
2139 reset(mockCallback);
2140 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, mockDataTree,
2141 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2143 verify(mockCallback, timeout(5000)).accept("default");
2145 reset(mockCallback);
2148 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2149 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2151 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
2152 verifyNoMoreInteractions(mockCallback);
2154 LOG.info("testRegisterForShardLeaderChanges ending");
2157 public static class TestShardManager extends ShardManager {
2158 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
2159 private final CountDownLatch snapshotPersist = new CountDownLatch(1);
2160 private ShardManagerSnapshot snapshot;
2161 private final Map<String, ActorRef> shardActors;
2162 private final ActorRef shardActor;
2163 private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
2164 private CountDownLatch memberUpReceived = new CountDownLatch(1);
2165 private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
2166 private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
2167 private CountDownLatch memberReachableReceived = new CountDownLatch(1);
2168 private volatile MessageInterceptor messageInterceptor;
2170 TestShardManager(final Builder builder) {
2172 shardActor = builder.shardActor;
2173 shardActors = builder.shardActors;
2177 protected void handleRecover(final Object message) throws Exception {
2179 super.handleRecover(message);
2181 if (message instanceof RecoveryCompleted) {
2182 recoveryComplete.countDown();
2187 private void countDownIfOther(final Member member, final CountDownLatch latch) {
2188 if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
2194 public void handleCommand(final Object message) throws Exception {
2196 if (messageInterceptor != null && messageInterceptor.canIntercept(message)) {
2197 getSender().tell(messageInterceptor.apply(message), getSelf());
2199 super.handleCommand(message);
2202 if (message instanceof FindPrimary) {
2203 findPrimaryMessageReceived.countDown();
2204 } else if (message instanceof ClusterEvent.MemberUp) {
2205 countDownIfOther(((ClusterEvent.MemberUp) message).member(), memberUpReceived);
2206 } else if (message instanceof ClusterEvent.MemberRemoved) {
2207 countDownIfOther(((ClusterEvent.MemberRemoved) message).member(), memberRemovedReceived);
2208 } else if (message instanceof ClusterEvent.UnreachableMember) {
2209 countDownIfOther(((ClusterEvent.UnreachableMember) message).member(), memberUnreachableReceived);
2210 } else if (message instanceof ClusterEvent.ReachableMember) {
2211 countDownIfOther(((ClusterEvent.ReachableMember) message).member(), memberReachableReceived);
2216 void setMessageInterceptor(final MessageInterceptor messageInterceptor) {
2217 this.messageInterceptor = messageInterceptor;
2220 void waitForRecoveryComplete() {
2221 assertTrue("Recovery complete",
2222 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
2225 public void waitForMemberUp() {
2226 assertTrue("MemberUp received",
2227 Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
2228 memberUpReceived = new CountDownLatch(1);
2231 void waitForMemberRemoved() {
2232 assertTrue("MemberRemoved received",
2233 Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
2234 memberRemovedReceived = new CountDownLatch(1);
2237 void waitForUnreachableMember() {
2238 assertTrue("UnreachableMember received",
2239 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS));
2240 memberUnreachableReceived = new CountDownLatch(1);
2243 void waitForReachableMember() {
2244 assertTrue("ReachableMember received",
2245 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
2246 memberReachableReceived = new CountDownLatch(1);
2249 void verifyFindPrimary() {
2250 assertTrue("FindPrimary received",
2251 Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
2252 findPrimaryMessageReceived = new CountDownLatch(1);
2255 public static Builder builder(final DatastoreContext.Builder datastoreContextBuilder) {
2256 return new Builder(datastoreContextBuilder);
2259 public static class Builder extends AbstractGenericCreator<Builder, TestShardManager> {
2260 private ActorRef shardActor;
2261 private final Map<String, ActorRef> shardActors = new HashMap<>();
2263 Builder(final DatastoreContext.Builder datastoreContextBuilder) {
2264 super(TestShardManager.class);
2265 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
2268 Builder shardActor(final ActorRef newShardActor) {
2269 shardActor = newShardActor;
2273 Builder addShardActor(final String shardName, final ActorRef actorRef) {
2274 shardActors.put(shardName, actorRef);
2280 public void saveSnapshot(final Object obj) {
2281 snapshot = (ShardManagerSnapshot) obj;
2282 snapshotPersist.countDown();
2283 super.saveSnapshot(obj);
2286 void verifySnapshotPersisted(final Set<String> shardList) {
2287 assertTrue("saveSnapshot invoked",
2288 Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
2289 assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
2293 protected ActorRef newShardActor(final ShardInformation info) {
2294 if (shardActors.get(info.getShardName()) != null) {
2295 return shardActors.get(info.getShardName());
2298 if (shardActor != null) {
2302 return super.newShardActor(info);
2306 private abstract static class AbstractGenericCreator<T extends AbstractGenericCreator<T, ?>, C extends ShardManager>
2307 extends AbstractShardManagerCreator<T> {
2308 private final Class<C> shardManagerClass;
2310 AbstractGenericCreator(final Class<C> shardManagerClass) {
2311 this.shardManagerClass = shardManagerClass;
2312 cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).readinessFuture(ready)
2313 .primaryShardInfoCache(new PrimaryShardInfoFutureCache());
2317 public Props props() {
2319 return Props.create(shardManagerClass, this);
2323 private static class GenericCreator<C extends ShardManager> extends AbstractGenericCreator<GenericCreator<C>, C> {
2324 GenericCreator(final Class<C> shardManagerClass) {
2325 super(shardManagerClass);
2329 private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
2330 private static final long serialVersionUID = 1L;
2331 private final Creator<ShardManager> delegate;
2333 DelegatingShardManagerCreator(final Creator<ShardManager> delegate) {
2334 this.delegate = delegate;
2338 public ShardManager create() throws Exception {
2339 return delegate.create();
2343 interface MessageInterceptor extends Function<Object, Object> {
2344 boolean canIntercept(Object message);
2347 private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
2348 return new MessageInterceptor() {
2350 public Object apply(final Object message) {
2351 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
2355 public boolean canIntercept(final Object message) {
2356 return message instanceof FindPrimary;
2361 private static class MockRespondActor extends MessageCollectorActor {
2362 static final String CLEAR_RESPONSE = "clear-response";
2364 private Object responseMsg;
2365 private final Class<?> requestClass;
2367 @SuppressWarnings("unused")
2368 MockRespondActor(final Class<?> requestClass, final Object responseMsg) {
2369 this.requestClass = requestClass;
2370 this.responseMsg = responseMsg;
2374 public void onReceive(final Object message) throws Exception {
2375 if (message.equals(CLEAR_RESPONSE)) {
2378 super.onReceive(message);
2379 if (message.getClass().equals(requestClass) && responseMsg != null) {
2380 getSender().tell(responseMsg, getSelf());