/*
 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore.shardmanager;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.mockito.ArgumentMatchers.anyString;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.never;
21 import static org.mockito.Mockito.reset;
22 import static org.mockito.Mockito.timeout;
23 import static org.mockito.Mockito.times;
24 import static org.mockito.Mockito.verify;
25 import static org.mockito.Mockito.verifyNoMoreInteractions;
27 import akka.actor.ActorRef;
28 import akka.actor.ActorSystem;
29 import akka.actor.AddressFromURIString;
30 import akka.actor.Props;
31 import akka.actor.Status;
32 import akka.actor.Status.Failure;
33 import akka.actor.Status.Success;
34 import akka.cluster.Cluster;
35 import akka.cluster.ClusterEvent;
36 import akka.cluster.Member;
37 import akka.dispatch.Dispatchers;
38 import akka.dispatch.OnComplete;
39 import akka.japi.Creator;
40 import akka.pattern.Patterns;
41 import akka.persistence.RecoveryCompleted;
42 import akka.serialization.Serialization;
43 import akka.testkit.TestActorRef;
44 import akka.testkit.javadsl.TestKit;
45 import akka.util.Timeout;
46 import com.google.common.base.Function;
47 import com.google.common.base.Stopwatch;
48 import com.google.common.collect.ImmutableMap;
49 import com.google.common.collect.Lists;
50 import com.google.common.collect.Sets;
51 import com.google.common.util.concurrent.Uninterruptibles;
53 import java.time.Duration;
54 import java.util.AbstractMap;
55 import java.util.Arrays;
56 import java.util.Collection;
57 import java.util.Collections;
58 import java.util.HashMap;
59 import java.util.List;
61 import java.util.Map.Entry;
63 import java.util.concurrent.CountDownLatch;
64 import java.util.concurrent.TimeUnit;
65 import java.util.concurrent.TimeoutException;
66 import java.util.function.Consumer;
67 import java.util.stream.Collectors;
68 import org.junit.AfterClass;
69 import org.junit.BeforeClass;
70 import org.junit.Test;
71 import org.opendaylight.controller.cluster.access.concepts.MemberName;
72 import org.opendaylight.controller.cluster.datastore.AbstractShardManagerTest;
73 import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
74 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
75 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
76 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
77 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
78 import org.opendaylight.controller.cluster.datastore.Shard;
79 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
80 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
81 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
82 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
83 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
84 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
85 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
86 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
87 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
88 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
89 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
90 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
91 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
92 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
93 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
94 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
95 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
96 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
97 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
98 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
99 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
100 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
101 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
102 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
103 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
104 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
105 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
106 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
107 import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor;
108 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
109 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
110 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
111 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
112 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
113 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
114 import org.opendaylight.controller.cluster.raft.RaftState;
115 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
116 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
117 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
118 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
119 import org.opendaylight.controller.cluster.raft.messages.AddServer;
120 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
121 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
122 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
123 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
124 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
125 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
126 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
127 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
128 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
129 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
130 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
131 import org.opendaylight.yangtools.concepts.Registration;
132 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
133 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
134 import org.slf4j.Logger;
135 import org.slf4j.LoggerFactory;
136 import scala.concurrent.Await;
137 import scala.concurrent.Future;
138 import scala.concurrent.duration.FiniteDuration;
140 public class ShardManagerTest extends AbstractShardManagerTest {
141 private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
142 private static final MemberName MEMBER_2 = MemberName.forName("member-2");
143 private static final MemberName MEMBER_3 = MemberName.forName("member-3");
145 private static SchemaContext TEST_SCHEMA_CONTEXT;
147 private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
150 public static void beforeClass() {
151 TEST_SCHEMA_CONTEXT = TestModel.createTestContext();
155 public static void afterClass() {
156 TEST_SCHEMA_CONTEXT = null;
159 private ActorSystem newActorSystem(final String config) {
160 return newActorSystem("cluster-test", config);
163 private ActorRef newMockShardActor(final ActorSystem system, final String shardName, final String memberName) {
164 String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
165 if (system == getSystem()) {
166 return actorFactory.createActor(MessageCollectorActor.props(), name);
169 return system.actorOf(MessageCollectorActor.props(), name);
172 private Props newShardMgrProps() {
173 return newShardMgrProps(new MockConfiguration());
176 private static DatastoreContextFactory newDatastoreContextFactory(final DatastoreContext datastoreContext) {
177 DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
178 doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
179 doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(anyString());
183 private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() {
184 return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
187 private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(final ActorRef shardActor) {
188 return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor)
189 .distributedDataStore(mock(DistributedDataStore.class));
193 private Props newPropsShardMgrWithMockShardActor() {
194 return newTestShardMgrBuilderWithMockShardActor().props().withDispatcher(
195 Dispatchers.DefaultDispatcherId());
198 private Props newPropsShardMgrWithMockShardActor(final ActorRef shardActor) {
199 return newTestShardMgrBuilderWithMockShardActor(shardActor).props()
200 .withDispatcher(Dispatchers.DefaultDispatcherId());
204 private TestShardManager newTestShardManager() {
205 return newTestShardManager(newShardMgrProps());
208 private TestShardManager newTestShardManager(final Props props) {
209 TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
210 TestShardManager shardManager = shardManagerActor.underlyingActor();
211 shardManager.waitForRecoveryComplete();
215 private static void waitForShardInitialized(final ActorRef shardManager, final String shardName,
217 AssertionError last = null;
218 Stopwatch sw = Stopwatch.createStarted();
219 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
221 shardManager.tell(new FindLocalShard(shardName, true), kit.getRef());
222 kit.expectMsgClass(LocalShardFound.class);
224 } catch (AssertionError e) {
228 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
234 @SuppressWarnings("unchecked")
235 private static <T> T expectMsgClassOrFailure(final Class<T> msgClass, final TestKit kit, final String msg) {
236 Object reply = kit.expectMsgAnyClassOf(kit.duration("5 sec"), msgClass, Failure.class);
237 if (reply instanceof Failure) {
238 throw new AssertionError(msg + " failed", ((Failure)reply).cause());
245 public void testPerShardDatastoreContext() throws Exception {
246 LOG.info("testPerShardDatastoreContext starting");
247 final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
248 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
251 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(6).build())
252 .when(mockFactory).getShardDatastoreContext("default");
255 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(7).build())
256 .when(mockFactory).getShardDatastoreContext("topology");
258 final MockConfiguration mockConfig = new MockConfiguration() {
260 public Collection<String> getMemberShardNames(final MemberName memberName) {
261 return Arrays.asList("default", "topology");
265 public Collection<MemberName> getMembersFromShardName(final String shardName) {
266 return members("member-1");
270 final ActorRef defaultShardActor = actorFactory.createActor(
271 MessageCollectorActor.props(), actorFactory.generateActorId("default"));
272 final ActorRef topologyShardActor = actorFactory.createActor(
273 MessageCollectorActor.props(), actorFactory.generateActorId("topology"));
275 final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
276 new HashMap<String, Entry<ActorRef, DatastoreContext>>());
277 shardInfoMap.put("default", new AbstractMap.SimpleEntry<>(defaultShardActor, null));
278 shardInfoMap.put("topology", new AbstractMap.SimpleEntry<>(topologyShardActor, null));
280 final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
281 final CountDownLatch newShardActorLatch = new CountDownLatch(2);
282 class LocalShardManager extends ShardManager {
283 LocalShardManager(final AbstractShardManagerCreator<?> creator) {
288 protected ActorRef newShardActor(final ShardInformation info) {
289 Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
292 ref = entry.getKey();
293 entry.setValue(info.getDatastoreContext());
296 newShardActorLatch.countDown();
301 final Creator<ShardManager> creator = new Creator<ShardManager>() {
302 private static final long serialVersionUID = 1L;
304 public ShardManager create() {
305 return new LocalShardManager(
306 new GenericCreator<>(LocalShardManager.class).datastoreContextFactory(mockFactory)
307 .primaryShardInfoCache(primaryShardInfoCache).configuration(mockConfig));
311 final TestKit kit = new TestKit(getSystem());
313 final ActorRef shardManager = actorFactory.createActor(Props.create(
314 new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()));
316 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
318 assertTrue("Shard actors created", newShardActorLatch.await(5, TimeUnit.SECONDS));
319 assertEquals("getShardElectionTimeoutFactor", 6,
320 shardInfoMap.get("default").getValue().getShardElectionTimeoutFactor());
321 assertEquals("getShardElectionTimeoutFactor", 7,
322 shardInfoMap.get("topology").getValue().getShardElectionTimeoutFactor());
324 DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
325 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
327 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(66).build())
328 .when(newMockFactory).getShardDatastoreContext("default");
331 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(77).build())
332 .when(newMockFactory).getShardDatastoreContext("topology");
334 shardManager.tell(newMockFactory, kit.getRef());
336 DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor,
337 DatastoreContext.class);
338 assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
340 newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
341 assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
343 LOG.info("testPerShardDatastoreContext ending");
347 public void testOnReceiveFindPrimaryForNonExistentShard() {
348 final TestKit kit = new TestKit(getSystem());
349 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
351 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
353 shardManager.tell(new FindPrimary("non-existent", false), kit.getRef());
355 kit.expectMsgClass(Duration.ofSeconds(5), PrimaryNotFoundException.class);
359 public void testOnReceiveFindPrimaryForLocalLeaderShard() {
360 LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting");
361 final TestKit kit = new TestKit(getSystem());
362 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
364 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
366 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
367 shardManager.tell(new ActorInitialized(), mockShardActor);
369 DataTree mockDataTree = mock(DataTree.class);
370 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
371 DataStoreVersions.CURRENT_VERSION), kit.getRef());
373 MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
375 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
378 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
380 LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5),
381 LocalPrimaryShardFound.class);
382 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
383 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
384 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
386 LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending");
390 public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() {
391 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp starting");
392 final TestKit kit = new TestKit(getSystem());
393 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
395 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
396 shardManager.tell(new ActorInitialized(), mockShardActor);
398 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
399 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
401 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
403 shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION),
406 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
408 kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
410 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending");
414 public void testOnReceiveFindPrimaryForNonLocalLeaderShard() {
415 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard starting");
416 final TestKit kit = new TestKit(getSystem());
417 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
419 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
420 shardManager.tell(new ActorInitialized(), mockShardActor);
422 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
423 MockClusterWrapper.sendMemberUp(shardManager, "member-2", kit.getRef().path().toString());
425 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
427 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
429 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
430 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, leaderVersion), mockShardActor);
432 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
434 RemotePrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
435 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
436 primaryFound.getPrimaryPath().contains("member-2-shard-default"));
437 assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
439 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard ending");
443 public void testOnReceiveFindPrimaryForUninitializedShard() {
444 final TestKit kit = new TestKit(getSystem());
445 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
447 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
449 kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
453 public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() {
454 final TestKit kit = new TestKit(getSystem());
455 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
457 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
458 shardManager.tell(new ActorInitialized(), mockShardActor);
460 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
462 kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
466 public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() {
467 LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
468 final TestKit kit = new TestKit(getSystem());
469 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
471 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
472 shardManager.tell(new ActorInitialized(), mockShardActor);
474 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
476 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Follower.name()),
479 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
481 kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
483 DataTree mockDataTree = mock(DataTree.class);
484 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
485 DataStoreVersions.CURRENT_VERSION), mockShardActor);
487 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
489 LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5),
490 LocalPrimaryShardFound.class);
491 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
492 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
493 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
495 LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
499 public void testOnReceiveFindPrimaryWaitForShardLeader() {
500 LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting");
501 datastoreContextBuilder.shardInitializationTimeout(10, TimeUnit.SECONDS);
502 final TestKit kit = new TestKit(getSystem());
503 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
505 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
507 // We're passing waitUntilInitialized = true to FindPrimary so
508 // the response should be
509 // delayed until we send ActorInitialized and
510 // RoleChangeNotification.
511 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
513 kit.expectNoMessage(Duration.ofMillis(150));
515 shardManager.tell(new ActorInitialized(), mockShardActor);
517 kit.expectNoMessage(Duration.ofMillis(150));
519 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
521 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
524 kit.expectNoMessage(Duration.ofMillis(150));
526 DataTree mockDataTree = mock(DataTree.class);
527 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
528 DataStoreVersions.CURRENT_VERSION), mockShardActor);
530 LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
531 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
532 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
533 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
535 kit.expectNoMessage(Duration.ofMillis(200));
537 LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending");
541 public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() {
542 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard starting");
543 final TestKit kit = new TestKit(getSystem());
544 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
546 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
548 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
550 kit.expectMsgClass(Duration.ofSeconds(2), NotInitializedException.class);
552 shardManager.tell(new ActorInitialized(), mockShardActor);
554 kit.expectNoMessage(Duration.ofMillis(200));
556 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending");
560 public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() {
561 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard starting");
562 final TestKit kit = new TestKit(getSystem());
563 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
565 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
566 shardManager.tell(new ActorInitialized(), mockShardActor);
567 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
568 RaftState.Candidate.name()), mockShardActor);
570 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
572 kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
574 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending");
578 public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() {
579 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard starting");
580 final TestKit kit = new TestKit(getSystem());
581 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
583 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
584 shardManager.tell(new ActorInitialized(), mockShardActor);
585 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
586 RaftState.IsolatedLeader.name()), mockShardActor);
588 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true),kit. getRef());
590 kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
592 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending");
596 public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() {
597 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard starting");
598 final TestKit kit = new TestKit(getSystem());
599 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
601 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
602 shardManager.tell(new ActorInitialized(), mockShardActor);
604 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
606 kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
608 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending");
612 public void testOnReceiveFindPrimaryForRemoteShard() {
613 LOG.info("testOnReceiveFindPrimaryForRemoteShard starting");
614 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
616 // Create an ActorSystem ShardManager actor for member-1.
618 final ActorSystem system1 = newActorSystem("Member1");
619 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
621 final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
622 newTestShardMgrBuilderWithMockShardActor().cluster(
623 new ClusterWrapperImpl(system1)).props().withDispatcher(
624 Dispatchers.DefaultDispatcherId()), shardManagerID);
626 // Create an ActorSystem ShardManager actor for member-2.
628 final ActorSystem system2 = newActorSystem("Member2");
630 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
632 final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
634 MockConfiguration mockConfig2 = new MockConfiguration(
635 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
636 .put("astronauts", Arrays.asList("member-2")).build());
638 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
639 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
640 new ClusterWrapperImpl(system2)).props().withDispatcher(
641 Dispatchers.DefaultDispatcherId()), shardManagerID);
643 final TestKit kit = new TestKit(system1);
644 shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
645 shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
647 shardManager2.tell(new ActorInitialized(), mockShardActor2);
649 String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
650 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
651 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
653 shardManager2.tell(new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
656 shardManager1.underlyingActor().waitForMemberUp();
657 shardManager1.tell(new FindPrimary("astronauts", false), kit.getRef());
659 RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
660 String path = found.getPrimaryPath();
661 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
662 assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
664 shardManager2.underlyingActor().verifyFindPrimary();
666 // This part times out quite a bit on jenkins for some reason
668 // Cluster.get(system2).down(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
670 // shardManager1.underlyingActor().waitForMemberRemoved();
672 // shardManager1.tell(new FindPrimary("astronauts", false), getRef());
674 // expectMsgClass(Duration.ofSeconds(5), PrimaryNotFoundException.class);
676 LOG.info("testOnReceiveFindPrimaryForRemoteShard ending");
680 public void testShardAvailabilityOnChangeOfMemberReachability() {
681 LOG.info("testShardAvailabilityOnChangeOfMemberReachability starting");
682 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
684 // Create an ActorSystem ShardManager actor for member-1.
686 final ActorSystem system1 = newActorSystem("Member1");
687 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
689 final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
691 final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
692 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
693 new ClusterWrapperImpl(system1)).props().withDispatcher(
694 Dispatchers.DefaultDispatcherId()), shardManagerID);
696 // Create an ActorSystem ShardManager actor for member-2.
698 final ActorSystem system2 = newActorSystem("Member2");
700 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
702 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
704 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
705 .put("default", Arrays.asList("member-1", "member-2")).build());
707 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
708 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
709 new ClusterWrapperImpl(system2)).props().withDispatcher(
710 Dispatchers.DefaultDispatcherId()), shardManagerID);
712 final TestKit kit = new TestKit(system1);
713 shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
714 shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
715 shardManager1.tell(new ActorInitialized(), mockShardActor1);
716 shardManager2.tell(new ActorInitialized(), mockShardActor2);
718 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
719 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
720 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
721 DataStoreVersions.CURRENT_VERSION), mockShardActor1);
723 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
725 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
726 DataStoreVersions.CURRENT_VERSION), mockShardActor2);
728 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
730 shardManager1.underlyingActor().waitForMemberUp();
732 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
734 RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
735 String path = found.getPrimaryPath();
736 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
738 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
741 shardManager1.underlyingActor().waitForUnreachableMember();
743 PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
744 assertEquals("getMemberName", MEMBER_2, peerDown.getMemberName());
745 MessageCollectorActor.clearMessages(mockShardActor1);
747 shardManager1.tell(MockClusterWrapper.createMemberRemoved("member-2", "akka://cluster-test@127.0.0.1:2558"),
750 MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
752 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
754 kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
756 shardManager1.tell(MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
759 shardManager1.underlyingActor().waitForReachableMember();
761 PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
762 assertEquals("getMemberName", MEMBER_2, peerUp.getMemberName());
763 MessageCollectorActor.clearMessages(mockShardActor1);
765 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
767 RemotePrimaryShardFound found1 = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
768 String path1 = found1.getPrimaryPath();
769 assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
771 shardManager1.tell(MockClusterWrapper.createMemberUp("member-2", "akka://cluster-test@127.0.0.1:2558"),
774 MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
776 // Test FindPrimary wait succeeds after reachable member event.
778 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
779 "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
780 shardManager1.underlyingActor().waitForUnreachableMember();
782 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
785 MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
787 RemotePrimaryShardFound found2 = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
788 String path2 = found2.getPrimaryPath();
789 assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config"));
791 LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending");
795 public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() {
796 LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange starting");
797 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
799 // Create an ActorSystem ShardManager actor for member-1.
801 final ActorSystem system1 = newActorSystem("Member1");
802 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
804 final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
806 final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
807 final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
808 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(new ClusterWrapperImpl(system1))
809 .primaryShardInfoCache(primaryShardInfoCache).props()
810 .withDispatcher(Dispatchers.DefaultDispatcherId()),
813 // Create an ActorSystem ShardManager actor for member-2.
815 final ActorSystem system2 = newActorSystem("Member2");
817 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
819 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
821 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
822 .put("default", Arrays.asList("member-1", "member-2")).build());
824 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
825 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
826 new ClusterWrapperImpl(system2)).props().withDispatcher(
827 Dispatchers.DefaultDispatcherId()), shardManagerID);
829 final TestKit kit = new TestKit(system1);
830 shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
831 shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
832 shardManager1.tell(new ActorInitialized(), mockShardActor1);
833 shardManager2.tell(new ActorInitialized(), mockShardActor2);
835 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
836 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
837 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
838 DataStoreVersions.CURRENT_VERSION), mockShardActor1);
840 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
842 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
843 DataStoreVersions.CURRENT_VERSION), mockShardActor2);
845 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
847 shardManager1.underlyingActor().waitForMemberUp();
849 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
851 RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
852 String path = found.getPrimaryPath();
853 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
855 primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(
856 system1.actorSelection(mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION));
858 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
859 "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
861 shardManager1.underlyingActor().waitForUnreachableMember();
863 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
865 kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
867 assertNull("Expected primaryShardInfoCache entry removed",
868 primaryShardInfoCache.getIfPresent("default"));
870 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, mock(DataTree.class),
871 DataStoreVersions.CURRENT_VERSION), mockShardActor1);
873 new RoleChangeNotification(memberId1, RaftState.Follower.name(), RaftState.Leader.name()),
876 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
878 LocalPrimaryShardFound found1 = kit.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
879 String path1 = found1.getPrimaryPath();
880 assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
882 LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange ending");
886 public void testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable() {
887 LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable starting");
888 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
890 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
891 .put("default", Arrays.asList("member-256", "member-2")).build());
893 // Create an ActorSystem, ShardManager and actor for member-256.
895 final ActorSystem system256 = newActorSystem("Member256");
896 // 2562 is the tcp port of Member256 in src/test/resources/application.conf.
897 Cluster.get(system256).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2562"));
899 final ActorRef mockShardActor256 = newMockShardActor(system256, Shard.DEFAULT_NAME, "member-256");
901 final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
903 // ShardManager must be created with shard configuration to let its localShards has shards.
904 final TestActorRef<TestShardManager> shardManager256 = TestActorRef.create(system256,
905 newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor256)
906 .cluster(new ClusterWrapperImpl(system256))
907 .primaryShardInfoCache(primaryShardInfoCache).props()
908 .withDispatcher(Dispatchers.DefaultDispatcherId()),
911 // Create an ActorSystem, ShardManager and actor for member-2 whose name is contained in member-256.
913 final ActorSystem system2 = newActorSystem("Member2");
915 // Join member-2 into the cluster of member-256.
916 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2562"));
918 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
920 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
921 newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor2).cluster(
922 new ClusterWrapperImpl(system2)).props().withDispatcher(
923 Dispatchers.DefaultDispatcherId()), shardManagerID);
925 final TestKit kit256 = new TestKit(system256);
926 shardManager256.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef());
927 shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef());
928 shardManager256.tell(new ActorInitialized(), mockShardActor256);
929 shardManager2.tell(new ActorInitialized(), mockShardActor2);
931 String memberId256 = "member-256-shard-default-" + shardMrgIDSuffix;
932 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
933 shardManager256.tell(new ShardLeaderStateChanged(memberId256, memberId256, mock(DataTree.class),
934 DataStoreVersions.CURRENT_VERSION), mockShardActor256);
935 shardManager256.tell(
936 new RoleChangeNotification(memberId256, RaftState.Candidate.name(), RaftState.Leader.name()),
938 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId256, mock(DataTree.class),
939 DataStoreVersions.CURRENT_VERSION), mockShardActor2);
941 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Follower.name()),
943 shardManager256.underlyingActor().waitForMemberUp();
945 shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
947 LocalPrimaryShardFound found = kit256.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
948 String path = found.getPrimaryPath();
949 assertTrue("Unexpected primary path " + path + " which must on member-256",
950 path.contains("member-256-shard-default-config"));
952 PrimaryShardInfo primaryShardInfo = new PrimaryShardInfo(
953 system256.actorSelection(mockShardActor256.path()), DataStoreVersions.CURRENT_VERSION);
954 primaryShardInfoCache.putSuccessful("default", primaryShardInfo);
956 // Simulate member-2 become unreachable.
957 shardManager256.tell(MockClusterWrapper.createUnreachableMember("member-2",
958 "akka://cluster-test@127.0.0.1:2558"), kit256.getRef());
959 shardManager256.underlyingActor().waitForUnreachableMember();
961 // Make sure leader shard on member-256 is still leader and still in the cache.
962 shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
963 found = kit256.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
964 path = found.getPrimaryPath();
965 assertTrue("Unexpected primary path " + path + " which must still not on member-256",
966 path.contains("member-256-shard-default-config"));
967 Future<PrimaryShardInfo> futurePrimaryShard = primaryShardInfoCache.getIfPresent("default");
968 futurePrimaryShard.onComplete(new OnComplete<PrimaryShardInfo>() {
970 public void onComplete(final Throwable failure, final PrimaryShardInfo futurePrimaryShardInfo) {
971 if (failure != null) {
972 assertTrue("Primary shard info is unexpectedly removed from primaryShardInfoCache", false);
974 assertEquals("Expected primaryShardInfoCache entry",
975 primaryShardInfo, futurePrimaryShardInfo);
978 }, system256.dispatchers().defaultGlobalDispatcher());
980 LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable ending");
984 public void testOnReceiveFindLocalShardForNonExistentShard() {
985 final TestKit kit = new TestKit(getSystem());
986 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
988 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
990 shardManager.tell(new FindLocalShard("non-existent", false), kit.getRef());
992 LocalShardNotFound notFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
994 assertEquals("getShardName", "non-existent", notFound.getShardName());
998 public void testOnReceiveFindLocalShardForExistentShard() {
999 final TestKit kit = new TestKit(getSystem());
1000 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1002 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1003 shardManager.tell(new ActorInitialized(), mockShardActor);
1005 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1007 LocalShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1009 assertTrue("Found path contains " + found.getPath().path().toString(),
1010 found.getPath().path().toString().contains("member-1-shard-default-config"));
1014 public void testOnReceiveFindLocalShardForNotInitializedShard() {
1015 final TestKit kit = new TestKit(getSystem());
1016 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1018 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1020 kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1024 public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
1025 LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
1026 final TestKit kit = new TestKit(getSystem());
1027 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1029 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1031 // We're passing waitUntilInitialized = true to FindLocalShard
1032 // so the response should be
1033 // delayed until we send ActorInitialized.
1034 Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
1035 new Timeout(5, TimeUnit.SECONDS));
1037 shardManager.tell(new ActorInitialized(), mockShardActor);
1039 Object resp = Await.result(future, kit.duration("5 seconds"));
1040 assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
1042 LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
1046 public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
1047 TestShardManager shardManager = newTestShardManager();
1049 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1050 shardManager.handleCommand(new RoleChangeNotification(
1051 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
1053 verify(ready, never()).countDown();
1055 shardManager.handleCommand(new ShardLeaderStateChanged(memberId, memberId,
1056 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1058 verify(ready, times(1)).countDown();
1062 public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
1063 final TestKit kit = new TestKit(getSystem());
1064 TestShardManager shardManager = newTestShardManager();
1066 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1067 shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1069 verify(ready, never()).countDown();
1071 shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
1073 shardManager.handleCommand(
1074 new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1075 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1077 verify(ready, times(1)).countDown();
1081 public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
1082 final TestKit kit = new TestKit(getSystem());
1083 TestShardManager shardManager = newTestShardManager();
1085 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1086 shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1088 verify(ready, never()).countDown();
1090 shardManager.handleCommand(
1091 new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1092 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1094 shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
1096 verify(ready, times(1)).countDown();
1100 public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
1101 TestShardManager shardManager = newTestShardManager();
1103 shardManager.handleCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(),
1104 RaftState.Leader.name()));
1106 verify(ready, never()).countDown();
1110 public void testByDefaultSyncStatusIsFalse() {
1111 TestShardManager shardManager = newTestShardManager();
1113 assertFalse(shardManager.getMBean().getSyncStatus());
1117 public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception {
1118 TestShardManager shardManager = newTestShardManager();
1120 shardManager.handleCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
1121 RaftState.Follower.name(), RaftState.Leader.name()));
1123 assertTrue(shardManager.getMBean().getSyncStatus());
1127 public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception {
1128 TestShardManager shardManager = newTestShardManager();
1130 String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1131 shardManager.handleCommand(new RoleChangeNotification(shardId,
1132 RaftState.Follower.name(), RaftState.Candidate.name()));
1134 assertFalse(shardManager.getMBean().getSyncStatus());
1136 // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
1137 shardManager.handleCommand(new FollowerInitialSyncUpStatus(
1140 assertFalse(shardManager.getMBean().getSyncStatus());
1144 public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception {
1145 TestShardManager shardManager = newTestShardManager();
1147 String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1148 shardManager.handleCommand(new RoleChangeNotification(shardId,
1149 RaftState.Candidate.name(), RaftState.Follower.name()));
1151 // Initially will be false
1152 assertFalse(shardManager.getMBean().getSyncStatus());
1154 // Send status true will make sync status true
1155 shardManager.handleCommand(new FollowerInitialSyncUpStatus(true, shardId));
1157 assertTrue(shardManager.getMBean().getSyncStatus());
1159 // Send status false will make sync status false
1160 shardManager.handleCommand(new FollowerInitialSyncUpStatus(false, shardId));
1162 assertFalse(shardManager.getMBean().getSyncStatus());
1166 public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception {
1167 LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
1168 TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
1170 public List<String> getMemberShardNames(final MemberName memberName) {
1171 return Arrays.asList("default", "astronauts");
1175 // Initially will be false
1176 assertFalse(shardManager.getMBean().getSyncStatus());
1178 // Make default shard leader
1179 String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
1180 shardManager.handleCommand(new RoleChangeNotification(defaultShardId,
1181 RaftState.Follower.name(), RaftState.Leader.name()));
1183 // default = Leader, astronauts is unknown so sync status remains false
1184 assertFalse(shardManager.getMBean().getSyncStatus());
1186 // Make astronauts shard leader as well
1187 String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1188 shardManager.handleCommand(new RoleChangeNotification(astronautsShardId,
1189 RaftState.Follower.name(), RaftState.Leader.name()));
1191 // Now sync status should be true
1192 assertTrue(shardManager.getMBean().getSyncStatus());
1194 // Make astronauts a Follower
1195 shardManager.handleCommand(new RoleChangeNotification(astronautsShardId,
1196 RaftState.Leader.name(), RaftState.Follower.name()));
1198 // Sync status is not true
1199 assertFalse(shardManager.getMBean().getSyncStatus());
1201 // Make the astronauts follower sync status true
1202 shardManager.handleCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
1204 // Sync status is now true
1205 assertTrue(shardManager.getMBean().getSyncStatus());
1207 LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending");
1211 public void testOnReceiveSwitchShardBehavior() {
1212 final TestKit kit = new TestKit(getSystem());
1213 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1215 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1216 shardManager.tell(new ActorInitialized(), mockShardActor);
1218 shardManager.tell(new SwitchShardBehavior(mockShardName, RaftState.Leader, 1000), kit.getRef());
1220 SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor,
1221 SwitchBehavior.class);
1223 assertEquals(RaftState.Leader, switchBehavior.getNewState());
1224 assertEquals(1000, switchBehavior.getNewTerm());
1227 private static List<MemberName> members(final String... names) {
1228 return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList());
1232 public void testOnCreateShard() {
1233 LOG.info("testOnCreateShard starting");
1234 final TestKit kit = new TestKit(getSystem());
1235 datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1237 ActorRef shardManager = actorFactory
1238 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1239 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1241 SchemaContext schemaContext = TEST_SCHEMA_CONTEXT;
1242 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1244 DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100)
1245 .persistent(false).build();
1246 Shard.Builder shardBuilder = Shard.builder();
1248 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1249 "foo", null, members("member-1", "member-5", "member-6"));
1250 shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), kit.getRef());
1252 kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1254 shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1256 kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1258 assertFalse("isRecoveryApplicable", shardBuilder.getDatastoreContext().isPersistent());
1259 assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig()
1260 .getPeerAddressResolver() instanceof ShardPeerAddressResolver);
1261 assertEquals("peerMembers", Sets.newHashSet(
1262 ShardIdentifier.create("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(),
1263 ShardIdentifier.create("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()),
1264 shardBuilder.getPeerAddresses().keySet());
1265 assertEquals("ShardIdentifier", ShardIdentifier.create("foo", MEMBER_1, shardMrgIDSuffix),
1266 shardBuilder.getId());
1267 assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1269 // Send CreateShard with same name - should return Success with
1272 shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1274 Success success = kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1275 assertNotNull("Success status is null", success.status());
1277 LOG.info("testOnCreateShard ending");
1281 public void testOnCreateShardWithLocalMemberNotInShardConfig() {
1282 LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting");
1283 final TestKit kit = new TestKit(getSystem());
1284 datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1286 ActorRef shardManager = actorFactory
1287 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1288 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1290 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1292 Shard.Builder shardBuilder = Shard.builder();
1293 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1294 "foo", null, members("member-5", "member-6"));
1296 shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1297 kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1299 shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1300 kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1302 assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
1303 assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(), shardBuilder
1304 .getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
1306 LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending");
1310 public void testOnCreateShardWithNoInitialSchemaContext() {
1311 LOG.info("testOnCreateShardWithNoInitialSchemaContext starting");
1312 final TestKit kit = new TestKit(getSystem());
1313 ActorRef shardManager = actorFactory
1314 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1315 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1317 Shard.Builder shardBuilder = Shard.builder();
1319 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1320 "foo", null, members("member-1"));
1321 shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1323 kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1325 SchemaContext schemaContext = TEST_SCHEMA_CONTEXT;
1326 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1328 shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1330 kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1332 assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1333 assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
1335 LOG.info("testOnCreateShardWithNoInitialSchemaContext ending");
1339 public void testGetSnapshot() {
1340 LOG.info("testGetSnapshot starting");
1341 TestKit kit = new TestKit(getSystem());
1343 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1344 .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1"))
1345 .put("astronauts", Collections.<String>emptyList()).build());
1347 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig)
1348 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1350 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1351 Failure failure = kit.expectMsgClass(Failure.class);
1352 assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
1354 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1356 waitForShardInitialized(shardManager, "shard1", kit);
1357 waitForShardInitialized(shardManager, "shard2", kit);
1359 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1361 DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1363 assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1364 assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1366 Function<ShardSnapshot, String> shardNameTransformer = ShardSnapshot::getName;
1368 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
1369 datastoreSnapshot.getShardSnapshots().stream().map(shardNameTransformer).collect(Collectors.toSet())));
1371 // Add a new replica
1373 TestKit mockShardLeaderKit = new TestKit(getSystem());
1375 TestShardManager shardManagerInstance = shardManager.underlyingActor();
1376 shardManagerInstance.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1378 shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1379 mockShardLeaderKit.expectMsgClass(AddServer.class);
1380 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.OK, ""));
1381 kit.expectMsgClass(Status.Success.class);
1382 waitForShardInitialized(shardManager, "astronauts", kit);
1384 // Send another GetSnapshot and verify
1386 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1387 datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1389 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
1390 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
1392 ShardManagerSnapshot snapshot = datastoreSnapshot.getShardManagerSnapshot();
1393 assertNotNull("Expected ShardManagerSnapshot", snapshot);
1394 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1395 Sets.newHashSet(snapshot.getShardList()));
1397 LOG.info("testGetSnapshot ending");
// Creates a shard manager that is handed a DatastoreSnapshot to restore from and checks that a
// subsequent GetSnapshot reply carries the same type and the same shard list
// ("shard1", "shard2", "astronauts").
1401 public void testRestoreFromSnapshot() {
1402 LOG.info("testRestoreFromSnapshot starting");
1404 datastoreContextBuilder.shardInitializationTimeout(3, TimeUnit.SECONDS);
1406 TestKit kit = new TestKit(getSystem());
// All three shards are configured with an empty member list.
1408 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1409 .put("shard1", Collections.<String>emptyList()).put("shard2", Collections.<String>emptyList())
1410 .put("astronauts", Collections.<String>emptyList()).build());
// The snapshot to restore from: a shard-manager snapshot naming the three shards and no
// per-shard snapshots.
1412 ShardManagerSnapshot snapshot =
1413 new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"), Collections.emptyMap());
1414 DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix, snapshot,
1415 Collections.<ShardSnapshot>emptyList());
1416 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig)
1417 .restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
1419 shardManager.underlyingActor().waitForRecoveryComplete();
1421 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
// Wait on each of the three shards before asking for a snapshot.
1423 waitForShardInitialized(shardManager, "shard1", kit);
1424 waitForShardInitialized(shardManager, "shard2", kit);
1425 waitForShardInitialized(shardManager, "astronauts", kit);
1427 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1429 DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
// The reported snapshot must have the shard manager's type and list exactly the restored shards.
1431 assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1433 assertNotNull("Expected ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1434 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1435 Sets.newHashSet(datastoreSnapshot.getShardManagerSnapshot().getShardList()));
1437 LOG.info("testRestoreFromSnapshot ending");
// Sends AddShardReplica for a shard name ("model-inventory") to a shard manager whose configuration
// comes from an EmptyModuleShardConfigProvider and expects a Status.Failure caused by an
// IllegalArgumentException.
1441 public void testAddShardReplicaForNonExistentShardConfig() {
1442 final TestKit kit = new TestKit(getSystem());
1443 ActorRef shardManager = actorFactory
1444 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1445 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1447 shardManager.tell(new AddShardReplica("model-inventory"), kit.getRef());
// The failure reply is awaited for at most 2 seconds.
1448 Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(2), Status.Failure.class);
1450 assertTrue("Failure obtained", resp.cause() instanceof IllegalArgumentException);
// Two-node scenario: member-1 asks to add a replica of the "astronauts" shard, which is configured
// on member-2 only. The test checks the AddServer message that reaches the mock leader actor, the
// Success reply to the requester, and that exactly one shard-manager snapshot listing
// "default" and "astronauts" is left in the InMemorySnapshotStore afterwards.
1454 public void testAddShardReplica() {
1455 LOG.info("testAddShardReplica starting");
1456 MockConfiguration mockConfig = new MockConfiguration(
1457 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1458 .put("astronauts", Arrays.asList("member-2")).build());
// Both shard managers share this persistence id, which is also the key used with
// InMemorySnapshotStore below.
1460 final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1461 datastoreContextBuilder.shardManagerPersistenceId(shardManagerID);
1463 // Create an ActorSystem ShardManager actor for member-1.
1464 final ActorSystem system1 = newActorSystem("Member1");
1465 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1466 ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
// NOTE(review): the create(...) call below ends on a comma in this listing; its final argument
// and closing parenthesis are not visible here.
1467 final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1468 newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor)
1469 .cluster(new ClusterWrapperImpl(system1)).props()
1470 .withDispatcher(Dispatchers.DefaultDispatcherId()),
1473 // Create an ActorSystem ShardManager actor for member-2.
1474 final ActorSystem system2 = newActorSystem("Member2");
1475 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1477 String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
1478 String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString();
// Mock leader for "astronauts" on member-2; it is constructed with AddServer.class and an OK
// AddServerReply (presumably the reply it sends when an AddServer arrives - confirm against
// MockRespondActor).
1479 final TestActorRef<MockRespondActor> mockShardLeaderActor = TestActorRef.create(system2,
1480 Props.create(MockRespondActor.class, AddServer.class,
1481 new AddServerReply(ServerChangeStatus.OK, memberId2))
1482 .withDispatcher(Dispatchers.DefaultDispatcherId()),
1484 final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1485 newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor)
1486 .cluster(new ClusterWrapperImpl(system2)).props()
1487 .withDispatcher(Dispatchers.DefaultDispatcherId()),
1490 final TestKit kit = new TestKit(getSystem());
1491 newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1492 leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1494 leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
// Tell the member-2 shard manager that its local shard is the leader, reporting a leader
// version one below the current data store version.
1496 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1497 leaderShardManager.tell(
1498 new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1499 mockShardLeaderActor);
1500 leaderShardManager.tell(
1501 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1502 mockShardLeaderActor);
1504 newReplicaShardManager.underlyingActor().waitForMemberUp();
1505 leaderShardManager.underlyingActor().waitForMemberUp();
1507 // Have a dummy snapshot to be overwritten by the new data
1509 String[] restoredShards = { "default", "people" };
1510 ShardManagerSnapshot snapshot =
1511 new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
1512 InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
1513 Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
// Register latches so the save and delete of the snapshot can be awaited further down.
1515 InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID);
1516 InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
1518 // construct a mock response message
1519 newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
// NOTE(review): the expectFirstMatching(...) call below is cut short in this listing (its
// class argument and closing parenthesis are not visible).
1520 AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1522 String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1523 assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1524 kit.expectMsgClass(Duration.ofSeconds(5), Status.Success.class);
// After the save and delete have both been observed, only one snapshot may remain and it must
// list the local shards "default" and "astronauts" (the dummy listed "default" and "people").
1526 InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
1527 InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
1528 List<ShardManagerSnapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(shardManagerID,
1529 ShardManagerSnapshot.class);
1530 assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
1531 ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
1532 assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
1533 Sets.newHashSet(shardManagerSnapshot.getShardList()));
1534 LOG.info("testAddShardReplica ending");
// Requests AddShardReplica for the default shard while a mock remote leader answers AddServer with
// ALREADY_EXISTS. Each request must fail (the first two with an AlreadyExistsException cause) and
// the local shard must still be found afterwards, including after a request made with a 100 ms
// shard leader election timeout and the leader's canned response cleared.
1538 public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() {
1539 LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting");
1540 final TestKit kit = new TestKit(getSystem());
1541 TestActorRef<TestShardManager> shardManager = actorFactory
1542 .createTestActor(newPropsShardMgrWithMockShardActor(), shardMgrID);
1544 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1545 shardManager.tell(new ActorInitialized(), mockShardActor);
// The mock leader is created as a child of the shard manager under the leader's shard id and is
// constructed with AddServer.class and an ALREADY_EXISTS reply.
1547 String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
1548 AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
1549 ActorRef leaderShardActor = shardManager.underlyingActor().getContext()
1550 .actorOf(Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId);
1552 MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
1554 String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
// NOTE(review): the next two lines are message arguments only; the enclosing call and sender
// argument are not visible in this listing. They describe the local replica as a Follower whose
// leader is leaderId.
1556 new RoleChangeNotification(newReplicaId, RaftState.Candidate.name(), RaftState.Follower.name()),
1559 new ShardLeaderStateChanged(newReplicaId, leaderId, DataStoreVersions.CURRENT_VERSION),
1562 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1564 MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
1566 Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1567 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
// The local shard must still be reported as found after the failed request.
1569 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1570 kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1572 // Send message again to verify previous in progress state is
// (presumably) cleared - the repeated request is expected to fail with the same
// AlreadyExistsException cause.
1575 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1576 resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1577 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1579 // Send message again with an AddServer timeout to verify the
1580 // pre-existing shard actor isn't terminated.
// NOTE(review): the two lines below are the arguments of a call whose opening line is not
// visible in this listing; they build a datastore context with a 100 ms leader election timeout.
1583 newDatastoreContextFactory(
1584 datastoreContextBuilder.shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), kit.getRef());
1585 leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
1586 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1587 kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1589 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1590 kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1592 LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
// Requests AddShardReplica for the default shard after the shard manager has been told that the
// local member (memberId) is itself the leader. The request must fail with an
// AlreadyExistsException cause and the local shard must still be found afterwards.
1596 public void testAddShardReplicaWithPreExistingLocalReplicaLeader() {
1597 LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting");
1598 final TestKit kit = new TestKit(getSystem());
1599 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1600 ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1602 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1603 shardManager.tell(new ActorInitialized(), mockShardActor);
// Leader id equals the local member id, i.e. the local replica is reported as leader.
1604 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1605 DataStoreVersions.CURRENT_VERSION), kit.getRef());
// NOTE(review): the line below is a message argument only; the enclosing call and its sender
// argument are not visible in this listing.
1607 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1610 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1611 Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1612 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1614 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1615 kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1617 LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
// Checks how AddShardReplica reacts to failing AddServerReply statuses:
//  - TIMEOUT   -> Failure with a TimeoutException cause, the local shard is not found afterwards
//                 and the new replica shard actor is terminated;
//  - NO_LEADER -> Failure with a NoShardLeaderException cause.
// A TestKit (mockShardLeaderKit) plays the shard leader and sends the replies by hand.
1621 public void testAddShardReplicaWithAddServerReplyFailure() {
1622 LOG.info("testAddShardReplicaWithAddServerReplyFailure starting");
1623 final TestKit kit = new TestKit(getSystem());
1624 final TestKit mockShardLeaderKit = new TestKit(getSystem());
1626 MockConfiguration mockConfig = new MockConfiguration(
1627 ImmutableMap.of("astronauts", Arrays.asList("member-2")));
1629 ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
1630 final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1631 newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props()
1632 .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
// Presumably makes primary lookups resolve to mockShardLeaderKit, which is where the AddServer
// messages are expected below - confirm against newFindPrimaryInterceptor.
1633 shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1635 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
// Watch the new replica shard actor so its termination can be asserted later.
1637 TestKit terminateWatcher = new TestKit(getSystem());
1638 terminateWatcher.watch(mockNewReplicaShardActor);
1640 shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1642 AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
1643 assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
1644 addServerMsg.getNewServerId());
// First attempt: the leader answers TIMEOUT.
1645 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
1647 Failure failure = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1648 assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
1650 shardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
1651 kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
1653 terminateWatcher.expectTerminated(mockNewReplicaShardActor);
// Second attempt: the leader answers NO_LEADER.
1655 shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1656 mockShardLeaderKit.expectMsgClass(AddServer.class);
1657 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
1658 failure = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1659 assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
1661 LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
// Runs testServerChangeWhenAlreadyInProgress for the "astronauts" shard with an AddShardReplica as
// both the first and the second request; the first is expected to be forwarded as an AddServer.
1665 public void testAddShardReplicaWithAlreadyInProgress() {
1666 testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1667 AddServer.class, new AddShardReplica("astronauts"));
// Requests AddShardReplica for "astronauts" when the only configured member (member-2) is reported
// up at an address in a non-existent actor system, with a 100 ms shard initialization timeout.
// The reply must be a Status.Failure whose cause is a RuntimeException.
1671 public void testAddShardReplicaWithFindPrimaryTimeout() {
1672 LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting");
1673 datastoreContextBuilder.shardInitializationTimeout(100, TimeUnit.MILLISECONDS);
1674 final TestKit kit = new TestKit(getSystem());
1675 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.of("astronauts", Arrays.asList("member-2")));
1677 final ActorRef newReplicaShardManager = actorFactory
1678 .createActor(newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props()
1679 .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1681 newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
// member-2 is announced at "akka://non-existent@127.0.0.1:5".
1682 MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2",
1683 AddressFromURIString.parse("akka://non-existent@127.0.0.1:5").toString());
1685 newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1686 Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Status.Failure.class);
1687 assertTrue("Failure obtained", resp.cause() instanceof RuntimeException);
1689 LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending");
// Sends RemoveShardReplica for a shard name ("model-inventory") to a shard manager whose
// configuration comes from an EmptyModuleShardConfigProvider and expects a Status.Failure caused by
// a PrimaryNotFoundException.
1693 public void testRemoveShardReplicaForNonExistentShard() {
1694 final TestKit kit = new TestKit(getSystem());
1695 ActorRef shardManager = actorFactory
1696 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1697 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1699 shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), kit.getRef());
// The failure reply is awaited for up to 10 seconds.
1700 Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(10), Status.Failure.class);
1701 assertTrue("Failure obtained", resp.cause() instanceof PrimaryNotFoundException);
// Removes the member-1 replica of the default shard when the shard manager has been told that the
// local member is the leader. The mock shard actor must receive a RemoveServer carrying the
// member-1 shard id, and the requester must get a Success reply.
1708 public void testRemoveShardReplicaLocal() {
1709 final TestKit kit = new TestKit(getSystem());
1710 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
// The local shard actor is a MockRespondActor constructed with RemoveServer.class and an OK
// RemoveServerReply.
1712 final ActorRef respondActor = actorFactory.createActor(Props.create(MockRespondActor.class,
1713 RemoveServer.class, new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
1715 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
1717 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1718 shardManager.tell(new ActorInitialized(), respondActor);
1719 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1720 DataStoreVersions.CURRENT_VERSION), kit.getRef());
// NOTE(review): the line below is a message argument only; the enclosing call and its sender
// argument are not visible in this listing.
1722 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1725 shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), kit.getRef());
1726 final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor,
1727 RemoveServer.class);
1728 assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(),
1729 removeServer.getServerId());
1730 kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
// Two-node scenario: member-1 holds a follower replica of the default shard whose leader lives on
// member-2. member-1 is asked to remove its own replica; the test checks that the mock leader on
// member-2 receives a RemoveServer with the member-1 shard id and that the requester gets Success.
1734 public void testRemoveShardReplicaRemote() {
1735 MockConfiguration mockConfig = new MockConfiguration(
1736 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1737 .put("astronauts", Arrays.asList("member-1")).build());
1739 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1741 // Create an ActorSystem ShardManager actor for member-1.
1742 final ActorSystem system1 = newActorSystem("Member1");
1743 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1744 ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
// NOTE(review): the create(...) call below ends on a comma in this listing; its final argument
// and closing parenthesis are not visible here.
1746 final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1747 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster(
1748 new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1751 // Create an ActorSystem ShardManager actor for member-2.
1752 final ActorSystem system2 = newActorSystem("Member2");
1753 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1755 String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString();
1756 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
// Mock leader on member-2, constructed with RemoveServer.class and an OK RemoveServerReply.
1757 final TestActorRef<MockRespondActor> mockShardLeaderActor =
1758 TestActorRef.create(system2, Props.create(MockRespondActor.class, RemoveServer.class,
1759 new RemoveServerReply(ServerChangeStatus.OK, memberId2)), name);
// NOTE(review): logged at ERROR level although the message is informational.
1761 LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor);
1763 final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1764 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster(
1765 new ClusterWrapperImpl(system2)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1768 // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so,
1769 // akka://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1
1770 // However when a shard manager has a local shard which is a follower and a leader that is remote it will
1771 // try to compute an address for the remote shard leader using the ShardPeerAddressResolver. This address will
1773 // akka://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1
1774 // In this specific case if we did a FindPrimary for shard default from member-1 we would come up
1775 // with the address of an actor which does not exist, therefore any message sent to that actor would go to
1777 // To work around this problem we create a ForwardingActor with the right address and pass to it the
1778 // mockShardLeaderActor. The ForwardingActor simply forwards all messages to the mockShardLeaderActor and every
1779 // thing works as expected
1780 final ActorRef actorRef = leaderShardManager.underlyingActor().context()
1781 .actorOf(Props.create(ForwardingActor.class, mockShardLeaderActor),
1782 "member-2-shard-default-" + shardMrgIDSuffix);
// NOTE(review): logged at ERROR level although the message is informational.
1784 LOG.error("Forwarding actor : {}", actorRef);
1786 final TestKit kit = new TestKit(getSystem());
1787 newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1788 leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1790 leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1791 newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
// member-2 is told its local shard is the leader, with a leader version one below the current
// data store version.
1793 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1794 leaderShardManager.tell(
1795 new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1796 mockShardLeaderActor);
1797 leaderShardManager.tell(
1798 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1799 mockShardLeaderActor);
// member-1 is told its local shard is a Follower whose leader is memberId2.
// NOTE(review): the sender arguments of the next two tell(...) calls are not visible in this
// listing.
1801 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
1802 newReplicaShardManager.tell(
1803 new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class), leaderVersion),
1805 newReplicaShardManager.tell(
1806 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
1809 newReplicaShardManager.underlyingActor().waitForMemberUp();
1810 leaderShardManager.underlyingActor().waitForMemberUp();
1812 // construct a mock response message
1813 newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), kit.getRef());
1814 RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1815 RemoveServer.class);
1816 String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1817 assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
1818 kit.expectMsgClass(Duration.ofSeconds(5), Status.Success.class);
// Runs testServerChangeWhenAlreadyInProgress for the "astronauts" shard: a RemoveShardReplica for
// MEMBER_2 (expected to be forwarded as a RemoveServer) followed by a RemoveShardReplica for
// MEMBER_3.
1822 public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() {
1823 testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", MEMBER_2),
1824 RemoveServer.class, new RemoveShardReplica("astronauts", MEMBER_3));
// Runs testServerChangeWhenAlreadyInProgress for the "astronauts" shard: an AddShardReplica
// (expected to be forwarded as an AddServer) followed by a RemoveShardReplica for MEMBER_2.
1828 public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() {
1829 testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1830 AddServer.class, new RemoveShardReplica("astronauts", MEMBER_2));
// Shared scenario for the "already in progress" tests: sends a first server-change request, waits
// until the mock leader has received the forwarded message (without ever replying to it), then
// sends a second request from a different TestKit and expects that one to receive a Failure.
//
// shardName                        - shard configured (on "member-2" only) for the scenario
// firstServerChange                - first request sent to the shard manager
// firstForwardedServerChangeClass  - message class the mock leader must receive for the first request
// secondServerChange               - request sent while the first one is still unanswered
1834 public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange,
1835 final Class<?> firstForwardedServerChangeClass,
1836 final Object secondServerChange) {
1837 final TestKit kit = new TestKit(getSystem());
1838 final TestKit mockShardLeaderKit = new TestKit(getSystem());
1839 final TestKit secondRequestKit = new TestKit(getSystem());
1841 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1842 .put(shardName, Arrays.asList("member-2")).build());
// NOTE(review): the create(...) call below ends on a comma in this listing; its final argument
// and closing parenthesis are not visible here.
1844 final TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
1845 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor)
1846 .cluster(new MockClusterWrapper()).props()
1847 .withDispatcher(Dispatchers.DefaultDispatcherId()),
// Presumably makes primary lookups resolve to mockShardLeaderKit - confirm against
// newFindPrimaryInterceptor.
1850 shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1852 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1854 shardManager.tell(firstServerChange, kit.getRef());
// The first request has reached the leader; no reply is sent, so it stays outstanding.
1856 mockShardLeaderKit.expectMsgClass(firstForwardedServerChangeClass);
1858 shardManager.tell(secondServerChange, secondRequestKit.getRef());
1860 secondRequestKit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
// Sends ServerRemoved for the member-1 replica of the "default" shard to a shard manager whose
// shards have not been initialized, then verifies that the persisted snapshot lists only "people".
1864 public void testServerRemovedShardActorNotRunning() {
1865 LOG.info("testServerRemovedShardActorNotRunning starting");
1866 final TestKit kit = new TestKit(getSystem());
1867 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1868 .put("default", Arrays.asList("member-1", "member-2"))
1869 .put("astronauts", Arrays.asList("member-2"))
1870 .put("people", Arrays.asList("member-1", "member-2")).build());
1872 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1873 newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1875 shardManager.underlyingActor().waitForRecoveryComplete();
// Both shards configured for member-1 answer FindLocalShard with NotInitializedException at
// this point.
1876 shardManager.tell(new FindLocalShard("people", false), kit.getRef());
1877 kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1879 shardManager.tell(new FindLocalShard("default", false), kit.getRef());
1880 kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1882 // Removed the default shard replica from member-1
1883 ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
// NOTE(review): the builder chain below is not terminated in this listing (the final call of
// the chain is not visible).
1884 ShardIdentifier shardId = builder.shardName("default").memberName(MEMBER_1).type(shardMrgIDSuffix)
1886 shardManager.tell(new ServerRemoved(shardId.toString()), kit.getRef());
1888 shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1890 LOG.info("testServerRemovedShardActorNotRunning ending");
// Sends ServerRemoved for the member-1 replica of the "default" shard while that shard's actor (a
// MessageCollectorActor) is registered and initialized. Verifies that the persisted snapshot lists
// only "people" and that the shard actor receives a Shutdown message.
1894 public void testServerRemovedShardActorRunning() {
1895 LOG.info("testServerRemovedShardActorRunning starting");
1896 final TestKit kit = new TestKit(getSystem());
1897 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1898 .put("default", Arrays.asList("member-1", "member-2"))
1899 .put("astronauts", Arrays.asList("member-2"))
1900 .put("people", Arrays.asList("member-1", "member-2")).build());
// The "default" shard is backed by a message-collecting actor so the Shutdown can be observed.
1902 String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1903 ActorRef shard = actorFactory.createActor(MessageCollectorActor.props(), shardId);
1905 TestActorRef<TestShardManager> shardManager = actorFactory
1906 .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props()
1907 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1909 shardManager.underlyingActor().waitForRecoveryComplete();
1911 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1912 shardManager.tell(new ActorInitialized(), shard);
1914 waitForShardInitialized(shardManager, "people", kit);
1915 waitForShardInitialized(shardManager, "default", kit);
1917 // Removed the default shard replica from member-1
1918 shardManager.tell(new ServerRemoved(shardId), kit.getRef());
1920 shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1922 MessageCollectorActor.expectFirstMatching(shard, Shutdown.class);
1924 LOG.info("testServerRemovedShardActorRunning ending");
// Seeds the InMemorySnapshotStore with a shard-manager snapshot listing "default" and "astronauts"
// and starts a shard manager whose configuration places "default" and "people" on member-1.
// After recovery, "people" must be reported as LocalShardNotFound while the two restored shards
// answer FindLocalShard with NotInitializedException.
1928 public void testShardPersistenceWithRestoredData() {
1929 LOG.info("testShardPersistenceWithRestoredData starting");
1930 final TestKit kit = new TestKit(getSystem());
1931 MockConfiguration mockConfig =
1932 new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1933 .put("default", Arrays.asList("member-1", "member-2"))
1934 .put("astronauts", Arrays.asList("member-2"))
1935 .put("people", Arrays.asList("member-1", "member-2")).build());
1936 String[] restoredShards = {"default", "astronauts"};
1937 ShardManagerSnapshot snapshot =
1938 new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
// The snapshot is stored under the key "shard-manager-" + shardMrgIDSuffix.
1939 InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
1941 // create shardManager to come up with restored data
1942 TestActorRef<TestShardManager> newRestoredShardManager = actorFactory.createTestActor(
1943 newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1945 newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
// "people" is in the configuration but not in the restored snapshot.
1947 newRestoredShardManager.tell(new FindLocalShard("people", false), kit.getRef());
1948 LocalShardNotFound notFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
1949 assertEquals("for uninitialized shard", "people", notFound.getShardName());
1951 // Verify a local shard is created for the restored shards,
1952 // although we expect a NotInitializedException for the shards
1953 // as the actor initialization
1954 // message is not sent for them
1955 newRestoredShardManager.tell(new FindLocalShard("default", false), kit.getRef());
1956 kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1958 newRestoredShardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
1959 kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1961 LOG.info("testShardPersistenceWithRestoredData ending");
// Gracefully stops a shard manager that owns two shards and checks the shutdown ordering: both
// shard actors must receive Shutdown, the shard manager must not have stopped 500 ms later while
// the shards are still alive, and it must report stopped once both shards have been killed.
1965 public void testShutDown() throws Exception {
1966 LOG.info("testShutDown starting");
1967 final TestKit kit = new TestKit(getSystem());
1968 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1969 .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1")).build());
// Both shards are message-collecting actors, so they record the Shutdown message.
1971 String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString();
1972 ActorRef shard1 = actorFactory.createActor(MessageCollectorActor.props(), shardId1);
1974 String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString();
1975 ActorRef shard2 = actorFactory.createActor(MessageCollectorActor.props(), shardId2);
1977 ActorRef shardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig)
1978 .addShardActor("shard1", shard1).addShardActor("shard2", shard2).props());
1980 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1981 shardManager.tell(new ActorInitialized(), shard1);
1982 shardManager.tell(new ActorInitialized(), shard2);
// Ask the shard manager to stop by sending Shutdown.INSTANCE, allowing up to 5 seconds.
1984 FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
1985 Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE);
1987 MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class);
1988 MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class);
// The stop future must NOT complete within 500 ms: reaching fail(...) means the shard manager
// stopped early; the TimeoutException is the expected outcome.
// NOTE(review): the opening of the try block is not visible in this listing.
1991 Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS));
1992 fail("ShardManager actor stopped without waiting for the Shards to be stopped");
1993 } catch (TimeoutException e) {
// Once both shards are gone the graceful stop must complete with TRUE.
1997 actorFactory.killActor(shard1, kit);
1998 actorFactory.killActor(shard2, kit);
2000 Boolean stopped = Await.result(stopFuture, duration);
2001 assertEquals("Stopped", Boolean.TRUE, stopped);
2003 LOG.info("testShutDown ending");
// Sends ChangeShardMembersVotingStatus for the default shard (member-2 -> TRUE) when the local
// member has been reported as leader. The mock shard actor must receive a
// ChangeServersVotingStatus keyed by member-2's shard id, and the requester must get Success.
2007 public void testChangeServersVotingStatus() {
2008 final TestKit kit = new TestKit(getSystem());
2009 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
// The local shard actor is a MockRespondActor constructed with ChangeServersVotingStatus.class
// and an OK ServerChangeReply.
2011 ActorRef respondActor = actorFactory
2012 .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2013 new ServerChangeReply(ServerChangeStatus.OK, null)), memberId);
2015 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2017 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2018 shardManager.tell(new ActorInitialized(), respondActor);
2019 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
2020 DataStoreVersions.CURRENT_VERSION), kit.getRef());
// NOTE(review): the next two lines are arguments only; the enclosing calls (and, for the first,
// its sender argument) are not visible in this listing.
2022 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
2026 new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), kit.getRef());
2028 ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor
2029 .expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
// NOTE(review): the actual value is passed before the expected map here, the reverse of the
// (message, expected, actual) order used by the other assertEquals calls in this file.
// The remainder of the expected map is not visible in this listing.
2030 assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(),
2031 ImmutableMap.of(ShardIdentifier
2032 .create("default", MemberName.forName("member-2"), shardMrgIDSuffix).toString(),
2035 kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
// Sends ChangeShardMembersVotingStatus for the default shard when the local shard has only been
// reported as a Follower and the mock shard actor is set up with a NO_LEADER ServerChangeReply.
// The shard actor must still receive the ChangeServersVotingStatus, and the requester must get a
// Status.Failure caused by a NoShardLeaderException.
2039 public void testChangeServersVotingStatusWithNoLeader() {
2040 final TestKit kit = new TestKit(getSystem());
2041 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2043 ActorRef respondActor = actorFactory
2044 .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2045 new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);
2047 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2049 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2050 shardManager.tell(new ActorInitialized(), respondActor);
// The role change carries a null previous role and the new role Follower.
2051 shardManager.tell(new RoleChangeNotification(memberId, null, RaftState.Follower.name()), respondActor);
// NOTE(review): the line below holds the arguments of a call whose opening line is not visible
// in this listing.
2054 new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), kit.getRef());
2056 MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2058 Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Status.Failure.class);
2059 assertTrue("Failure resposnse", resp.cause() instanceof NoShardLeaderException);
// Registers a mocked Consumer<String> through RegisterForShardAvailabilityChanges and checks when
// it is invoked as ShardLeaderStateChanged messages arrive for the default shard:
//   leader = memberId1          -> callback accepts "default"
//   leader = memberId1 (again)  -> no interaction
//   leader = null               -> callback accepts "default"
//   leader = memberId2          -> callback accepts "default"
//   final message (memberId1)   -> no interaction
// The unchecked warning is suppressed because mock(Consumer.class) yields a raw Consumer.
2062 @SuppressWarnings("unchecked")
2064 public void testRegisterForShardLeaderChanges() {
2065 LOG.info("testRegisterForShardLeaderChanges starting");
2067 final String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
2068 final String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
2069 final TestKit kit = new TestKit(getSystem());
2070 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
2072 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2073 shardManager.tell(new ActorInitialized(), mockShardActor);
2075 final Consumer<String> mockCallback = mock(Consumer.class);
2076 shardManager.tell(new RegisterForShardAvailabilityChanges(mockCallback), kit.getRef());
// The Success reply carries the Registration as its status.
2078 final Success reply = kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
2079 final Registration reg = (Registration) reply.status();
// First leader report: memberId1 is leader, so the callback must fire within 5 seconds.
2081 final DataTree mockDataTree = mock(DataTree.class);
2082 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2083 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2085 verify(mockCallback, timeout(5000)).accept("default");
// Same leader reported again: after waiting 500 ms the callback must not have been invoked.
2087 reset(mockCallback);
2088 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2089 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2091 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
2092 verifyNoMoreInteractions(mockCallback);
// Leader becomes null: the callback must fire.
2094 shardManager.tell(new ShardLeaderStateChanged(memberId1, null, mockDataTree,
2095 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2097 verify(mockCallback, timeout(5000)).accept("default");
// Leader becomes memberId2: the callback must fire.
2099 reset(mockCallback);
2100 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, mockDataTree,
2101 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2103 verify(mockCallback, timeout(5000)).accept("default");
// NOTE(review): reg is not used in the lines visible here; presumably the registration is
// closed at this point, which would explain why the leader change below must not reach the
// callback - confirm against the complete method.
2105 reset(mockCallback);
2108 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2109 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2111 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
2112 verifyNoMoreInteractions(mockCallback);
2114 LOG.info("testRegisterForShardLeaderChanges ending");
/**
 * ShardManager subclass used by the tests. It counts down latches when selected messages are handled,
 * lets an optional MessageInterceptor answer a message, remembers the object passed to saveSnapshot,
 * and can return pre-built shard actors from newShardActor.
 */
