2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.shardmanager;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.mockito.ArgumentMatchers.anyString;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.never;
21 import static org.mockito.Mockito.reset;
22 import static org.mockito.Mockito.timeout;
23 import static org.mockito.Mockito.times;
24 import static org.mockito.Mockito.verify;
25 import static org.mockito.Mockito.verifyNoMoreInteractions;
27 import akka.actor.ActorRef;
28 import akka.actor.ActorSystem;
29 import akka.actor.AddressFromURIString;
30 import akka.actor.Props;
31 import akka.actor.Status;
32 import akka.actor.Status.Failure;
33 import akka.actor.Status.Success;
34 import akka.cluster.Cluster;
35 import akka.cluster.ClusterEvent;
36 import akka.cluster.Member;
37 import akka.dispatch.Dispatchers;
38 import akka.dispatch.OnComplete;
39 import akka.japi.Creator;
40 import akka.pattern.Patterns;
41 import akka.persistence.RecoveryCompleted;
42 import akka.serialization.Serialization;
43 import akka.testkit.TestActorRef;
44 import akka.testkit.javadsl.TestKit;
45 import akka.util.Timeout;
46 import com.google.common.base.Stopwatch;
47 import com.google.common.collect.ImmutableMap;
48 import com.google.common.collect.Lists;
49 import com.google.common.collect.Sets;
50 import com.google.common.util.concurrent.Uninterruptibles;
52 import java.time.Duration;
53 import java.util.AbstractMap;
54 import java.util.Arrays;
55 import java.util.Collection;
56 import java.util.Collections;
57 import java.util.HashMap;
58 import java.util.List;
60 import java.util.Map.Entry;
62 import java.util.concurrent.CountDownLatch;
63 import java.util.concurrent.TimeUnit;
64 import java.util.concurrent.TimeoutException;
65 import java.util.function.Consumer;
66 import java.util.function.Function;
67 import java.util.stream.Collectors;
68 import org.junit.AfterClass;
69 import org.junit.BeforeClass;
70 import org.junit.Test;
71 import org.opendaylight.controller.cluster.access.concepts.MemberName;
72 import org.opendaylight.controller.cluster.datastore.AbstractShardManagerTest;
73 import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
74 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
75 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
76 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
77 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
78 import org.opendaylight.controller.cluster.datastore.Shard;
79 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
80 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
81 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
82 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
83 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
84 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
85 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
86 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
87 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
88 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
89 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
90 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
91 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
92 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
93 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
94 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
95 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
96 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
97 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
98 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
99 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
100 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
101 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
102 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
103 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
104 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
105 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
106 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
107 import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor;
108 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
109 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
110 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
111 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
112 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
113 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
114 import org.opendaylight.controller.cluster.raft.RaftState;
115 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
116 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
117 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
118 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
119 import org.opendaylight.controller.cluster.raft.messages.AddServer;
120 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
121 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
122 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
123 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
124 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
125 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
126 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
127 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
128 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
129 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
130 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
131 import org.opendaylight.yangtools.concepts.Registration;
132 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
133 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
134 import org.slf4j.Logger;
135 import org.slf4j.LoggerFactory;
136 import scala.concurrent.Await;
137 import scala.concurrent.Future;
138 import scala.concurrent.duration.FiniteDuration;
140 public class ShardManagerTest extends AbstractShardManagerTest {
141 private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
142 private static final MemberName MEMBER_2 = MemberName.forName("member-2");
143 private static final MemberName MEMBER_3 = MemberName.forName("member-3");
145 private static SchemaContext TEST_SCHEMA_CONTEXT;
147 private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
150 public static void beforeClass() {
151 TEST_SCHEMA_CONTEXT = TestModel.createTestContext();
155 public static void afterClass() {
156 TEST_SCHEMA_CONTEXT = null;
159 private ActorSystem newActorSystem(final String config) {
160 return newActorSystem("cluster-test", config);
163 private ActorRef newMockShardActor(final ActorSystem system, final String shardName, final String memberName) {
164 String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
165 if (system == getSystem()) {
166 return actorFactory.createActor(MessageCollectorActor.props(), name);
169 return system.actorOf(MessageCollectorActor.props(), name);
172 private Props newShardMgrProps() {
173 return newShardMgrProps(new MockConfiguration());
176 private static DatastoreContextFactory newDatastoreContextFactory(final DatastoreContext datastoreContext) {
177 DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
178 doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
179 doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(anyString());
183 private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() {
184 return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
187 private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(final ActorRef shardActor) {
188 return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor)
189 .distributedDataStore(mock(DistributedDataStore.class));
193 private Props newPropsShardMgrWithMockShardActor() {
194 return newTestShardMgrBuilderWithMockShardActor().props().withDispatcher(
195 Dispatchers.DefaultDispatcherId());
198 private Props newPropsShardMgrWithMockShardActor(final ActorRef shardActor) {
199 return newTestShardMgrBuilderWithMockShardActor(shardActor).props()
200 .withDispatcher(Dispatchers.DefaultDispatcherId());
204 private TestShardManager newTestShardManager() {
205 return newTestShardManager(newShardMgrProps());
208 private TestShardManager newTestShardManager(final Props props) {
209 TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
210 TestShardManager shardManager = shardManagerActor.underlyingActor();
211 shardManager.waitForRecoveryComplete();
215 private static void waitForShardInitialized(final ActorRef shardManager, final String shardName,
217 AssertionError last = null;
218 Stopwatch sw = Stopwatch.createStarted();
219 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
221 shardManager.tell(new FindLocalShard(shardName, true), kit.getRef());
222 kit.expectMsgClass(LocalShardFound.class);
224 } catch (AssertionError e) {
228 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
234 @SuppressWarnings("unchecked")
235 private static <T> T expectMsgClassOrFailure(final Class<T> msgClass, final TestKit kit, final String msg) {
236 Object reply = kit.expectMsgAnyClassOf(kit.duration("5 sec"), msgClass, Failure.class);
237 if (reply instanceof Failure) {
238 throw new AssertionError(msg + " failed", ((Failure)reply).cause());
245 public void testPerShardDatastoreContext() throws Exception {
246 LOG.info("testPerShardDatastoreContext starting");
247 final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
248 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
251 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(6).build())
252 .when(mockFactory).getShardDatastoreContext("default");
255 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(7).build())
256 .when(mockFactory).getShardDatastoreContext("topology");
258 final MockConfiguration mockConfig = new MockConfiguration() {
260 public Collection<String> getMemberShardNames(final MemberName memberName) {
261 return Arrays.asList("default", "topology");
265 public Collection<MemberName> getMembersFromShardName(final String shardName) {
266 return members("member-1");
270 final ActorRef defaultShardActor = actorFactory.createActor(
271 MessageCollectorActor.props(), actorFactory.generateActorId("default"));
272 final ActorRef topologyShardActor = actorFactory.createActor(
273 MessageCollectorActor.props(), actorFactory.generateActorId("topology"));
275 final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
276 new HashMap<String, Entry<ActorRef, DatastoreContext>>());
277 shardInfoMap.put("default", new AbstractMap.SimpleEntry<>(defaultShardActor, null));
278 shardInfoMap.put("topology", new AbstractMap.SimpleEntry<>(topologyShardActor, null));
280 final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
281 final CountDownLatch newShardActorLatch = new CountDownLatch(2);
282 class LocalShardManager extends ShardManager {
283 LocalShardManager(final AbstractShardManagerCreator<?> creator) {
288 protected ActorRef newShardActor(final ShardInformation info) {
289 Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
292 ref = entry.getKey();
293 entry.setValue(info.getDatastoreContext());
296 newShardActorLatch.countDown();
301 final Creator<ShardManager> creator = new Creator<ShardManager>() {
302 private static final long serialVersionUID = 1L;
304 public ShardManager create() {
305 return new LocalShardManager(
306 new GenericCreator<>(LocalShardManager.class).datastoreContextFactory(mockFactory)
307 .primaryShardInfoCache(primaryShardInfoCache).configuration(mockConfig));
311 final TestKit kit = new TestKit(getSystem());
313 final ActorRef shardManager = actorFactory.createActor(Props.create(
314 new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()));
316 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
318 assertTrue("Shard actors created", newShardActorLatch.await(5, TimeUnit.SECONDS));
319 assertEquals("getShardElectionTimeoutFactor", 6,
320 shardInfoMap.get("default").getValue().getShardElectionTimeoutFactor());
321 assertEquals("getShardElectionTimeoutFactor", 7,
322 shardInfoMap.get("topology").getValue().getShardElectionTimeoutFactor());
324 DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
325 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
327 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(66).build())
328 .when(newMockFactory).getShardDatastoreContext("default");
331 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(77).build())
332 .when(newMockFactory).getShardDatastoreContext("topology");
334 shardManager.tell(newMockFactory, kit.getRef());
336 DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor,
337 DatastoreContext.class);
338 assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
340 newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
341 assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
343 LOG.info("testPerShardDatastoreContext ending");
347 public void testOnReceiveFindPrimaryForNonExistentShard() {
348 final TestKit kit = new TestKit(getSystem());
349 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
351 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
353 shardManager.tell(new FindPrimary("non-existent", false), kit.getRef());
355 kit.expectMsgClass(Duration.ofSeconds(5), PrimaryNotFoundException.class);
359 public void testOnReceiveFindPrimaryForLocalLeaderShard() {
360 LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting");
361 final TestKit kit = new TestKit(getSystem());
362 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
364 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
366 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
367 shardManager.tell(new ActorInitialized(), mockShardActor);
369 DataTree mockDataTree = mock(DataTree.class);
370 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
371 DataStoreVersions.CURRENT_VERSION), kit.getRef());
373 MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
375 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
378 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
380 LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5),
381 LocalPrimaryShardFound.class);
382 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
383 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
384 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
386 LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending");
390 public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() {
391 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp starting");
392 final TestKit kit = new TestKit(getSystem());
393 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
395 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
396 shardManager.tell(new ActorInitialized(), mockShardActor);
398 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
399 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
401 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
403 shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION),
406 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
408 kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
410 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending");
414 public void testOnReceiveFindPrimaryForNonLocalLeaderShard() {
415 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard starting");
416 final TestKit kit = new TestKit(getSystem());
417 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
419 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
420 shardManager.tell(new ActorInitialized(), mockShardActor);
422 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
423 MockClusterWrapper.sendMemberUp(shardManager, "member-2", kit.getRef().path().toString());
425 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
427 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
429 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
430 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, leaderVersion), mockShardActor);
432 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
434 RemotePrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
435 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
436 primaryFound.getPrimaryPath().contains("member-2-shard-default"));
437 assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
439 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard ending");
443 public void testOnReceiveFindPrimaryForUninitializedShard() {
444 final TestKit kit = new TestKit(getSystem());
445 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
447 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
449 kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
453 public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() {
454 final TestKit kit = new TestKit(getSystem());
455 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
457 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
458 shardManager.tell(new ActorInitialized(), mockShardActor);
460 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
462 kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
466 public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() {
467 LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
468 final TestKit kit = new TestKit(getSystem());
469 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
471 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
472 shardManager.tell(new ActorInitialized(), mockShardActor);
474 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
476 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Follower.name()),
479 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
481 kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
483 DataTree mockDataTree = mock(DataTree.class);
484 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
485 DataStoreVersions.CURRENT_VERSION), mockShardActor);
487 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
489 LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5),
490 LocalPrimaryShardFound.class);
491 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
492 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
493 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
495 LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
499 public void testOnReceiveFindPrimaryWaitForShardLeader() {
500 LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting");
501 datastoreContextBuilder.shardInitializationTimeout(10, TimeUnit.SECONDS);
502 final TestKit kit = new TestKit(getSystem());
503 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
505 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
507 // We're passing waitUntilInitialized = true to FindPrimary so
508 // the response should be
509 // delayed until we send ActorInitialized and
510 // RoleChangeNotification.
511 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
513 kit.expectNoMessage(Duration.ofMillis(150));
515 shardManager.tell(new ActorInitialized(), mockShardActor);
517 kit.expectNoMessage(Duration.ofMillis(150));
519 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
521 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
524 kit.expectNoMessage(Duration.ofMillis(150));
526 DataTree mockDataTree = mock(DataTree.class);
527 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
528 DataStoreVersions.CURRENT_VERSION), mockShardActor);
530 LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
531 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
532 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
533 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
535 kit.expectNoMessage(Duration.ofMillis(200));
537 LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending");
541 public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() {
542 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard starting");
543 final TestKit kit = new TestKit(getSystem());
544 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
546 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
548 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
550 kit.expectMsgClass(Duration.ofSeconds(2), NotInitializedException.class);
552 shardManager.tell(new ActorInitialized(), mockShardActor);
554 kit.expectNoMessage(Duration.ofMillis(200));
556 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending");
560 public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() {
561 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard starting");
562 final TestKit kit = new TestKit(getSystem());
563 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
565 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
566 shardManager.tell(new ActorInitialized(), mockShardActor);
567 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
568 RaftState.Candidate.name()), mockShardActor);
570 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
572 kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
574 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending");
578 public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() {
579 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard starting");
580 final TestKit kit = new TestKit(getSystem());
581 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
583 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
584 shardManager.tell(new ActorInitialized(), mockShardActor);
585 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
586 RaftState.IsolatedLeader.name()), mockShardActor);
588 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true),kit. getRef());
590 kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
592 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending");
596 public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() {
597 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard starting");
598 final TestKit kit = new TestKit(getSystem());
599 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
601 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
602 shardManager.tell(new ActorInitialized(), mockShardActor);
604 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
606 kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
608 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending");
612 public void testOnReceiveFindPrimaryForRemoteShard() {
613 LOG.info("testOnReceiveFindPrimaryForRemoteShard starting");
614 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
616 // Create an ActorSystem ShardManager actor for member-1.
618 final ActorSystem system1 = newActorSystem("Member1");
619 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
621 final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
622 newTestShardMgrBuilderWithMockShardActor().cluster(
623 new ClusterWrapperImpl(system1)).props().withDispatcher(
624 Dispatchers.DefaultDispatcherId()), shardManagerID);
626 // Create an ActorSystem ShardManager actor for member-2.
628 final ActorSystem system2 = newActorSystem("Member2");
630 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
632 final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
634 MockConfiguration mockConfig2 = new MockConfiguration(
635 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
636 .put("astronauts", Arrays.asList("member-2")).build());
638 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
639 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
640 new ClusterWrapperImpl(system2)).props().withDispatcher(
641 Dispatchers.DefaultDispatcherId()), shardManagerID);
643 final TestKit kit = new TestKit(system1);
644 shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
645 shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
647 shardManager2.tell(new ActorInitialized(), mockShardActor2);
649 String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
650 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
651 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
653 shardManager2.tell(new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
656 shardManager1.underlyingActor().waitForMemberUp();
657 shardManager1.tell(new FindPrimary("astronauts", false), kit.getRef());
659 RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
660 String path = found.getPrimaryPath();
661 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
662 assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
664 shardManager2.underlyingActor().verifyFindPrimary();
666 // This part times out quite a bit on jenkins for some reason
668 // Cluster.get(system2).down(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
670 // shardManager1.underlyingActor().waitForMemberRemoved();
672 // shardManager1.tell(new FindPrimary("astronauts", false), getRef());
674 // expectMsgClass(Duration.ofSeconds(5), PrimaryNotFoundException.class);
676 LOG.info("testOnReceiveFindPrimaryForRemoteShard ending");
680 public void testShardAvailabilityOnChangeOfMemberReachability() {
681 LOG.info("testShardAvailabilityOnChangeOfMemberReachability starting");
682 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
684 // Create an ActorSystem ShardManager actor for member-1.
686 final ActorSystem system1 = newActorSystem("Member1");
687 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
689 final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
691 final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
692 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
693 new ClusterWrapperImpl(system1)).props().withDispatcher(
694 Dispatchers.DefaultDispatcherId()), shardManagerID);
696 // Create an ActorSystem ShardManager actor for member-2.
698 final ActorSystem system2 = newActorSystem("Member2");
700 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
702 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
704 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
705 .put("default", Arrays.asList("member-1", "member-2")).build());
707 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
708 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
709 new ClusterWrapperImpl(system2)).props().withDispatcher(
710 Dispatchers.DefaultDispatcherId()), shardManagerID);
712 final TestKit kit = new TestKit(system1);
713 shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
714 shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
715 shardManager1.tell(new ActorInitialized(), mockShardActor1);
716 shardManager2.tell(new ActorInitialized(), mockShardActor2);
718 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
719 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
720 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
721 DataStoreVersions.CURRENT_VERSION), mockShardActor1);
723 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
725 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
726 DataStoreVersions.CURRENT_VERSION), mockShardActor2);
728 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
730 shardManager1.underlyingActor().waitForMemberUp();
732 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
734 RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
735 String path = found.getPrimaryPath();
736 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
738 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
741 shardManager1.underlyingActor().waitForUnreachableMember();
743 PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
744 assertEquals("getMemberName", MEMBER_2, peerDown.getMemberName());
745 MessageCollectorActor.clearMessages(mockShardActor1);
747 shardManager1.tell(MockClusterWrapper.createMemberRemoved("member-2", "akka://cluster-test@127.0.0.1:2558"),
750 MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
752 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
754 kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
756 shardManager1.tell(MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
759 shardManager1.underlyingActor().waitForReachableMember();
761 PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
762 assertEquals("getMemberName", MEMBER_2, peerUp.getMemberName());
763 MessageCollectorActor.clearMessages(mockShardActor1);
765 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
767 RemotePrimaryShardFound found1 = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
768 String path1 = found1.getPrimaryPath();
769 assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
771 shardManager1.tell(MockClusterWrapper.createMemberUp("member-2", "akka://cluster-test@127.0.0.1:2558"),
774 MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
776 // Test FindPrimary wait succeeds after reachable member event.
778 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
779 "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
780 shardManager1.underlyingActor().waitForUnreachableMember();
782 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
785 MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
787 RemotePrimaryShardFound found2 = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
788 String path2 = found2.getPrimaryPath();
789 assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config"));
791 LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending");
795 public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() {
796 LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange starting");
797 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
799 // Create an ActorSystem ShardManager actor for member-1.
801 final ActorSystem system1 = newActorSystem("Member1");
802 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
804 final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
806 final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
807 final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
808 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(new ClusterWrapperImpl(system1))
809 .primaryShardInfoCache(primaryShardInfoCache).props()
810 .withDispatcher(Dispatchers.DefaultDispatcherId()),
813 // Create an ActorSystem ShardManager actor for member-2.
815 final ActorSystem system2 = newActorSystem("Member2");
817 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
819 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
821 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
822 .put("default", Arrays.asList("member-1", "member-2")).build());
824 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
825 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
826 new ClusterWrapperImpl(system2)).props().withDispatcher(
827 Dispatchers.DefaultDispatcherId()), shardManagerID);
829 final TestKit kit = new TestKit(system1);
830 shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
831 shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
832 shardManager1.tell(new ActorInitialized(), mockShardActor1);
833 shardManager2.tell(new ActorInitialized(), mockShardActor2);
835 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
836 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
837 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
838 DataStoreVersions.CURRENT_VERSION), mockShardActor1);
840 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
842 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
843 DataStoreVersions.CURRENT_VERSION), mockShardActor2);
845 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
847 shardManager1.underlyingActor().waitForMemberUp();
849 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
851 RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
852 String path = found.getPrimaryPath();
853 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
855 primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(
856 system1.actorSelection(mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION));
858 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
859 "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
861 shardManager1.underlyingActor().waitForUnreachableMember();
863 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
865 kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
867 assertNull("Expected primaryShardInfoCache entry removed",
868 primaryShardInfoCache.getIfPresent("default"));
870 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, mock(DataTree.class),
871 DataStoreVersions.CURRENT_VERSION), mockShardActor1);
873 new RoleChangeNotification(memberId1, RaftState.Follower.name(), RaftState.Leader.name()),
876 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
878 LocalPrimaryShardFound found1 = kit.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
879 String path1 = found1.getPrimaryPath();
880 assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
882 LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange ending");
886 public void testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable() {
887 LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable starting");
888 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
890 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
891 .put("default", Arrays.asList("member-256", "member-2")).build());
893 // Create an ActorSystem, ShardManager and actor for member-256.
895 final ActorSystem system256 = newActorSystem("Member256");
896 // 2562 is the tcp port of Member256 in src/test/resources/application.conf.
897 Cluster.get(system256).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2562"));
899 final ActorRef mockShardActor256 = newMockShardActor(system256, Shard.DEFAULT_NAME, "member-256");
901 final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
903 // ShardManager must be created with shard configuration to let its localShards has shards.
904 final TestActorRef<TestShardManager> shardManager256 = TestActorRef.create(system256,
905 newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor256)
906 .cluster(new ClusterWrapperImpl(system256))
907 .primaryShardInfoCache(primaryShardInfoCache).props()
908 .withDispatcher(Dispatchers.DefaultDispatcherId()),
911 // Create an ActorSystem, ShardManager and actor for member-2 whose name is contained in member-256.
913 final ActorSystem system2 = newActorSystem("Member2");
915 // Join member-2 into the cluster of member-256.
916 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2562"));
918 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
920 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
921 newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor2).cluster(
922 new ClusterWrapperImpl(system2)).props().withDispatcher(
923 Dispatchers.DefaultDispatcherId()), shardManagerID);
925 final TestKit kit256 = new TestKit(system256);
926 shardManager256.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef());
927 shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef());
928 shardManager256.tell(new ActorInitialized(), mockShardActor256);
929 shardManager2.tell(new ActorInitialized(), mockShardActor2);
931 String memberId256 = "member-256-shard-default-" + shardMrgIDSuffix;
932 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
933 shardManager256.tell(new ShardLeaderStateChanged(memberId256, memberId256, mock(DataTree.class),
934 DataStoreVersions.CURRENT_VERSION), mockShardActor256);
935 shardManager256.tell(
936 new RoleChangeNotification(memberId256, RaftState.Candidate.name(), RaftState.Leader.name()),
938 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId256, mock(DataTree.class),
939 DataStoreVersions.CURRENT_VERSION), mockShardActor2);
941 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Follower.name()),
943 shardManager256.underlyingActor().waitForMemberUp();
945 shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
947 LocalPrimaryShardFound found = kit256.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
948 String path = found.getPrimaryPath();
949 assertTrue("Unexpected primary path " + path + " which must on member-256",
950 path.contains("member-256-shard-default-config"));
952 PrimaryShardInfo primaryShardInfo = new PrimaryShardInfo(
953 system256.actorSelection(mockShardActor256.path()), DataStoreVersions.CURRENT_VERSION);
954 primaryShardInfoCache.putSuccessful("default", primaryShardInfo);
956 // Simulate member-2 become unreachable.
957 shardManager256.tell(MockClusterWrapper.createUnreachableMember("member-2",
958 "akka://cluster-test@127.0.0.1:2558"), kit256.getRef());
959 shardManager256.underlyingActor().waitForUnreachableMember();
961 // Make sure leader shard on member-256 is still leader and still in the cache.
962 shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
963 found = kit256.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
964 path = found.getPrimaryPath();
965 assertTrue("Unexpected primary path " + path + " which must still not on member-256",
966 path.contains("member-256-shard-default-config"));
967 Future<PrimaryShardInfo> futurePrimaryShard = primaryShardInfoCache.getIfPresent("default");
968 futurePrimaryShard.onComplete(new OnComplete<PrimaryShardInfo>() {
970 public void onComplete(final Throwable failure, final PrimaryShardInfo futurePrimaryShardInfo) {
971 if (failure != null) {
972 assertTrue("Primary shard info is unexpectedly removed from primaryShardInfoCache", false);
974 assertEquals("Expected primaryShardInfoCache entry",
975 primaryShardInfo, futurePrimaryShardInfo);
978 }, system256.dispatchers().defaultGlobalDispatcher());
980 LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable ending");
984 public void testOnReceiveFindLocalShardForNonExistentShard() {
985 final TestKit kit = new TestKit(getSystem());
986 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
988 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
990 shardManager.tell(new FindLocalShard("non-existent", false), kit.getRef());
992 LocalShardNotFound notFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
994 assertEquals("getShardName", "non-existent", notFound.getShardName());
998 public void testOnReceiveFindLocalShardForExistentShard() {
999 final TestKit kit = new TestKit(getSystem());
1000 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1002 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1003 shardManager.tell(new ActorInitialized(), mockShardActor);
1005 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1007 LocalShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1009 assertTrue("Found path contains " + found.getPath().path().toString(),
1010 found.getPath().path().toString().contains("member-1-shard-default-config"));
1014 public void testOnReceiveFindLocalShardForNotInitializedShard() {
1015 final TestKit kit = new TestKit(getSystem());
1016 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1018 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1020 kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1024 public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
1025 LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
1026 final TestKit kit = new TestKit(getSystem());
1027 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1029 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1031 // We're passing waitUntilInitialized = true to FindLocalShard
1032 // so the response should be
1033 // delayed until we send ActorInitialized.
1034 Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
1035 new Timeout(5, TimeUnit.SECONDS));
1037 shardManager.tell(new ActorInitialized(), mockShardActor);
1039 Object resp = Await.result(future, kit.duration("5 seconds"));
1040 assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
1042 LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
1046 public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
1047 TestShardManager shardManager = newTestShardManager();
1049 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1050 shardManager.handleCommand(new RoleChangeNotification(
1051 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
1053 verify(ready, never()).countDown();
1055 shardManager.handleCommand(new ShardLeaderStateChanged(memberId, memberId,
1056 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1058 verify(ready, times(1)).countDown();
1062 public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
1063 final TestKit kit = new TestKit(getSystem());
1064 TestShardManager shardManager = newTestShardManager();
1066 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1067 shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1069 verify(ready, never()).countDown();
1071 shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
1073 shardManager.handleCommand(
1074 new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1075 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1077 verify(ready, times(1)).countDown();
1081 public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
1082 final TestKit kit = new TestKit(getSystem());
1083 TestShardManager shardManager = newTestShardManager();
1085 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1086 shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1088 verify(ready, never()).countDown();
1090 shardManager.handleCommand(
1091 new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1092 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1094 shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
1096 verify(ready, times(1)).countDown();
1100 public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
1101 TestShardManager shardManager = newTestShardManager();
1103 shardManager.handleCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(),
1104 RaftState.Leader.name()));
1106 verify(ready, never()).countDown();
1110 public void testByDefaultSyncStatusIsFalse() {
1111 TestShardManager shardManager = newTestShardManager();
1113 assertFalse(shardManager.getMBean().getSyncStatus());
1117 public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception {
1118 TestShardManager shardManager = newTestShardManager();
1120 shardManager.handleCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
1121 RaftState.Follower.name(), RaftState.Leader.name()));
1123 assertTrue(shardManager.getMBean().getSyncStatus());
1127 public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception {
1128 TestShardManager shardManager = newTestShardManager();
1130 String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1131 shardManager.handleCommand(new RoleChangeNotification(shardId,
1132 RaftState.Follower.name(), RaftState.Candidate.name()));
1134 assertFalse(shardManager.getMBean().getSyncStatus());
1136 // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
1137 shardManager.handleCommand(new FollowerInitialSyncUpStatus(
1140 assertFalse(shardManager.getMBean().getSyncStatus());
1144 public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception {
1145 TestShardManager shardManager = newTestShardManager();
1147 String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1148 shardManager.handleCommand(new RoleChangeNotification(shardId,
1149 RaftState.Candidate.name(), RaftState.Follower.name()));
1151 // Initially will be false
1152 assertFalse(shardManager.getMBean().getSyncStatus());
1154 // Send status true will make sync status true
1155 shardManager.handleCommand(new FollowerInitialSyncUpStatus(true, shardId));
1157 assertTrue(shardManager.getMBean().getSyncStatus());
1159 // Send status false will make sync status false
1160 shardManager.handleCommand(new FollowerInitialSyncUpStatus(false, shardId));
1162 assertFalse(shardManager.getMBean().getSyncStatus());
1166 public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception {
1167 LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
1168 TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
1170 public List<String> getMemberShardNames(final MemberName memberName) {
1171 return Arrays.asList("default", "astronauts");
1175 // Initially will be false
1176 assertFalse(shardManager.getMBean().getSyncStatus());
1178 // Make default shard leader
1179 String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
1180 shardManager.handleCommand(new RoleChangeNotification(defaultShardId,
1181 RaftState.Follower.name(), RaftState.Leader.name()));
1183 // default = Leader, astronauts is unknown so sync status remains false
1184 assertFalse(shardManager.getMBean().getSyncStatus());
1186 // Make astronauts shard leader as well
1187 String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1188 shardManager.handleCommand(new RoleChangeNotification(astronautsShardId,
1189 RaftState.Follower.name(), RaftState.Leader.name()));
1191 // Now sync status should be true
1192 assertTrue(shardManager.getMBean().getSyncStatus());
1194 // Make astronauts a Follower
1195 shardManager.handleCommand(new RoleChangeNotification(astronautsShardId,
1196 RaftState.Leader.name(), RaftState.Follower.name()));
1198 // Sync status is not true
1199 assertFalse(shardManager.getMBean().getSyncStatus());
1201 // Make the astronauts follower sync status true
1202 shardManager.handleCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
1204 // Sync status is now true
1205 assertTrue(shardManager.getMBean().getSyncStatus());
1207 LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending");
1211 public void testOnReceiveSwitchShardBehavior() {
1212 final TestKit kit = new TestKit(getSystem());
1213 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1215 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1216 shardManager.tell(new ActorInitialized(), mockShardActor);
1218 shardManager.tell(new SwitchShardBehavior(mockShardName, RaftState.Leader, 1000), kit.getRef());
1220 SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor,
1221 SwitchBehavior.class);
1223 assertEquals(RaftState.Leader, switchBehavior.getNewState());
1224 assertEquals(1000, switchBehavior.getNewTerm());
1227 private static List<MemberName> members(final String... names) {
1228 return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList());
1232 public void testOnCreateShard() {
1233 LOG.info("testOnCreateShard starting");
1234 final TestKit kit = new TestKit(getSystem());
1235 datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1237 ActorRef shardManager = actorFactory
1238 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1239 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1241 SchemaContext schemaContext = TEST_SCHEMA_CONTEXT;
1242 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1244 DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100)
1245 .persistent(false).build();
1246 Shard.Builder shardBuilder = Shard.builder();
1248 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1249 "foo", null, members("member-1", "member-5", "member-6"));
1250 shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), kit.getRef());
1252 kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1254 shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1256 kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1258 assertFalse("isRecoveryApplicable", shardBuilder.getDatastoreContext().isPersistent());
1259 assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig()
1260 .getPeerAddressResolver() instanceof ShardPeerAddressResolver);
1261 assertEquals("peerMembers", Sets.newHashSet(
1262 ShardIdentifier.create("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(),
1263 ShardIdentifier.create("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()),
1264 shardBuilder.getPeerAddresses().keySet());
1265 assertEquals("ShardIdentifier", ShardIdentifier.create("foo", MEMBER_1, shardMrgIDSuffix),
1266 shardBuilder.getId());
1267 assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1269 // Send CreateShard with same name - should return Success with
1272 shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1274 Success success = kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1275 assertNotNull("Success status is null", success.status());
1277 LOG.info("testOnCreateShard ending");
1281 public void testOnCreateShardWithLocalMemberNotInShardConfig() {
1282 LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting");
1283 final TestKit kit = new TestKit(getSystem());
1284 datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1286 ActorRef shardManager = actorFactory
1287 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1288 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1290 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1292 Shard.Builder shardBuilder = Shard.builder();
1293 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1294 "foo", null, members("member-5", "member-6"));
1296 shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1297 kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1299 shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1300 kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1302 assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
1303 assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(), shardBuilder
1304 .getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
1306 LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending");
1310 public void testOnCreateShardWithNoInitialSchemaContext() {
1311 LOG.info("testOnCreateShardWithNoInitialSchemaContext starting");
1312 final TestKit kit = new TestKit(getSystem());
1313 ActorRef shardManager = actorFactory
1314 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1315 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1317 Shard.Builder shardBuilder = Shard.builder();
1319 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1320 "foo", null, members("member-1"));
1321 shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1323 kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1325 SchemaContext schemaContext = TEST_SCHEMA_CONTEXT;
1326 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1328 shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1330 kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1332 assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1333 assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
1335 LOG.info("testOnCreateShardWithNoInitialSchemaContext ending");
1339 public void testGetSnapshot() {
1340 LOG.info("testGetSnapshot starting");
1341 TestKit kit = new TestKit(getSystem());
1343 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1344 .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1"))
1345 .put("astronauts", Collections.<String>emptyList()).build());
1347 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig)
1348 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1350 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1351 Failure failure = kit.expectMsgClass(Failure.class);
1352 assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
1354 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1356 waitForShardInitialized(shardManager, "shard1", kit);
1357 waitForShardInitialized(shardManager, "shard2", kit);
1359 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1361 DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1363 assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1364 assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1366 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
1367 datastoreSnapshot.getShardSnapshots().stream().map(ShardSnapshot::getName).collect(Collectors.toSet())));
1369 // Add a new replica
1371 TestKit mockShardLeaderKit = new TestKit(getSystem());
1373 TestShardManager shardManagerInstance = shardManager.underlyingActor();
1374 shardManagerInstance.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1376 shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1377 mockShardLeaderKit.expectMsgClass(AddServer.class);
1378 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.OK, ""));
1379 kit.expectMsgClass(Status.Success.class);
1380 waitForShardInitialized(shardManager, "astronauts", kit);
1382 // Send another GetSnapshot and verify
1384 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1385 datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1387 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
1388 Lists.transform(datastoreSnapshot.getShardSnapshots(), ShardSnapshot::getName)));
1390 ShardManagerSnapshot snapshot = datastoreSnapshot.getShardManagerSnapshot();
1391 assertNotNull("Expected ShardManagerSnapshot", snapshot);
1392 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1393 Sets.newHashSet(snapshot.getShardList()));
1395 LOG.info("testGetSnapshot ending");
1399 public void testRestoreFromSnapshot() {
1400 LOG.info("testRestoreFromSnapshot starting");
1402 datastoreContextBuilder.shardInitializationTimeout(3, TimeUnit.SECONDS);
1404 TestKit kit = new TestKit(getSystem());
1406 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1407 .put("shard1", Collections.<String>emptyList()).put("shard2", Collections.<String>emptyList())
1408 .put("astronauts", Collections.<String>emptyList()).build());
1410 ShardManagerSnapshot snapshot =
1411 new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"), Collections.emptyMap());
1412 DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix, snapshot,
1413 Collections.<ShardSnapshot>emptyList());
1414 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig)
1415 .restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
1417 shardManager.underlyingActor().waitForRecoveryComplete();
1419 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1421 waitForShardInitialized(shardManager, "shard1", kit);
1422 waitForShardInitialized(shardManager, "shard2", kit);
1423 waitForShardInitialized(shardManager, "astronauts", kit);
1425 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1427 DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1429 assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1431 assertNotNull("Expected ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1432 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1433 Sets.newHashSet(datastoreSnapshot.getShardManagerSnapshot().getShardList()));
1435 LOG.info("testRestoreFromSnapshot ending");
1439 public void testAddShardReplicaForNonExistentShardConfig() {
1440 final TestKit kit = new TestKit(getSystem());
1441 ActorRef shardManager = actorFactory
1442 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1443 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1445 shardManager.tell(new AddShardReplica("model-inventory"), kit.getRef());
1446 Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(2), Status.Failure.class);
1448 assertTrue("Failure obtained", resp.cause() instanceof IllegalArgumentException);
1452 public void testAddShardReplica() {
1453 LOG.info("testAddShardReplica starting");
1454 MockConfiguration mockConfig = new MockConfiguration(
1455 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1456 .put("astronauts", Arrays.asList("member-2")).build());
1458 final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1459 datastoreContextBuilder.shardManagerPersistenceId(shardManagerID);
1461 // Create an ActorSystem ShardManager actor for member-1.
1462 final ActorSystem system1 = newActorSystem("Member1");
1463 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1464 ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1465 final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1466 newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor)
1467 .cluster(new ClusterWrapperImpl(system1)).props()
1468 .withDispatcher(Dispatchers.DefaultDispatcherId()),
1471 // Create an ActorSystem ShardManager actor for member-2.
1472 final ActorSystem system2 = newActorSystem("Member2");
1473 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1475 String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
1476 String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString();
1477 final TestActorRef<MockRespondActor> mockShardLeaderActor = TestActorRef.create(system2,
1478 Props.create(MockRespondActor.class, AddServer.class,
1479 new AddServerReply(ServerChangeStatus.OK, memberId2))
1480 .withDispatcher(Dispatchers.DefaultDispatcherId()),
1482 final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1483 newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor)
1484 .cluster(new ClusterWrapperImpl(system2)).props()
1485 .withDispatcher(Dispatchers.DefaultDispatcherId()),
1488 final TestKit kit = new TestKit(getSystem());
1489 newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1490 leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1492 leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1494 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1495 leaderShardManager.tell(
1496 new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1497 mockShardLeaderActor);
1498 leaderShardManager.tell(
1499 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1500 mockShardLeaderActor);
1502 newReplicaShardManager.underlyingActor().waitForMemberUp();
1503 leaderShardManager.underlyingActor().waitForMemberUp();
1505 // Have a dummy snapshot to be overwritten by the new data
1507 String[] restoredShards = { "default", "people" };
1508 ShardManagerSnapshot snapshot =
1509 new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
1510 InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
1511 Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
1513 InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID);
1514 InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
1516 // construct a mock response message
1517 newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1518 AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1520 String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1521 assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1522 kit.expectMsgClass(Duration.ofSeconds(5), Status.Success.class);
1524 InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
1525 InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
1526 List<ShardManagerSnapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(shardManagerID,
1527 ShardManagerSnapshot.class);
1528 assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
1529 ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
1530 assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
1531 Sets.newHashSet(shardManagerSnapshot.getShardList()));
1532 LOG.info("testAddShardReplica ending");
1536 public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() {
1537 LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting");
1538 final TestKit kit = new TestKit(getSystem());
1539 TestActorRef<TestShardManager> shardManager = actorFactory
1540 .createTestActor(newPropsShardMgrWithMockShardActor(), shardMgrID);
1542 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1543 shardManager.tell(new ActorInitialized(), mockShardActor);
1545 String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
1546 AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
1547 ActorRef leaderShardActor = shardManager.underlyingActor().getContext()
1548 .actorOf(Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId);
1550 MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
1552 String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
1554 new RoleChangeNotification(newReplicaId, RaftState.Candidate.name(), RaftState.Follower.name()),
1557 new ShardLeaderStateChanged(newReplicaId, leaderId, DataStoreVersions.CURRENT_VERSION),
1560 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1562 MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
1564 Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1565 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1567 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1568 kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1570 // Send message again to verify previous in progress state is
1573 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1574 resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1575 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1577 // Send message again with an AddServer timeout to verify the
1578 // pre-existing shard actor isn't terminated.
1581 newDatastoreContextFactory(
1582 datastoreContextBuilder.shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), kit.getRef());
1583 leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
1584 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1585 kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1587 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1588 kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1590 LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
1594 public void testAddShardReplicaWithPreExistingLocalReplicaLeader() {
1595 LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting");
1596 final TestKit kit = new TestKit(getSystem());
1597 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1598 ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1600 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1601 shardManager.tell(new ActorInitialized(), mockShardActor);
1602 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1603 DataStoreVersions.CURRENT_VERSION), kit.getRef());
1605 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1608 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1609 Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1610 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1612 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1613 kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
1615 LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
1619 public void testAddShardReplicaWithAddServerReplyFailure() {
1620 LOG.info("testAddShardReplicaWithAddServerReplyFailure starting");
1621 final TestKit kit = new TestKit(getSystem());
1622 final TestKit mockShardLeaderKit = new TestKit(getSystem());
1624 MockConfiguration mockConfig = new MockConfiguration(
1625 ImmutableMap.of("astronauts", Arrays.asList("member-2")));
1627 ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
1628 final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1629 newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props()
1630 .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1631 shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1633 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1635 TestKit terminateWatcher = new TestKit(getSystem());
1636 terminateWatcher.watch(mockNewReplicaShardActor);
1638 shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1640 AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
1641 assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
1642 addServerMsg.getNewServerId());
1643 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
1645 Failure failure = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1646 assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
1648 shardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
1649 kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
1651 terminateWatcher.expectTerminated(mockNewReplicaShardActor);
1653 shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1654 mockShardLeaderKit.expectMsgClass(AddServer.class);
1655 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
1656 failure = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1657 assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
1659 LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
1663 public void testAddShardReplicaWithAlreadyInProgress() {
1664 testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1665 AddServer.class, new AddShardReplica("astronauts"));
1669 public void testAddShardReplicaWithFindPrimaryTimeout() {
1670 LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting");
1671 datastoreContextBuilder.shardInitializationTimeout(100, TimeUnit.MILLISECONDS);
1672 final TestKit kit = new TestKit(getSystem());
1673 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.of("astronauts", Arrays.asList("member-2")));
1675 final ActorRef newReplicaShardManager = actorFactory
1676 .createActor(newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props()
1677 .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1679 newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1680 MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2",
1681 AddressFromURIString.parse("akka://non-existent@127.0.0.1:5").toString());
1683 newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1684 Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Status.Failure.class);
1685 assertTrue("Failure obtained", resp.cause() instanceof RuntimeException);
1687 LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending");
1691 public void testRemoveShardReplicaForNonExistentShard() {
1692 final TestKit kit = new TestKit(getSystem());
1693 ActorRef shardManager = actorFactory
1694 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1695 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1697 shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), kit.getRef());
1698 Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(10), Status.Failure.class);
1699 assertTrue("Failure obtained", resp.cause() instanceof PrimaryNotFoundException);
1706 public void testRemoveShardReplicaLocal() {
1707 final TestKit kit = new TestKit(getSystem());
1708 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1710 final ActorRef respondActor = actorFactory.createActor(Props.create(MockRespondActor.class,
1711 RemoveServer.class, new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
1713 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
1715 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1716 shardManager.tell(new ActorInitialized(), respondActor);
1717 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1718 DataStoreVersions.CURRENT_VERSION), kit.getRef());
1720 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1723 shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), kit.getRef());
1724 final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor,
1725 RemoveServer.class);
1726 assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(),
1727 removeServer.getServerId());
1728 kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
1732 public void testRemoveShardReplicaRemote() {
1733 MockConfiguration mockConfig = new MockConfiguration(
1734 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1735 .put("astronauts", Arrays.asList("member-1")).build());
1737 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1739 // Create an ActorSystem ShardManager actor for member-1.
1740 final ActorSystem system1 = newActorSystem("Member1");
1741 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1742 ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1744 final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1745 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster(
1746 new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1749 // Create an ActorSystem ShardManager actor for member-2.
1750 final ActorSystem system2 = newActorSystem("Member2");
1751 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1753 String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString();
1754 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
1755 final TestActorRef<MockRespondActor> mockShardLeaderActor =
1756 TestActorRef.create(system2, Props.create(MockRespondActor.class, RemoveServer.class,
1757 new RemoveServerReply(ServerChangeStatus.OK, memberId2)), name);
1759 LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor);
1761 final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1762 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster(
1763 new ClusterWrapperImpl(system2)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1766 // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so,
1767 // akka://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1
1768 // However when a shard manager has a local shard which is a follower and a leader that is remote it will
1769 // try to compute an address for the remote shard leader using the ShardPeerAddressResolver. This address will
1771 // akka://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1
1772 // In this specific case if we did a FindPrimary for shard default from member-1 we would come up
1773 // with the address of an actor which does not exist, therefore any message sent to that actor would go to
1775 // To work around this problem we create a ForwardingActor with the right address and pass to it the
1776 // mockShardLeaderActor. The ForwardingActor simply forwards all messages to the mockShardLeaderActor and every
1777 // thing works as expected
1778 final ActorRef actorRef = leaderShardManager.underlyingActor().context()
1779 .actorOf(Props.create(ForwardingActor.class, mockShardLeaderActor),
1780 "member-2-shard-default-" + shardMrgIDSuffix);
1782 LOG.error("Forwarding actor : {}", actorRef);
1784 final TestKit kit = new TestKit(getSystem());
1785 newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1786 leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1788 leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1789 newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1791 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1792 leaderShardManager.tell(
1793 new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1794 mockShardLeaderActor);
1795 leaderShardManager.tell(
1796 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1797 mockShardLeaderActor);
1799 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
1800 newReplicaShardManager.tell(
1801 new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class), leaderVersion),
1803 newReplicaShardManager.tell(
1804 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
1807 newReplicaShardManager.underlyingActor().waitForMemberUp();
1808 leaderShardManager.underlyingActor().waitForMemberUp();
1810 // construct a mock response message
1811 newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), kit.getRef());
1812 RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1813 RemoveServer.class);
1814 String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1815 assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
1816 kit.expectMsgClass(Duration.ofSeconds(5), Status.Success.class);
1820 public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() {
1821 testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", MEMBER_2),
1822 RemoveServer.class, new RemoveShardReplica("astronauts", MEMBER_3));
1826 public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() {
1827 testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1828 AddServer.class, new RemoveShardReplica("astronauts", MEMBER_2));
1832 public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange,
1833 final Class<?> firstForwardedServerChangeClass,
1834 final Object secondServerChange) {
1835 final TestKit kit = new TestKit(getSystem());
1836 final TestKit mockShardLeaderKit = new TestKit(getSystem());
1837 final TestKit secondRequestKit = new TestKit(getSystem());
1839 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1840 .put(shardName, Arrays.asList("member-2")).build());
1842 final TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
1843 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor)
1844 .cluster(new MockClusterWrapper()).props()
1845 .withDispatcher(Dispatchers.DefaultDispatcherId()),
1848 shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1850 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1852 shardManager.tell(firstServerChange, kit.getRef());
1854 mockShardLeaderKit.expectMsgClass(firstForwardedServerChangeClass);
1856 shardManager.tell(secondServerChange, secondRequestKit.getRef());
1858 secondRequestKit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
1862 public void testServerRemovedShardActorNotRunning() {
1863 LOG.info("testServerRemovedShardActorNotRunning starting");
1864 final TestKit kit = new TestKit(getSystem());
1865 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1866 .put("default", Arrays.asList("member-1", "member-2"))
1867 .put("astronauts", Arrays.asList("member-2"))
1868 .put("people", Arrays.asList("member-1", "member-2")).build());
1870 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1871 newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1873 shardManager.underlyingActor().waitForRecoveryComplete();
1874 shardManager.tell(new FindLocalShard("people", false), kit.getRef());
1875 kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1877 shardManager.tell(new FindLocalShard("default", false), kit.getRef());
1878 kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1880 // Removed the default shard replica from member-1
1881 ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
1882 ShardIdentifier shardId = builder.shardName("default").memberName(MEMBER_1).type(shardMrgIDSuffix)
1884 shardManager.tell(new ServerRemoved(shardId.toString()), kit.getRef());
1886 shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1888 LOG.info("testServerRemovedShardActorNotRunning ending");
1892 public void testServerRemovedShardActorRunning() {
1893 LOG.info("testServerRemovedShardActorRunning starting");
1894 final TestKit kit = new TestKit(getSystem());
1895 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1896 .put("default", Arrays.asList("member-1", "member-2"))
1897 .put("astronauts", Arrays.asList("member-2"))
1898 .put("people", Arrays.asList("member-1", "member-2")).build());
1900 String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1901 ActorRef shard = actorFactory.createActor(MessageCollectorActor.props(), shardId);
1903 TestActorRef<TestShardManager> shardManager = actorFactory
1904 .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props()
1905 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1907 shardManager.underlyingActor().waitForRecoveryComplete();
1909 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1910 shardManager.tell(new ActorInitialized(), shard);
1912 waitForShardInitialized(shardManager, "people", kit);
1913 waitForShardInitialized(shardManager, "default", kit);
1915 // Removed the default shard replica from member-1
1916 shardManager.tell(new ServerRemoved(shardId), kit.getRef());
1918 shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1920 MessageCollectorActor.expectFirstMatching(shard, Shutdown.class);
1922 LOG.info("testServerRemovedShardActorRunning ending");
1926 public void testShardPersistenceWithRestoredData() {
1927 LOG.info("testShardPersistenceWithRestoredData starting");
1928 final TestKit kit = new TestKit(getSystem());
1929 MockConfiguration mockConfig =
1930 new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1931 .put("default", Arrays.asList("member-1", "member-2"))
1932 .put("astronauts", Arrays.asList("member-2"))
1933 .put("people", Arrays.asList("member-1", "member-2")).build());
1934 String[] restoredShards = {"default", "astronauts"};
1935 ShardManagerSnapshot snapshot =
1936 new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
1937 InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
1939 // create shardManager to come up with restored data
1940 TestActorRef<TestShardManager> newRestoredShardManager = actorFactory.createTestActor(
1941 newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1943 newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
1945 newRestoredShardManager.tell(new FindLocalShard("people", false), kit.getRef());
1946 LocalShardNotFound notFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
1947 assertEquals("for uninitialized shard", "people", notFound.getShardName());
1949 // Verify a local shard is created for the restored shards,
1950 // although we expect a NotInitializedException for the shards
1951 // as the actor initialization
1952 // message is not sent for them
1953 newRestoredShardManager.tell(new FindLocalShard("default", false), kit.getRef());
1954 kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1956 newRestoredShardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
1957 kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
1959 LOG.info("testShardPersistenceWithRestoredData ending");
1963 public void testShutDown() throws Exception {
1964 LOG.info("testShutDown starting");
1965 final TestKit kit = new TestKit(getSystem());
1966 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1967 .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1")).build());
1969 String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString();
1970 ActorRef shard1 = actorFactory.createActor(MessageCollectorActor.props(), shardId1);
1972 String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString();
1973 ActorRef shard2 = actorFactory.createActor(MessageCollectorActor.props(), shardId2);
1975 ActorRef shardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig)
1976 .addShardActor("shard1", shard1).addShardActor("shard2", shard2).props());
1978 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1979 shardManager.tell(new ActorInitialized(), shard1);
1980 shardManager.tell(new ActorInitialized(), shard2);
1982 FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
1983 Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE);
1985 MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class);
1986 MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class);
1989 Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS));
1990 fail("ShardManager actor stopped without waiting for the Shards to be stopped");
1991 } catch (TimeoutException e) {
1995 actorFactory.killActor(shard1, kit);
1996 actorFactory.killActor(shard2, kit);
1998 Boolean stopped = Await.result(stopFuture, duration);
1999 assertEquals("Stopped", Boolean.TRUE, stopped);
2001 LOG.info("testShutDown ending");
2005 public void testChangeServersVotingStatus() {
2006 final TestKit kit = new TestKit(getSystem());
2007 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2009 ActorRef respondActor = actorFactory
2010 .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2011 new ServerChangeReply(ServerChangeStatus.OK, null)), memberId);
2013 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2015 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2016 shardManager.tell(new ActorInitialized(), respondActor);
2017 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
2018 DataStoreVersions.CURRENT_VERSION), kit.getRef());
2020 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
2024 new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), kit.getRef());
2026 ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor
2027 .expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2028 assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(),
2029 ImmutableMap.of(ShardIdentifier
2030 .create("default", MemberName.forName("member-2"), shardMrgIDSuffix).toString(),
2033 kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
2037 public void testChangeServersVotingStatusWithNoLeader() {
2038 final TestKit kit = new TestKit(getSystem());
2039 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2041 ActorRef respondActor = actorFactory
2042 .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2043 new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);
2045 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2047 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2048 shardManager.tell(new ActorInitialized(), respondActor);
2049 shardManager.tell(new RoleChangeNotification(memberId, null, RaftState.Follower.name()), respondActor);
2052 new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), kit.getRef());
2054 MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2056 Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Status.Failure.class);
2057 assertTrue("Failure resposnse", resp.cause() instanceof NoShardLeaderException);
2060 @SuppressWarnings("unchecked")
2062 public void testRegisterForShardLeaderChanges() {
2063 LOG.info("testRegisterForShardLeaderChanges starting");
2065 final String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
2066 final String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
2067 final TestKit kit = new TestKit(getSystem());
2068 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
2070 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2071 shardManager.tell(new ActorInitialized(), mockShardActor);
2073 final Consumer<String> mockCallback = mock(Consumer.class);
2074 shardManager.tell(new RegisterForShardAvailabilityChanges(mockCallback), kit.getRef());
2076 final Success reply = kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
2077 final Registration reg = (Registration) reply.status();
2079 final DataTree mockDataTree = mock(DataTree.class);
2080 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2081 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2083 verify(mockCallback, timeout(5000)).accept("default");
2085 reset(mockCallback);
2086 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2087 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2089 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
2090 verifyNoMoreInteractions(mockCallback);
2092 shardManager.tell(new ShardLeaderStateChanged(memberId1, null, mockDataTree,
2093 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2095 verify(mockCallback, timeout(5000)).accept("default");
2097 reset(mockCallback);
2098 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, mockDataTree,
2099 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2101 verify(mockCallback, timeout(5000)).accept("default");
2103 reset(mockCallback);
2106 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
2107 DataStoreVersions.CURRENT_VERSION), mockShardActor);
2109 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
2110 verifyNoMoreInteractions(mockCallback);
2112 LOG.info("testRegisterForShardLeaderChanges ending");
2115 public static class TestShardManager extends ShardManager {
2116 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
2117 private final CountDownLatch snapshotPersist = new CountDownLatch(1);
2118 private ShardManagerSnapshot snapshot;
2119 private final Map<String, ActorRef> shardActors;
2120 private final ActorRef shardActor;
2121 private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
2122 private CountDownLatch memberUpReceived = new CountDownLatch(1);
2123 private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
2124 private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
2125 private CountDownLatch memberReachableReceived = new CountDownLatch(1);
2126 private volatile MessageInterceptor messageInterceptor;
2128 TestShardManager(final Builder builder) {
2130 shardActor = builder.shardActor;
2131 shardActors = builder.shardActors;
2135 protected void handleRecover(final Object message) throws Exception {
2137 super.handleRecover(message);
2139 if (message instanceof RecoveryCompleted) {
2140 recoveryComplete.countDown();
2145 private void countDownIfOther(final Member member, final CountDownLatch latch) {
2146 if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
2152 public void handleCommand(final Object message) throws Exception {
2154 if (messageInterceptor != null && messageInterceptor.canIntercept(message)) {
2155 getSender().tell(messageInterceptor.apply(message), getSelf());
2157 super.handleCommand(message);
2160 if (message instanceof FindPrimary) {
2161 findPrimaryMessageReceived.countDown();
2162 } else if (message instanceof ClusterEvent.MemberUp) {
2163 countDownIfOther(((ClusterEvent.MemberUp) message).member(), memberUpReceived);
2164 } else if (message instanceof ClusterEvent.MemberRemoved) {
2165 countDownIfOther(((ClusterEvent.MemberRemoved) message).member(), memberRemovedReceived);
2166 } else if (message instanceof ClusterEvent.UnreachableMember) {
2167 countDownIfOther(((ClusterEvent.UnreachableMember) message).member(), memberUnreachableReceived);
2168 } else if (message instanceof ClusterEvent.ReachableMember) {
2169 countDownIfOther(((ClusterEvent.ReachableMember) message).member(), memberReachableReceived);
2174 void setMessageInterceptor(final MessageInterceptor messageInterceptor) {
2175 this.messageInterceptor = messageInterceptor;
2178 void waitForRecoveryComplete() {
2179 assertTrue("Recovery complete",
2180 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
2183 public void waitForMemberUp() {
2184 assertTrue("MemberUp received",
2185 Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
2186 memberUpReceived = new CountDownLatch(1);
2189 void waitForMemberRemoved() {
2190 assertTrue("MemberRemoved received",
2191 Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
2192 memberRemovedReceived = new CountDownLatch(1);
2195 void waitForUnreachableMember() {
2196 assertTrue("UnreachableMember received",
2197 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS));
2198 memberUnreachableReceived = new CountDownLatch(1);
2201 void waitForReachableMember() {
2202 assertTrue("ReachableMember received",
2203 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
2204 memberReachableReceived = new CountDownLatch(1);
2207 void verifyFindPrimary() {
2208 assertTrue("FindPrimary received",
2209 Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
2210 findPrimaryMessageReceived = new CountDownLatch(1);
2213 public static Builder builder(final DatastoreContext.Builder datastoreContextBuilder) {
2214 return new Builder(datastoreContextBuilder);
2217 public static class Builder extends AbstractGenericCreator<Builder, TestShardManager> {
2218 private ActorRef shardActor;
2219 private final Map<String, ActorRef> shardActors = new HashMap<>();
2221 Builder(final DatastoreContext.Builder datastoreContextBuilder) {
2222 super(TestShardManager.class);
2223 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
2226 Builder shardActor(final ActorRef newShardActor) {
2227 this.shardActor = newShardActor;
2231 Builder addShardActor(final String shardName, final ActorRef actorRef) {
2232 shardActors.put(shardName, actorRef);
2238 public void saveSnapshot(final Object obj) {
2239 snapshot = (ShardManagerSnapshot) obj;
2240 snapshotPersist.countDown();
2241 super.saveSnapshot(obj);
2244 void verifySnapshotPersisted(final Set<String> shardList) {
2245 assertTrue("saveSnapshot invoked",
2246 Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
2247 assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
2251 protected ActorRef newShardActor(final ShardInformation info) {
2252 if (shardActors.get(info.getShardName()) != null) {
2253 return shardActors.get(info.getShardName());
2256 if (shardActor != null) {
2260 return super.newShardActor(info);
2264 private abstract static class AbstractGenericCreator<T extends AbstractGenericCreator<T, ?>, C extends ShardManager>
2265 extends AbstractShardManagerCreator<T> {
2266 private final Class<C> shardManagerClass;
2268 AbstractGenericCreator(final Class<C> shardManagerClass) {
2269 this.shardManagerClass = shardManagerClass;
2270 cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).waitTillReadyCountDownLatch(ready)
2271 .primaryShardInfoCache(new PrimaryShardInfoFutureCache());
2275 public Props props() {
2277 return Props.create(shardManagerClass, this);
2281 private static class GenericCreator<C extends ShardManager> extends AbstractGenericCreator<GenericCreator<C>, C> {
2282 GenericCreator(final Class<C> shardManagerClass) {
2283 super(shardManagerClass);
2287 private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
2288 private static final long serialVersionUID = 1L;
2289 private final Creator<ShardManager> delegate;
2291 DelegatingShardManagerCreator(final Creator<ShardManager> delegate) {
2292 this.delegate = delegate;
2296 public ShardManager create() throws Exception {
2297 return delegate.create();
2301 interface MessageInterceptor extends Function<Object, Object> {
2302 boolean canIntercept(Object message);
2305 private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
2306 return new MessageInterceptor() {
2308 public Object apply(final Object message) {
2309 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
2313 public boolean canIntercept(final Object message) {
2314 return message instanceof FindPrimary;
2319 private static class MockRespondActor extends MessageCollectorActor {
2320 static final String CLEAR_RESPONSE = "clear-response";
2322 private Object responseMsg;
2323 private final Class<?> requestClass;
2325 @SuppressWarnings("unused")
2326 MockRespondActor(final Class<?> requestClass, final Object responseMsg) {
2327 this.requestClass = requestClass;
2328 this.responseMsg = responseMsg;
2332 public void onReceive(final Object message) throws Exception {
2333 if (message.equals(CLEAR_RESPONSE)) {
2336 super.onReceive(message);
2337 if (message.getClass().equals(requestClass) && responseMsg != null) {
2338 getSender().tell(responseMsg, getSelf());