2117 public static class TestShardManager extends ShardManager {
// One-shot latches: released on RecoveryCompleted and on saveSnapshot respectively; never replaced.
2118 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
2119 private final CountDownLatch snapshotPersist = new CountDownLatch(1);
// Last object handed to saveSnapshot, read by verifySnapshotPersisted.
2120 private ShardManagerSnapshot snapshot;
// Shard actors to return from newShardActor: per shard name, and a single fallback actor.
2121 private final Map<String, ActorRef> shardActors;
2122 private final ActorRef shardActor;
// Resettable latches: each wait/verify method below replaces its latch with a fresh one after a
// successful wait.
// NOTE(review): these fields are reassigned but, unlike messageInterceptor, are not volatile. If
// handleCommand runs on a different thread than the waiting caller, the replacement latch may not
// be visible to it - confirm and consider volatile.
2123 private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
2124 private CountDownLatch memberUpReceived = new CountDownLatch(1);
2125 private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
2126 private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
2127 private CountDownLatch memberReachableReceived = new CountDownLatch(1);
2128 private volatile MessageInterceptor messageInterceptor;
// Copies the shard actor settings out of the builder.
// NOTE(review): a line between the signature and these assignments is absent from this listing
// (presumably the superclass constructor call).
2130 TestShardManager(final Builder builder) {
2132 shardActor = builder.shardActor;
2133 shardActors = builder.shardActors;
// Delegates to the superclass, then releases recoveryComplete once a RecoveryCompleted message is seen.
2137 protected void handleRecover(final Object message) throws Exception {
2139 super.handleRecover(message);
2141 if (message instanceof RecoveryCompleted) {
2142 recoveryComplete.countDown();
// Tests whether the current member name (getCluster().getCurrentMemberName()) differs from
// memberToName(member). The statement guarded by this condition is not visible in this listing -
// presumably latch.countDown(), going by the method name; confirm.
2147 private void countDownIfOther(final Member member, final CountDownLatch latch) {
2148 if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
// Message entry point. When an interceptor is installed and accepts the message, the sender is sent
// the interceptor's result. super.handleCommand follows; the lines around it are absent from this
// listing, so whether it is the alternative branch cannot be confirmed here.
// Afterwards FindPrimary releases findPrimaryMessageReceived, and each of the four cluster member
// events is passed to countDownIfOther together with its matching latch.
2154 public void handleCommand(final Object message) throws Exception {
2156 if (messageInterceptor != null && messageInterceptor.canIntercept(message)) {
2157 getSender().tell(messageInterceptor.apply(message), getSelf());
2159 super.handleCommand(message);
2162 if (message instanceof FindPrimary) {
2163 findPrimaryMessageReceived.countDown();
2164 } else if (message instanceof ClusterEvent.MemberUp) {
2165 countDownIfOther(((ClusterEvent.MemberUp) message).member(), memberUpReceived);
2166 } else if (message instanceof ClusterEvent.MemberRemoved) {
2167 countDownIfOther(((ClusterEvent.MemberRemoved) message).member(), memberRemovedReceived);
2168 } else if (message instanceof ClusterEvent.UnreachableMember) {
2169 countDownIfOther(((ClusterEvent.UnreachableMember) message).member(), memberUnreachableReceived);
2170 } else if (message instanceof ClusterEvent.ReachableMember) {
2171 countDownIfOther(((ClusterEvent.ReachableMember) message).member(), memberReachableReceived);
// Installs (or, with null, removes) the interceptor consulted by handleCommand.
2176 void setMessageInterceptor(final MessageInterceptor messageInterceptor) {
2177 this.messageInterceptor = messageInterceptor;
// Fails the test unless recoveryComplete is released within 5 seconds. This latch is not reset.
2180 void waitForRecoveryComplete() {
2181 assertTrue("Recovery complete",
2182 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
// Each method below fails the test unless its latch is released within 5 seconds, then installs a
// fresh single-count latch so that the next call waits for a new event.
2185 public void waitForMemberUp() {
2186 assertTrue("MemberUp received",
2187 Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
2188 memberUpReceived = new CountDownLatch(1);
2191 void waitForMemberRemoved() {
2192 assertTrue("MemberRemoved received",
2193 Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
2194 memberRemovedReceived = new CountDownLatch(1);
2197 void waitForUnreachableMember() {
2198 assertTrue("UnreachableMember received",
2199 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS));
2200 memberUnreachableReceived = new CountDownLatch(1);
2203 void waitForReachableMember() {
2204 assertTrue("ReachableMember received",
2205 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
2206 memberReachableReceived = new CountDownLatch(1);
2209 void verifyFindPrimary() {
2210 assertTrue("FindPrimary received",
2211 Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
2212 findPrimaryMessageReceived = new CountDownLatch(1);
// Factory for a Builder based on the given datastore context builder.
2215 public static Builder builder(final DatastoreContext.Builder datastoreContextBuilder) {
2216 return new Builder(datastoreContextBuilder);
// Creator for TestShardManager that additionally carries the shard actors newShardActor should return.
2219 public static class Builder extends AbstractGenericCreator<Builder, TestShardManager> {
2220 private ActorRef shardActor;
2221 private final Map<String, ActorRef> shardActors = new HashMap<>();
// Builds the datastore context and wraps it with newDatastoreContextFactory.
2223 Builder(final DatastoreContext.Builder datastoreContextBuilder) {
2224 super(TestShardManager.class);
2225 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
// Sets the fallback shard actor. The return statement is not visible in this listing
// (presumably returns this - confirm).
2228 Builder shardActor(final ActorRef newShardActor) {
2229 this.shardActor = newShardActor;
// Registers the actor to use for the named shard. The return statement is not visible in this
// listing (presumably returns this - confirm).
2233 Builder addShardActor(final String shardName, final ActorRef actorRef) {
2234 shardActors.put(shardName, actorRef);
// Remembers the snapshot (cast to ShardManagerSnapshot) and releases snapshotPersist before
// delegating to the superclass.
2240 public void saveSnapshot(final Object obj) {
2241 snapshot = (ShardManagerSnapshot) obj;
2242 snapshotPersist.countDown();
2243 super.saveSnapshot(obj);
// Fails the test unless saveSnapshot was called within 5 seconds, then compares shardList with the
// recorded snapshot's shard list, copied into a set so that order does not matter.
2246 void verifySnapshotPersisted(final Set<String> shardList) {
2247 assertTrue("saveSnapshot invoked",
2248 Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
2249 assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
// Chooses the shard actor: an entry registered for this shard name is returned first; otherwise the
// fallback shardActor is checked (the statement guarded by that check is not visible in this listing,
// presumably it returns shardActor - confirm); otherwise the superclass creates the actor.
2253 protected ActorRef newShardActor(final ShardInformation info) {
2254 if (shardActors.get(info.getShardName()) != null) {
2255 return shardActors.get(info.getShardName());
2258 if (shardActor != null) {
2262 return super.newShardActor(info);
/**
 * Base creator for the tests: remembers which ShardManager class to instantiate and pre-configures
 * the creator with mock collaborators.
 *
 * @param <T> the concrete creator type
 * @param <C> the ShardManager class that props() asks Props to create
 */
2266 private abstract static class AbstractGenericCreator<T extends AbstractGenericCreator<T, ?>, C extends ShardManager>
2267 extends AbstractShardManagerCreator<T> {
2268 private final Class<C> shardManagerClass;
// Stores the target class and sets test defaults: a MockClusterWrapper, a MockConfiguration, the
// `ready` latch and a new PrimaryShardInfoFutureCache.
2270 AbstractGenericCreator(final Class<C> shardManagerClass) {
2271 this.shardManagerClass = shardManagerClass;
2272 cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).waitTillReadyCountDownLatch(ready)
2273 .primaryShardInfoCache(new PrimaryShardInfoFutureCache());
// Returns Props that create shardManagerClass with this creator as the constructor argument.
// NOTE(review): a line between the signature and the return is absent from this listing.
2277 public Props props() {
2279 return Props.create(shardManagerClass, this);
/**
 * Concrete AbstractGenericCreator with no settings of its own; it only forwards the ShardManager
 * class to the base constructor.
 *
 * @param <C> the ShardManager class to create
 */
2283 private static class GenericCreator<C extends ShardManager> extends AbstractGenericCreator<GenericCreator<C>, C> {
2284 GenericCreator(final Class<C> shardManagerClass) {
2285 super(shardManagerClass);
/**
 * Creator of ShardManager that forwards create() to another Creator supplied at construction time.
 */
2289 private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
2290 private static final long serialVersionUID = 1L;
// The creator that actually builds the ShardManager.
2291 private final Creator<ShardManager> delegate;
2293 DelegatingShardManagerCreator(final Creator<ShardManager> delegate) {
2294 this.delegate = delegate;
// Returns whatever the wrapped creator produces; its exceptions propagate unchanged.
2298 public ShardManager create() throws Exception {
2299 return delegate.create();
/**
 * A Function from a received message to a reply object that also declares which messages it handles.
 * TestShardManager.handleCommand calls canIntercept first and, when it returns true, sends the result
 * of apply to the message's sender.
 */
2303 interface MessageInterceptor extends Function<Object, Object> {
// Returns true if this interceptor wants to answer the given message.
2304 boolean canIntercept(Object message);
/**
 * Builds an interceptor that accepts FindPrimary messages and answers each with a
 * RemotePrimaryShardFound pointing at the given actor.
 *
 * @param primaryActor the actor whose serialized path is reported in the reply
 * @return the interceptor
 */
2307 private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
2308 return new MessageInterceptor() {
// The reply ignores the message content. The meaning of the second constructor argument
// ((short) 1) is not shown here - presumably a version number; confirm.
2310 public Object apply(final Object message) {
2311 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
// Only FindPrimary messages are intercepted.
2315 public boolean canIntercept(final Object message) {
2316 return message instanceof FindPrimary;
/**
 * MessageCollectorActor that sends a canned response back to the sender whenever it receives a
 * message whose class equals the configured request class, as long as a response is set.
 */
2321 private static class MockRespondActor extends MessageCollectorActor {
// Control message recognised by onReceive.
2322 static final String CLEAR_RESPONSE = "clear-response";
// Not final, so the response can change after construction.
2324 private Object responseMsg;
2325 private final Class<?> requestClass;
// No direct constructor call is visible in this chunk; tests pass these two arguments through
// Props.create(MockRespondActor.class, ...), which is presumably why "unused" is suppressed.
2327 @SuppressWarnings("unused")
2328 MockRespondActor(final Class<?> requestClass, final Object responseMsg) {
2329 this.requestClass = requestClass;
2330 this.responseMsg = responseMsg;
// NOTE(review): the body of the CLEAR_RESPONSE branch is absent from this listing - presumably it
// clears responseMsg, and the remaining statements may belong to an else branch; confirm.
// Visible behaviour: the message is passed to the superclass, and if its class equals requestClass
// and responseMsg is non-null, responseMsg is sent to the sender.
2334 public void onReceive(final Object message) throws Exception {
2335 if (message.equals(CLEAR_RESPONSE)) {
2338 super.onReceive(message);
2339 if (message.getClass().equals(requestClass) && responseMsg != null) {
2340 getSender().tell(responseMsg, getSelf());