2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.shardmanager;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.mockito.Matchers.anyString;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.never;
21 import static org.mockito.Mockito.times;
22 import static org.mockito.Mockito.verify;
24 import akka.actor.ActorRef;
25 import akka.actor.ActorSystem;
26 import akka.actor.AddressFromURIString;
27 import akka.actor.Props;
28 import akka.actor.Status;
29 import akka.actor.Status.Failure;
30 import akka.actor.Status.Success;
31 import akka.cluster.Cluster;
32 import akka.cluster.ClusterEvent;
33 import akka.cluster.Member;
34 import akka.dispatch.Dispatchers;
35 import akka.dispatch.OnComplete;
36 import akka.japi.Creator;
37 import akka.pattern.Patterns;
38 import akka.persistence.RecoveryCompleted;
39 import akka.serialization.Serialization;
40 import akka.testkit.TestActorRef;
41 import akka.testkit.javadsl.TestKit;
42 import akka.util.Timeout;
43 import com.google.common.base.Function;
44 import com.google.common.base.Stopwatch;
45 import com.google.common.collect.ImmutableMap;
46 import com.google.common.collect.Lists;
47 import com.google.common.collect.Sets;
48 import com.google.common.util.concurrent.Uninterruptibles;
50 import java.util.AbstractMap;
51 import java.util.Arrays;
52 import java.util.Collection;
53 import java.util.Collections;
54 import java.util.HashMap;
55 import java.util.List;
57 import java.util.Map.Entry;
59 import java.util.concurrent.CountDownLatch;
60 import java.util.concurrent.TimeUnit;
61 import java.util.concurrent.TimeoutException;
62 import java.util.stream.Collectors;
63 import org.junit.AfterClass;
64 import org.junit.BeforeClass;
65 import org.junit.Test;
66 import org.opendaylight.controller.cluster.access.concepts.MemberName;
67 import org.opendaylight.controller.cluster.datastore.AbstractShardManagerTest;
68 import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
69 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
70 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
71 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
72 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
73 import org.opendaylight.controller.cluster.datastore.Shard;
74 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
75 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
76 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
77 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
78 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
79 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
80 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
81 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
82 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
83 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
84 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
85 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
86 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
87 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
88 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
89 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
90 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
91 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
92 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
93 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
94 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
95 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
96 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
97 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
98 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
99 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
100 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
101 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
102 import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor;
103 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
104 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
105 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
106 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
107 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
108 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
109 import org.opendaylight.controller.cluster.raft.RaftState;
110 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
111 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
112 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
113 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
114 import org.opendaylight.controller.cluster.raft.messages.AddServer;
115 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
116 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
117 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
118 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
119 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
120 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
121 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
122 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
123 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
124 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
125 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
126 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
127 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
128 import org.slf4j.Logger;
129 import org.slf4j.LoggerFactory;
130 import scala.concurrent.Await;
131 import scala.concurrent.Future;
132 import scala.concurrent.duration.FiniteDuration;
134 public class ShardManagerTest extends AbstractShardManagerTest {
// Class logger; the tests below use it to log "<test name> starting" / "<test name> ending" markers.
135 private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
// MemberName constants for the cluster members "member-2" and "member-3".
136 private static final MemberName MEMBER_2 = MemberName.forName("member-2");
137 private static final MemberName MEMBER_3 = MemberName.forName("member-3");
// Schema context shared by all tests; assigned in beforeClass() and reset to null in afterClass().
139 private static SchemaContext TEST_SCHEMA_CONTEXT;
// String form of the ShardManagerIdentifier whose type is shardMrgIDSuffix (declared outside this class body).
141 private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
// Creates the test schema context once via TestModel and stores it in TEST_SCHEMA_CONTEXT.
// NOTE(review): presumably annotated with @BeforeClass; the annotation line is not visible here - confirm.
144 public static void beforeClass() {
145 TEST_SCHEMA_CONTEXT = TestModel.createTestContext();
// Drops the reference to the shared schema context.
// NOTE(review): presumably annotated with @AfterClass; the annotation line is not visible here - confirm.
149 public static void afterClass() {
150 TEST_SCHEMA_CONTEXT = null;
// Convenience overload: delegates to the two-argument newActorSystem with the fixed system name
// "cluster-test", passing config through unchanged.
153 private ActorSystem newActorSystem(final String config) {
154 return newActorSystem("cluster-test", config);
// Creates a MessageCollectorActor that stands in for a shard. The actor name is the string form of the
// ShardIdentifier built from shardName, memberName and the type "config".
157 private ActorRef newMockShardActor(final ActorSystem system, final String shardName, final String memberName) {
158 String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
// Actors on the test's own system are created through actorFactory; for any other system the actor is
// created directly on that system.
159 if (system == getSystem()) {
160 return actorFactory.createActor(MessageCollectorActor.props(), name);
163 return system.actorOf(MessageCollectorActor.props(), name);
// Shard manager Props backed by a fresh MockConfiguration; delegates to the overload that takes a
// configuration (defined outside this view).
166 private Props newShardMgrProps() {
167 return newShardMgrProps(new MockConfiguration());
// Sets up a Mockito mock DatastoreContextFactory stubbed to answer with the given datastoreContext both
// for getBaseDatastoreContext() and for getShardDatastoreContext(<any string>).
170 private static DatastoreContextFactory newDatastoreContextFactory(final DatastoreContext datastoreContext) {
171 DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
172 doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
173 doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(anyString());
// Builder for a TestShardManager that uses the shared mockShardActor as its shard actor.
177 private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() {
178 return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
// Builder for a TestShardManager that starts from datastoreContextBuilder, uses the supplied shardActor
// and is given a Mockito mock of DistributedDataStore.
181 private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(final ActorRef shardActor) {
182 return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor)
183 .distributedDataStore(mock(DistributedDataStore.class));
// Props for a TestShardManager using the shared mockShardActor, configured to run on the default
// dispatcher id.
187 private Props newPropsShardMgrWithMockShardActor() {
188 return newTestShardMgrBuilderWithMockShardActor().props().withDispatcher(
189 Dispatchers.DefaultDispatcherId());
// Same as the no-argument variant, but the TestShardManager uses the supplied shardActor.
192 private Props newPropsShardMgrWithMockShardActor(final ActorRef shardActor) {
193 return newTestShardMgrBuilderWithMockShardActor(shardActor).props()
194 .withDispatcher(Dispatchers.DefaultDispatcherId());
// Creates a TestShardManager from the default Props returned by newShardMgrProps().
198 private TestShardManager newTestShardManager() {
199 return newTestShardManager(newShardMgrProps());
// Creates the shard manager as a TestActorRef through actorFactory, obtains the underlying
// TestShardManager instance and calls waitForRecoveryComplete() on it.
202 private TestShardManager newTestShardManager(final Props props) {
203 TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
204 TestShardManager shardManager = shardManagerActor.underlyingActor();
205 shardManager.waitForRecoveryComplete();
// Polls the shard manager for the named local shard: each iteration sends FindLocalShard(shardName, true)
// with the kit as sender and expects a LocalShardFound reply; an AssertionError from that expectation is
// caught, and the loop sleeps 50 ms between attempts.
// NOTE(review): the rest of the parameter list, the try opener and the return/rethrow statements are not
// visible in this listing - confirm against the full file.
209 private static void waitForShardInitialized(final ActorRef shardManager, final String shardName,
211 AssertionError last = null;
212 Stopwatch sw = Stopwatch.createStarted();
// elapsed(TimeUnit.SECONDS) rounds down to whole seconds, so "<= 5" keeps retrying until just under 6 s.
213 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
215 shardManager.tell(new FindLocalShard(shardName, true), kit.getRef());
216 kit.expectMsgClass(LocalShardFound.class);
218 } catch (AssertionError e) {
222 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
// Waits up to 5 seconds for the kit to receive either an instance of msgClass or an akka Status.Failure.
// A Failure reply is converted into an AssertionError with the message "<msg> failed" and the failure's
// cause attached, so the test fails with the underlying exception visible.
228 @SuppressWarnings("unchecked")
229 private static <T> T expectMsgClassOrFailure(final Class<T> msgClass, final TestKit kit, final String msg) {
230 Object reply = kit.expectMsgAnyClassOf(kit.duration("5 sec"), msgClass, Failure.class);
231 if (reply instanceof Failure) {
232 throw new AssertionError(msg + " failed", ((Failure)reply).cause());
// Checks that each shard actor is created with the DatastoreContext the factory supplies for that shard
// name, and that a replacement DatastoreContextFactory sent to the shard manager results in the new
// per-shard contexts reaching the shard actors.
239 public void testPerShardDatastoreContext() throws Exception {
240 LOG.info("testPerShardDatastoreContext starting");
// Mock factory whose default answer for any shard name is a context with election timeout factor 5.
241 final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
242 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
// Shard-specific answers: factor 6 for "default" and factor 7 for "topology".
245 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(6).build())
246 .when(mockFactory).getShardDatastoreContext("default");
249 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(7).build())
250 .when(mockFactory).getShardDatastoreContext("topology");
// Configuration that reports the shards "default" and "topology" for every member, and member-1 as the
// only member of every shard.
252 final MockConfiguration mockConfig = new MockConfiguration() {
254 public Collection<String> getMemberShardNames(final MemberName memberName) {
255 return Arrays.asList("default", "topology");
259 public Collection<MemberName> getMembersFromShardName(final String shardName) {
260 return members("member-1");
// One message-collecting stand-in actor per shard.
264 final ActorRef defaultShardActor = actorFactory.createActor(
265 MessageCollectorActor.props(), actorFactory.generateActorId("default"));
266 final ActorRef topologyShardActor = actorFactory.createActor(
267 MessageCollectorActor.props(), actorFactory.generateActorId("topology"));
// shard name -> (stand-in actor, DatastoreContext captured in newShardActor); the context starts out null.
269 final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
270 new HashMap<String, Entry<ActorRef, DatastoreContext>>());
271 shardInfoMap.put("default", new AbstractMap.SimpleEntry<>(defaultShardActor, null));
272 shardInfoMap.put("topology", new AbstractMap.SimpleEntry<>(topologyShardActor, null));
274 final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
// Counted down once per newShardActor call; initialised to 2, one per expected shard.
275 final CountDownLatch newShardActorLatch = new CountDownLatch(2);
// ShardManager subclass whose newShardActor looks up the stand-in actor for the shard name instead of
// creating a real shard.
276 class LocalShardManager extends ShardManager {
277 LocalShardManager(final AbstractShardManagerCreator<?> creator) {
282 protected ActorRef newShardActor(final ShardInformation info) {
283 Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
// Capture the context the manager built for this shard so the assertions below can inspect it.
286 ref = entry.getKey();
287 entry.setValue(info.getDatastoreContext());
290 newShardActorLatch.countDown();
// Creator that instantiates LocalShardManager with the mock factory, the cache and the mock configuration.
295 final Creator<ShardManager> creator = new Creator<ShardManager>() {
296 private static final long serialVersionUID = 1L;
298 public ShardManager create() {
299 return new LocalShardManager(
300 new GenericCreator<>(LocalShardManager.class).datastoreContextFactory(mockFactory)
301 .primaryShardInfoCache(primaryShardInfoCache).configuration(mockConfig));
305 final TestKit kit = new TestKit(getSystem());
307 final ActorRef shardManager = actorFactory.createActor(Props.create(
308 new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()));
// The latch wait below expects both shard actors to have been requested after this message is sent.
310 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
312 assertTrue("Shard actors created", newShardActorLatch.await(5, TimeUnit.SECONDS));
// Each shard must have been given its own context (6 / 7), not the factory-wide default of 5.
313 assertEquals("getShardElectionTimeoutFactor", 6,
314 shardInfoMap.get("default").getValue().getShardElectionTimeoutFactor());
315 assertEquals("getShardElectionTimeoutFactor", 7,
316 shardInfoMap.get("topology").getValue().getShardElectionTimeoutFactor());
// Second factory with different per-shard factors (66 / 77), sent to the manager as a message.
318 DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
319 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
321 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(66).build())
322 .when(newMockFactory).getShardDatastoreContext("default");
325 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(77).build())
326 .when(newMockFactory).getShardDatastoreContext("topology");
328 shardManager.tell(newMockFactory, kit.getRef());
// Each stand-in shard actor should receive its own updated DatastoreContext.
330 DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor,
331 DatastoreContext.class);
332 assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
334 newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
335 assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
337 LOG.info("testPerShardDatastoreContext ending");
// FindPrimary for the shard name "non-existent" must be answered with a PrimaryNotFoundException
// within 5 seconds.
341 public void testOnReceiveFindPrimaryForNonExistentShard() {
342 final TestKit kit = new TestKit(getSystem());
343 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
345 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
// waitUntilReady is false, so the manager is expected to reply without waiting.
347 shardManager.tell(new FindPrimary("non-existent", false), kit.getRef());
349 kit.expectMsgClass(kit.duration("5 seconds"), PrimaryNotFoundException.class);
// When the local default shard reports itself as leader, FindPrimary must be answered with
// LocalPrimaryShardFound carrying the local shard path and the data tree supplied in the leader-state
// notification.
353 public void testOnReceiveFindPrimaryForLocalLeaderShard() {
354 LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting");
355 final TestKit kit = new TestKit(getSystem());
356 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
358 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
360 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
361 shardManager.tell(new ActorInitialized(), mockShardActor);
// Leader id equals the local member id, i.e. the local shard is the leader.
363 DataTree mockDataTree = mock(DataTree.class);
364 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
365 DataStoreVersions.CURRENT_VERSION), kit.getRef());
// Wait until the mock shard actor has received RegisterRoleChangeListener.
367 MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
// Role change Candidate -> Leader for the local shard.
// NOTE(review): the call this argument belongs to is not visible in this listing.
369 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
372 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
374 LocalPrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"),
375 LocalPrimaryShardFound.class);
376 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
377 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
// The reply must hand back the very same DataTree instance that was reported.
378 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
380 LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending");
// The local shard is a follower and the reported leader is on member-2, but no member-up event for
// member-2 is sent in this test; FindPrimary must then be answered with NoShardLeaderException.
384 public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() {
385 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp starting");
386 final TestKit kit = new TestKit(getSystem());
387 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
389 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
390 shardManager.tell(new ActorInitialized(), mockShardActor);
392 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
393 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
// Role change Candidate -> Follower for the local shard.
// NOTE(review): the call this argument belongs to is not visible in this listing.
395 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
// Local shard (memberId1) reports memberId2 as its leader.
397 shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION),
400 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
402 kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
404 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending");
// The local shard is a follower, member-2 is announced as up and reported as leader; FindPrimary must be
// answered with RemotePrimaryShardFound pointing at member-2 and carrying the leader's version.
408 public void testOnReceiveFindPrimaryForNonLocalLeaderShard() {
409 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard starting");
410 final TestKit kit = new TestKit(getSystem());
411 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
413 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
414 shardManager.tell(new ActorInitialized(), mockShardActor);
416 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
// Announce member-2 to the manager, using the kit's own actor path as the member address.
417 MockClusterWrapper.sendMemberUp(shardManager, "member-2", kit.getRef().path().toString());
419 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
// Role change Candidate -> Follower for the local shard.
// NOTE(review): the call this argument belongs to is not visible in this listing.
421 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
// Use a version one below the current one so the assertion can tell it was taken from the leader report.
423 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
424 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, leaderVersion), mockShardActor);
426 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
428 RemotePrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"),
429 RemotePrimaryShardFound.class);
430 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
431 primaryFound.getPrimaryPath().contains("member-2-shard-default"));
432 assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
434 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard ending");
// Neither UpdateSchemaContext nor ActorInitialized is sent in this test; FindPrimary for the default
// shard must then be answered with NotInitializedException.
438 public void testOnReceiveFindPrimaryForUninitializedShard() {
439 final TestKit kit = new TestKit(getSystem());
440 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
442 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
444 kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
// The shard is reported as initialized but no role or leader notification is sent; FindPrimary must be
// answered with NoShardLeaderException.
448 public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() {
449 final TestKit kit = new TestKit(getSystem());
450 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
452 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
453 shardManager.tell(new ActorInitialized(), mockShardActor);
455 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
457 kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
// Two-phase check for a shard that becomes a follower before any leader is known:
//  1. after the Candidate -> Follower role change alone, FindPrimary yields NoShardLeaderException;
//  2. once ShardLeaderStateChanged reports the local member as leader, FindPrimary yields
//     LocalPrimaryShardFound with the local path and the reported data tree.
461 public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() {
462 LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
463 final TestKit kit = new TestKit(getSystem());
464 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
466 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
467 shardManager.tell(new ActorInitialized(), mockShardActor);
469 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
// Role change Candidate -> Follower for the local shard; no leader id has been reported yet.
// NOTE(review): the call this argument belongs to is not visible in this listing.
471 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Follower.name()),
474 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
476 kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
// Now report the local member as leader and ask again.
478 DataTree mockDataTree = mock(DataTree.class);
479 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
480 DataStoreVersions.CURRENT_VERSION), mockShardActor);
482 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
484 LocalPrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"),
485 LocalPrimaryShardFound.class);
486 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
487 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
488 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
// Closing marker: previously logged "starting" a second time, which made the end of this test
// indistinguishable from its start in the log output.
490 LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId ending");
// FindPrimary with waitUntilReady = true must not be answered until the shard is initialized, has a role
// and a leader has been reported; the reply is then LocalPrimaryShardFound and nothing further follows.
494 public void testOnReceiveFindPrimaryWaitForShardLeader() {
495 LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting");
// Raise the shard initialization timeout to 10 seconds for this test.
496 datastoreContextBuilder.shardInitializationTimeout(10, TimeUnit.SECONDS);
497 final TestKit kit = new TestKit(getSystem());
498 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
500 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
502 // We're passing waitUntilInitialized = true to FindPrimary so
503 // the response should be
504 // delayed until we send ActorInitialized and
505 // RoleChangeNotification.
506 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
// No reply yet: the shard has not been reported as initialized.
508 kit.expectNoMessage(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
510 shardManager.tell(new ActorInitialized(), mockShardActor);
// Still no reply: initialized, but no role or leader known.
512 kit.expectNoMessage(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
514 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
// Role change Candidate -> Leader for the local shard.
// NOTE(review): the call this argument belongs to is not visible in this listing.
516 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
// Still no reply: the role is known but the leader-state notification has not arrived.
519 kit.expectNoMessage(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
521 DataTree mockDataTree = mock(DataTree.class);
522 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
523 DataStoreVersions.CURRENT_VERSION), mockShardActor);
525 LocalPrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"),
526 LocalPrimaryShardFound.class);
527 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
528 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
529 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
// The deferred request must be answered exactly once.
531 kit.expectNoMessage(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
533 LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending");
// FindPrimary with waitUntilReady = true against a shard that is never reported as initialized in time
// must be answered with NotInitializedException within 2 seconds; a later ActorInitialized must not
// produce any further reply.
537 public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() {
538 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard starting");
539 final TestKit kit = new TestKit(getSystem());
540 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
542 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
544 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
546 kit.expectMsgClass(kit.duration("2 seconds"), NotInitializedException.class);
// Late initialization: the request has already been answered, so nothing more may arrive.
548 shardManager.tell(new ActorInitialized(), mockShardActor);
550 kit.expectNoMessage(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
552 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending");
// FindPrimary with waitUntilReady = true against a shard whose only reported role is Candidate must be
// answered with NoShardLeaderException within 2 seconds.
556 public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() {
557 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard starting");
558 final TestKit kit = new TestKit(getSystem());
559 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
561 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
562 shardManager.tell(new ActorInitialized(), mockShardActor);
// Role change <none> -> Candidate (the old role is passed as null).
563 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
564 RaftState.Candidate.name()), mockShardActor);
566 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
568 kit.expectMsgClass(kit.duration("2 seconds"), NoShardLeaderException.class);
570 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending");
// FindPrimary with waitUntilReady = true against a shard whose only reported role is IsolatedLeader must
// be answered with NoShardLeaderException within 2 seconds.
574 public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() {
575 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard starting");
576 final TestKit kit = new TestKit(getSystem());
577 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
579 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
580 shardManager.tell(new ActorInitialized(), mockShardActor);
// Role change <none> -> IsolatedLeader (the old role is passed as null).
581 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
582 RaftState.IsolatedLeader.name()), mockShardActor);
// Spacing normalised to match the identical call in the sibling tests (was ",kit. getRef()").
584 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
586 kit.expectMsgClass(kit.duration("2 seconds"), NoShardLeaderException.class);
588 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending");
// FindPrimary with waitUntilReady = true against a shard that is initialized but never reports a role
// must be answered with NoShardLeaderException within 2 seconds.
592 public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() {
593 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard starting");
594 final TestKit kit = new TestKit(getSystem());
595 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
597 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
598 shardManager.tell(new ActorInitialized(), mockShardActor);
600 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
602 kit.expectMsgClass(kit.duration("2 seconds"), NoShardLeaderException.class);
604 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending");
// Two-node scenario: the "astronauts" shard is configured only on member-2. A FindPrimary sent to
// member-1's shard manager must be answered with RemotePrimaryShardFound pointing at member-2's shard
// and carrying the version member-2 reported for its leader.
608 public void testOnReceiveFindPrimaryForRemoteShard() {
609 LOG.info("testOnReceiveFindPrimaryForRemoteShard starting");
// Both shard managers are created under this same actor name, one in each actor system.
610 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
612 // Create an ActorSystem and a ShardManager actor for member-1.
614 final ActorSystem system1 = newActorSystem("Member1");
615 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
617 final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
618 newTestShardMgrBuilderWithMockShardActor().cluster(
619 new ClusterWrapperImpl(system1)).props().withDispatcher(
620 Dispatchers.DefaultDispatcherId()), shardManagerID);
622 // Create an ActorSystem and a ShardManager actor for member-2.
624 final ActorSystem system2 = newActorSystem("Member2");
626 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
628 final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
// member-2's configuration: "default" on both members, "astronauts" on member-2 only.
630 MockConfiguration mockConfig2 = new MockConfiguration(
631 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
632 .put("astronauts", Arrays.asList("member-2")).build());
634 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
635 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
636 new ClusterWrapperImpl(system2)).props().withDispatcher(
637 Dispatchers.DefaultDispatcherId()), shardManagerID);
639 final TestKit kit = new TestKit(system1);
640 shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
641 shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
643 shardManager2.tell(new ActorInitialized(), mockShardActor2);
645 String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
// Use a version one below the current one so the assertion can tell it came from member-2's report.
646 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
// member-2's "astronauts" shard reports itself as leader and changes role Candidate -> Leader.
// NOTE(review): the trailing sender argument of the next two tell(...) calls is not visible in this listing.
647 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
649 shardManager2.tell(new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
// Wait on member-1's manager via waitForMemberUp() before asking it for the remote primary.
652 shardManager1.underlyingActor().waitForMemberUp();
653 shardManager1.tell(new FindPrimary("astronauts", false), kit.getRef());
655 RemotePrimaryShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
656 String path = found.getPrimaryPath();
657 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
658 assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
// Additional check on member-2's manager through verifyFindPrimary() (defined on TestShardManager,
// outside this view).
660 shardManager2.underlyingActor().verifyFindPrimary();
662 // This part times out quite a bit on jenkins for some reason
664 // Cluster.get(system2).down(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
666 // shardManager1.underlyingActor().waitForMemberRemoved();
668 // shardManager1.tell(new FindPrimary("astronauts", false), getRef());
670 // expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
672 LOG.info("testOnReceiveFindPrimaryForRemoteShard ending");
/**
 * Exercises FindPrimary on member-1's ShardManager while member-2, which hosts the shard leader, is reported
 * unreachable, removed, reachable and up again.
 *
 * <p>NOTE(review): several statements in this method are only partially visible in this view (the receivers of
 * the RoleChangeNotification messages and the trailing arguments of some tell(...) calls), so the code is left
 * untouched and only annotated.
 */
676 public void testShardAvailabilityOnChangeOfMemberReachability() {
677 LOG.info("testShardAvailabilityOnChangeOfMemberReachability starting");
678 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
680 // Create an ActorSystem ShardManager actor for member-1.
682 final ActorSystem system1 = newActorSystem("Member1");
683 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
685 final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
687 final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
688 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
689 new ClusterWrapperImpl(system1)).props().withDispatcher(
690 Dispatchers.DefaultDispatcherId()), shardManagerID);
692 // Create an ActorSystem ShardManager actor for member-2.
694 final ActorSystem system2 = newActorSystem("Member2");
696 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
698 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
700 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
701 .put("default", Arrays.asList("member-1", "member-2")).build());
703 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
704 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
705 new ClusterWrapperImpl(system2)).props().withDispatcher(
706 Dispatchers.DefaultDispatcherId()), shardManagerID);
// Both shard managers get the schema context and an ActorInitialized from their own mock shard actor.
708 final TestKit kit = new TestKit(system1);
709 shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
710 shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
711 shardManager1.tell(new ActorInitialized(), mockShardActor1);
712 shardManager2.tell(new ActorInitialized(), mockShardActor2);
// Both managers are told that member-2's shard is the leader of the default shard.
714 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
715 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
716 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
717 DataStoreVersions.CURRENT_VERSION), mockShardActor1);
719 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
721 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
722 DataStoreVersions.CURRENT_VERSION), mockShardActor2);
724 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
726 shardManager1.underlyingActor().waitForMemberUp();
// With member-2 up, member-1 is expected to resolve the primary remotely, on member-2.
728 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
730 RemotePrimaryShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
731 String path = found.getPrimaryPath();
732 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
// Report member-2 unreachable: the local mock shard is expected to receive PeerDown naming MEMBER_2.
734 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
737 shardManager1.underlyingActor().waitForUnreachableMember();
739 PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
740 assertEquals("getMemberName", MEMBER_2, peerDown.getMemberName());
741 MessageCollectorActor.clearMessages(mockShardActor1);
// Report member-2 removed: another PeerDown is expected and FindPrimary must now fail with NoShardLeaderException.
743 shardManager1.tell(MockClusterWrapper.createMemberRemoved("member-2", "akka://cluster-test@127.0.0.1:2558"),
746 MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
748 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
750 kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
// Report member-2 reachable again: PeerUp is expected and the remote primary is found once more.
752 shardManager1.tell(MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
755 shardManager1.underlyingActor().waitForReachableMember();
757 PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
758 assertEquals("getMemberName", MEMBER_2, peerUp.getMemberName());
759 MessageCollectorActor.clearMessages(mockShardActor1);
761 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
763 RemotePrimaryShardFound found1 = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
764 String path1 = found1.getPrimaryPath();
765 assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
// A MemberUp event for member-2 is expected to produce another PeerUp on the local mock shard.
767 shardManager1.tell(MockClusterWrapper.createMemberUp("member-2", "akka://cluster-test@127.0.0.1:2558"),
770 MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
772 // Test FindPrimary wait succeeds after reachable member event.
774 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
775 "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
776 shardManager1.underlyingActor().waitForUnreachableMember();
// FindPrimary is issued while member-2 is unreachable; the reply is only expected after the reachable event.
778 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
781 MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
783 RemotePrimaryShardFound found2 = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
784 String path2 = found2.getPrimaryPath();
785 assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config"));
787 LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending");
/**
 * Exercises FindPrimary on member-1's ShardManager when the remote leader's member (member-2) is reported
 * unreachable and the local shard subsequently becomes leader, and checks that the cached primary shard info
 * for "default" is dropped in between.
 *
 * <p>NOTE(review): some statements in this method are only partially visible in this view (the last argument of
 * the first TestActorRef.create call and the receivers of the RoleChangeNotification messages), so the code is
 * left untouched and only annotated.
 */
791 public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() {
792 LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange starting");
793 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
795 // Create an ActorSystem ShardManager actor for member-1.
797 final ActorSystem system1 = newActorSystem("Member1");
798 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
800 final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
// The cache instance is handed to member-1's ShardManager so the test can inspect it directly later on.
802 final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
803 final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
804 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(new ClusterWrapperImpl(system1))
805 .primaryShardInfoCache(primaryShardInfoCache).props()
806 .withDispatcher(Dispatchers.DefaultDispatcherId()),
809 // Create an ActorSystem ShardManager actor for member-2.
811 final ActorSystem system2 = newActorSystem("Member2");
813 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
815 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
817 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
818 .put("default", Arrays.asList("member-1", "member-2")).build());
820 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
821 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
822 new ClusterWrapperImpl(system2)).props().withDispatcher(
823 Dispatchers.DefaultDispatcherId()), shardManagerID);
// Both shard managers get the schema context and an ActorInitialized from their own mock shard actor.
825 final TestKit kit = new TestKit(system1);
826 shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
827 shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
828 shardManager1.tell(new ActorInitialized(), mockShardActor1);
829 shardManager2.tell(new ActorInitialized(), mockShardActor2);
// Both managers are told that member-2's shard is the leader of the default shard.
831 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
832 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
833 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
834 DataStoreVersions.CURRENT_VERSION), mockShardActor1);
836 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
838 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
839 DataStoreVersions.CURRENT_VERSION), mockShardActor2);
841 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
843 shardManager1.underlyingActor().waitForMemberUp();
// With member-2 up, member-1 is expected to resolve the primary remotely, on member-2.
845 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
847 RemotePrimaryShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
848 String path = found.getPrimaryPath();
849 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
// Seed the cache with an entry for "default" so its removal can be asserted below.
851 primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(
852 system1.actorSelection(mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION));
// Report member-2 unreachable: FindPrimary must now fail and the cache entry must be gone.
854 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
855 "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
857 shardManager1.underlyingActor().waitForUnreachableMember();
859 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
861 kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
863 assertNull("Expected primaryShardInfoCache entry removed",
864 primaryShardInfoCache.getIfPresent("default"));
// The local shard (member-1) becomes leader; the primary is now expected to be found locally.
866 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, mock(DataTree.class),
867 DataStoreVersions.CURRENT_VERSION), mockShardActor1);
869 new RoleChangeNotification(memberId1, RaftState.Follower.name(), RaftState.Leader.name()),
872 shardManager1.tell(new FindPrimary("default", true), kit.getRef());
874 LocalPrimaryShardFound found1 = kit.expectMsgClass(kit.duration("5 seconds"), LocalPrimaryShardFound.class);
875 String path1 = found1.getPrimaryPath();
876 assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
878 LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange ending");
/**
 * Checks that reporting member-2 unreachable does not disturb the leader shard hosted on member-256, whose
 * member name merely contains the text "member-2": FindPrimary must still answer with a local primary on
 * member-256 and the cached primary shard info for "default" must still be present.
 *
 * <p>NOTE(review): some statements in this method are only partially visible in this view (the last argument of
 * the first TestActorRef.create call and parts of the RoleChangeNotification tell(...) calls), so the code is
 * left untouched and only annotated.
 */
882 public void testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable() {
883 LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable starting");
884 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
886 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
887 .put("default", Arrays.asList("member-256", "member-2")).build());
889 // Create an ActorSystem, ShardManager and actor for member-256.
891 final ActorSystem system256 = newActorSystem("Member256");
892 // 2562 is the tcp port of Member256 in src/test/resources/application.conf.
893 Cluster.get(system256).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2562"));
895 final ActorRef mockShardActor256 = newMockShardActor(system256, Shard.DEFAULT_NAME, "member-256");
897 final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
899 // ShardManager must be created with shard configuration to let its localShards has shards.
900 final TestActorRef<TestShardManager> shardManager256 = TestActorRef.create(system256,
901 newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor256)
902 .cluster(new ClusterWrapperImpl(system256))
903 .primaryShardInfoCache(primaryShardInfoCache).props()
904 .withDispatcher(Dispatchers.DefaultDispatcherId()),
907 // Create an ActorSystem, ShardManager and actor for member-2 whose name is contained in member-256.
909 final ActorSystem system2 = newActorSystem("Member2");
911 // Join member-2 into the cluster of member-256.
912 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2562"));
914 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
916 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
917 newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor2).cluster(
918 new ClusterWrapperImpl(system2)).props().withDispatcher(
919 Dispatchers.DefaultDispatcherId()), shardManagerID);
// Both shard managers get the schema context and an ActorInitialized from their own mock shard actor.
921 final TestKit kit256 = new TestKit(system256);
922 shardManager256.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef());
923 shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef());
924 shardManager256.tell(new ActorInitialized(), mockShardActor256);
925 shardManager2.tell(new ActorInitialized(), mockShardActor2);
// Both managers are told that member-256's shard is the leader of the default shard.
927 String memberId256 = "member-256-shard-default-" + shardMrgIDSuffix;
928 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
929 shardManager256.tell(new ShardLeaderStateChanged(memberId256, memberId256, mock(DataTree.class),
930 DataStoreVersions.CURRENT_VERSION), mockShardActor256);
931 shardManager256.tell(
932 new RoleChangeNotification(memberId256, RaftState.Candidate.name(), RaftState.Leader.name()),
934 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId256, mock(DataTree.class),
935 DataStoreVersions.CURRENT_VERSION), mockShardActor2);
937 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Follower.name()),
939 shardManager256.underlyingActor().waitForMemberUp();
// Baseline: the primary is local to member-256.
941 shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
943 LocalPrimaryShardFound found = kit256.expectMsgClass(kit256.duration("5 seconds"),
944 LocalPrimaryShardFound.class);
945 String path = found.getPrimaryPath();
946 assertTrue("Unexpected primary path " + path + " which must on member-256",
947 path.contains("member-256-shard-default-config"));
// Seed the cache with an entry for "default" so its survival can be checked below.
949 PrimaryShardInfo primaryShardInfo = new PrimaryShardInfo(
950 system256.actorSelection(mockShardActor256.path()), DataStoreVersions.CURRENT_VERSION);
951 primaryShardInfoCache.putSuccessful("default", primaryShardInfo);
953 // Simulate member-2 become unreachable.
954 shardManager256.tell(MockClusterWrapper.createUnreachableMember("member-2",
955 "akka://cluster-test@127.0.0.1:2558"), kit256.getRef());
956 shardManager256.underlyingActor().waitForUnreachableMember();
958 // Make sure leader shard on member-256 is still leader and still in the cache.
959 shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
960 found = kit256.expectMsgClass(kit256.duration("5 seconds"), LocalPrimaryShardFound.class);
961 path = found.getPrimaryPath();
// NOTE(review): the message below says "must still not on member-256" while the assertion requires the path
// to contain member-256 - this looks like a wording slip in the failure message.
962 assertTrue("Unexpected primary path " + path + " which must still not on member-256",
963 path.contains("member-256-shard-default-config"));
// NOTE(review): getIfPresent(...) is dereferenced without a null check, and the assertions inside the callback
// run on the dispatcher passed to onComplete rather than on the test thread - confirm whether a failure there
// can actually fail this test; awaiting the future on the test thread may be more reliable.
964 Future<PrimaryShardInfo> futurePrimaryShard = primaryShardInfoCache.getIfPresent("default");
965 futurePrimaryShard.onComplete(new OnComplete<PrimaryShardInfo>() {
967 public void onComplete(final Throwable failure, final PrimaryShardInfo futurePrimaryShardInfo) {
968 if (failure != null) {
969 assertTrue("Primary shard info is unexpectedly removed from primaryShardInfoCache", false);
971 assertEquals("Expected primaryShardInfoCache entry",
972 primaryShardInfo, futurePrimaryShardInfo);
975 }, system256.dispatchers().defaultGlobalDispatcher());
977 LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable ending");
981 public void testOnReceiveFindLocalShardForNonExistentShard() {
982 final TestKit kit = new TestKit(getSystem());
983 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
985 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
987 shardManager.tell(new FindLocalShard("non-existent", false), kit.getRef());
989 LocalShardNotFound notFound = kit.expectMsgClass(kit.duration("5 seconds"), LocalShardNotFound.class);
991 assertEquals("getShardName", "non-existent", notFound.getShardName());
995 public void testOnReceiveFindLocalShardForExistentShard() {
996 final TestKit kit = new TestKit(getSystem());
997 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
999 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1000 shardManager.tell(new ActorInitialized(), mockShardActor);
1002 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1004 LocalShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
1006 assertTrue("Found path contains " + found.getPath().path().toString(),
1007 found.getPath().path().toString().contains("member-1-shard-default-config"));
1011 public void testOnReceiveFindLocalShardForNotInitializedShard() {
1012 final TestKit kit = new TestKit(getSystem());
1013 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1015 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1017 kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
1021 public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
1022 LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
1023 final TestKit kit = new TestKit(getSystem());
1024 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1026 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1028 // We're passing waitUntilInitialized = true to FindLocalShard
1029 // so the response should be
1030 // delayed until we send ActorInitialized.
1031 Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
1032 new Timeout(5, TimeUnit.SECONDS));
1034 shardManager.tell(new ActorInitialized(), mockShardActor);
1036 Object resp = Await.result(future, kit.duration("5 seconds"));
1037 assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
1039 LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
1043 public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
1044 TestShardManager shardManager = newTestShardManager();
1046 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1047 shardManager.onReceiveCommand(new RoleChangeNotification(
1048 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
1050 verify(ready, never()).countDown();
1052 shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
1053 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1055 verify(ready, times(1)).countDown();
1059 public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
1060 final TestKit kit = new TestKit(getSystem());
1061 TestShardManager shardManager = newTestShardManager();
1063 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1064 shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1066 verify(ready, never()).countDown();
1068 shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
1070 shardManager.onReceiveCommand(
1071 new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1072 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1074 verify(ready, times(1)).countDown();
1078 public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
1079 final TestKit kit = new TestKit(getSystem());
1080 TestShardManager shardManager = newTestShardManager();
1082 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1083 shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1085 verify(ready, never()).countDown();
1087 shardManager.onReceiveCommand(
1088 new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1089 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1091 shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
1093 verify(ready, times(1)).countDown();
1097 public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
1098 TestShardManager shardManager = newTestShardManager();
1100 shardManager.onReceiveCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(),
1101 RaftState.Leader.name()));
1103 verify(ready, never()).countDown();
1107 public void testByDefaultSyncStatusIsFalse() {
1108 TestShardManager shardManager = newTestShardManager();
1110 assertFalse(shardManager.getMBean().getSyncStatus());
1114 public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception {
1115 TestShardManager shardManager = newTestShardManager();
1117 shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
1118 RaftState.Follower.name(), RaftState.Leader.name()));
1120 assertTrue(shardManager.getMBean().getSyncStatus());
/**
 * Checks that the MBean sync status is false after the member-1 default shard changes role from Follower to
 * Candidate, and is still false after a FollowerInitialSyncUpStatus message is delivered.
 *
 * <p>NOTE(review): the arguments of the FollowerInitialSyncUpStatus constructor call are not visible in this
 * view; the existing inline comment says it is sent with status = true. The code is left untouched.
 */
1124 public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception {
1125 TestShardManager shardManager = newTestShardManager();
1127 String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1128 shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1129 RaftState.Follower.name(), RaftState.Candidate.name()));
// A candidate is not considered in sync.
1131 assertFalse(shardManager.getMBean().getSyncStatus());
1133 // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
1134 shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(
// The follower sync message must not change the status while the shard is a candidate.
1137 assertFalse(shardManager.getMBean().getSyncStatus());
1141 public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception {
1142 TestShardManager shardManager = newTestShardManager();
1144 String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1145 shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1146 RaftState.Candidate.name(), RaftState.Follower.name()));
1148 // Initially will be false
1149 assertFalse(shardManager.getMBean().getSyncStatus());
1151 // Send status true will make sync status true
1152 shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
1154 assertTrue(shardManager.getMBean().getSyncStatus());
1156 // Send status false will make sync status false
1157 shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
1159 assertFalse(shardManager.getMBean().getSyncStatus());
1163 public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception {
1164 LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
1165 TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
1167 public List<String> getMemberShardNames(final MemberName memberName) {
1168 return Arrays.asList("default", "astronauts");
1172 // Initially will be false
1173 assertFalse(shardManager.getMBean().getSyncStatus());
1175 // Make default shard leader
1176 String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
1177 shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId,
1178 RaftState.Follower.name(), RaftState.Leader.name()));
1180 // default = Leader, astronauts is unknown so sync status remains false
1181 assertFalse(shardManager.getMBean().getSyncStatus());
1183 // Make astronauts shard leader as well
1184 String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1185 shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1186 RaftState.Follower.name(), RaftState.Leader.name()));
1188 // Now sync status should be true
1189 assertTrue(shardManager.getMBean().getSyncStatus());
1191 // Make astronauts a Follower
1192 shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1193 RaftState.Leader.name(), RaftState.Follower.name()));
1195 // Sync status is not true
1196 assertFalse(shardManager.getMBean().getSyncStatus());
1198 // Make the astronauts follower sync status true
1199 shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
1201 // Sync status is now true
1202 assertTrue(shardManager.getMBean().getSyncStatus());
1204 LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending");
1208 public void testOnReceiveSwitchShardBehavior() {
1209 final TestKit kit = new TestKit(getSystem());
1210 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1212 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1213 shardManager.tell(new ActorInitialized(), mockShardActor);
1215 shardManager.tell(new SwitchShardBehavior(mockShardName, RaftState.Leader, 1000), kit.getRef());
1217 SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor,
1218 SwitchBehavior.class);
1220 assertEquals(RaftState.Leader, switchBehavior.getNewState());
1221 assertEquals(1000, switchBehavior.getNewTerm());
1224 private static List<MemberName> members(final String... names) {
1225 return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList());
1229 public void testOnCreateShard() {
1230 LOG.info("testOnCreateShard starting");
1231 final TestKit kit = new TestKit(getSystem());
1232 datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1234 ActorRef shardManager = actorFactory
1235 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1236 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1238 SchemaContext schemaContext = TEST_SCHEMA_CONTEXT;
1239 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1241 DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100)
1242 .persistent(false).build();
1243 Shard.Builder shardBuilder = Shard.builder();
1245 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1246 "foo", null, members("member-1", "member-5", "member-6"));
1247 shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), kit.getRef());
1249 kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
1251 shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1253 kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
1255 assertFalse("isRecoveryApplicable", shardBuilder.getDatastoreContext().isPersistent());
1256 assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig()
1257 .getPeerAddressResolver() instanceof ShardPeerAddressResolver);
1258 assertEquals("peerMembers", Sets.newHashSet(
1259 ShardIdentifier.create("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(),
1260 ShardIdentifier.create("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()),
1261 shardBuilder.getPeerAddresses().keySet());
1262 assertEquals("ShardIdentifier", ShardIdentifier.create("foo", MEMBER_1, shardMrgIDSuffix),
1263 shardBuilder.getId());
1264 assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1266 // Send CreateShard with same name - should return Success with
1269 shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1271 Success success = kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
1272 assertNotNull("Success status is null", success.status());
1274 LOG.info("testOnCreateShard ending");
1278 public void testOnCreateShardWithLocalMemberNotInShardConfig() {
1279 LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting");
1280 final TestKit kit = new TestKit(getSystem());
1281 datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1283 ActorRef shardManager = actorFactory
1284 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1285 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1287 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1289 Shard.Builder shardBuilder = Shard.builder();
1290 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1291 "foo", null, members("member-5", "member-6"));
1293 shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1294 kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
1296 shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1297 kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
1299 assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
1300 assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(), shardBuilder
1301 .getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
1303 LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending");
1307 public void testOnCreateShardWithNoInitialSchemaContext() {
1308 LOG.info("testOnCreateShardWithNoInitialSchemaContext starting");
1309 final TestKit kit = new TestKit(getSystem());
1310 ActorRef shardManager = actorFactory
1311 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1312 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1314 Shard.Builder shardBuilder = Shard.builder();
1316 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1317 "foo", null, members("member-1"));
1318 shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1320 kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
1322 SchemaContext schemaContext = TEST_SCHEMA_CONTEXT;
1323 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1325 shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1327 kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
1329 assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1330 assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
1332 LOG.info("testOnCreateShardWithNoInitialSchemaContext ending");
1336 public void testGetSnapshot() {
1337 LOG.info("testGetSnapshot starting");
1338 TestKit kit = new TestKit(getSystem());
1340 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1341 .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1"))
1342 .put("astronauts", Collections.<String>emptyList()).build());
1344 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig)
1345 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1347 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1348 Failure failure = kit.expectMsgClass(Failure.class);
1349 assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
1351 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1353 waitForShardInitialized(shardManager, "shard1", kit);
1354 waitForShardInitialized(shardManager, "shard2", kit);
1356 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1358 DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1360 assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1361 assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1363 Function<ShardSnapshot, String> shardNameTransformer = ShardSnapshot::getName;
1365 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
1366 datastoreSnapshot.getShardSnapshots().stream().map(shardNameTransformer).collect(Collectors.toSet())));
1368 // Add a new replica
1370 TestKit mockShardLeaderKit = new TestKit(getSystem());
1372 TestShardManager shardManagerInstance = shardManager.underlyingActor();
1373 shardManagerInstance.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1375 shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1376 mockShardLeaderKit.expectMsgClass(AddServer.class);
1377 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.OK, ""));
1378 kit.expectMsgClass(Status.Success.class);
1379 waitForShardInitialized(shardManager, "astronauts", kit);
1381 // Send another GetSnapshot and verify
1383 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1384 datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1386 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
1387 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
1389 ShardManagerSnapshot snapshot = datastoreSnapshot.getShardManagerSnapshot();
1390 assertNotNull("Expected ShardManagerSnapshot", snapshot);
1391 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1392 Sets.newHashSet(snapshot.getShardList()));
1394 LOG.info("testGetSnapshot ending");
1398 public void testRestoreFromSnapshot() {
1399 LOG.info("testRestoreFromSnapshot starting");
1401 datastoreContextBuilder.shardInitializationTimeout(3, TimeUnit.SECONDS);
1403 TestKit kit = new TestKit(getSystem());
1405 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1406 .put("shard1", Collections.<String>emptyList()).put("shard2", Collections.<String>emptyList())
1407 .put("astronauts", Collections.<String>emptyList()).build());
1409 ShardManagerSnapshot snapshot =
1410 new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"), Collections.emptyMap());
1411 DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix, snapshot,
1412 Collections.<ShardSnapshot>emptyList());
1413 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig)
1414 .restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
1416 shardManager.underlyingActor().waitForRecoveryComplete();
1418 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1420 waitForShardInitialized(shardManager, "shard1", kit);
1421 waitForShardInitialized(shardManager, "shard2", kit);
1422 waitForShardInitialized(shardManager, "astronauts", kit);
1424 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1426 DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1428 assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1430 assertNotNull("Expected ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1431 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1432 Sets.newHashSet(datastoreSnapshot.getShardManagerSnapshot().getShardList()));
1434 LOG.info("testRestoreFromSnapshot ending");
1438 public void testAddShardReplicaForNonExistentShardConfig() {
1439 final TestKit kit = new TestKit(getSystem());
1440 ActorRef shardManager = actorFactory
1441 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1442 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1444 shardManager.tell(new AddShardReplica("model-inventory"), kit.getRef());
1445 Status.Failure resp = kit.expectMsgClass(kit.duration("2 seconds"), Status.Failure.class);
1447 assertTrue("Failure obtained", resp.cause() instanceof IllegalArgumentException);
/**
 * Adds a replica of the "astronauts" shard on member-1 while the shard leader is on member-2, using two
 * separate actor systems ("Member1" and "Member2") that both join the same cluster address.
 *
 * <p>The leader is a MockRespondActor set up to answer AddServer with an OK AddServerReply. The test checks
 * the server id carried by the AddServer message the leader receives, expects Status.Success for the
 * requester, and finally checks that exactly one ShardManagerSnapshot is persisted and that it lists
 * "default" and "astronauts".
 *
 * <p>NOTE(review): the last argument line of each TestActorRef.create(...) call and of the
 * expectFirstMatching(...) call is not present in this copy of the file; restore them from the upstream
 * source before compiling.
 */
1451 public void testAddShardReplica() {
1452 LOG.info("testAddShardReplica starting");
// "default" is configured on both members, "astronauts" only on member-2.
1453 MockConfiguration mockConfig = new MockConfiguration(
1454 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1455 .put("astronauts", Arrays.asList("member-2")).build());
// The same id is used below as the key into InMemorySnapshotStore.
1457 final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1458 datastoreContextBuilder.shardManagerPersistenceId(shardManagerID);
1460 // Create an ActorSystem ShardManager actor for member-1.
1461 final ActorSystem system1 = newActorSystem("Member1");
1462 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1463 ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
// NOTE(review): this create(...) call is unterminated here; its final argument line is missing.
1464 final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1465 newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor)
1466 .cluster(new ClusterWrapperImpl(system1)).props()
1467 .withDispatcher(Dispatchers.DefaultDispatcherId()),
1470 // Create an ActorSystem ShardManager actor for member-2.
1471 final ActorSystem system2 = newActorSystem("Member2");
1472 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
// memberId2 is the id reported in the leader notifications and in the canned AddServerReply below.
1474 String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
// NOTE(review): `name` is built with the literal type "config" while memberId2 uses shardMrgIDSuffix, and
// `name` is not referenced on any visible line; presumably it is the actor name passed on the missing
// argument line of the next create(...) call. Confirm.
1475 String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString();
// Mock leader: answers AddServer with an OK reply.
1476 final TestActorRef<MockRespondActor> mockShardLeaderActor = TestActorRef.create(system2,
1477 Props.create(MockRespondActor.class, AddServer.class,
1478 new AddServerReply(ServerChangeStatus.OK, memberId2))
1479 .withDispatcher(Dispatchers.DefaultDispatcherId()),
1481 final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1482 newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor)
1483 .cluster(new ClusterWrapperImpl(system2)).props()
1484 .withDispatcher(Dispatchers.DefaultDispatcherId()),
1487 final TestKit kit = new TestKit(getSystem());
1488 newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1489 leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
// Tell member-2's ShardManager that its astronauts shard is initialized and is the leader.
1491 leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
// The leader reports a payload version one below the current one.
1493 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1494 leaderShardManager.tell(
1495 new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1496 mockShardLeaderActor);
1497 leaderShardManager.tell(
1498 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1499 mockShardLeaderActor);
// Both managers must have seen a MemberUp for the other member before the request is sent.
1501 newReplicaShardManager.underlyingActor().waitForMemberUp();
1502 leaderShardManager.underlyingActor().waitForMemberUp();
1504 // Have a dummy snapshot to be overwritten by the new data
1506 String[] restoredShards = { "default", "people" };
1507 ShardManagerSnapshot snapshot =
1508 new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
1509 InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
// NOTE(review): the purpose of this 2 ms pause is not visible here; presumably it separates the dummy
// snapshot from the one saved by the ShardManager. Confirm.
1510 Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
// Latches registered here are awaited after the request completes.
1512 InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID);
1513 InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
1515 // construct a mock response message
1516 newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
// NOTE(review): this call is unterminated here; the message-class argument line is missing.
1517 AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
// The new server id must name member-1's replica of the astronauts shard.
1519 String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1520 assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1521 kit.expectMsgClass(kit.duration("5 seconds"), Status.Success.class);
// Wait for the new snapshot to be saved and for a snapshot deletion before inspecting the store.
1523 InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
1524 InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
1525 List<ShardManagerSnapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(shardManagerID,
1526 ShardManagerSnapshot.class);
1527 assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
1528 ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
// The dummy snapshot listed "default" and "people"; the persisted one must list "default" and "astronauts".
1529 assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
1530 Sets.newHashSet(shardManagerSnapshot.getShardList()));
1531 LOG.info("testAddShardReplica ending");
/**
 * Verifies AddShardReplica for the default shard when the mocked leader answers AddServer with
 * ALREADY_EXISTS: the requester must get a Failure caused by AlreadyExistsException and the local shard
 * must still be found afterwards. The request is then repeated, and repeated once more after the leader's
 * canned response has been cleared; in the last case a Failure is expected and the local shard must again
 * still be found.
 *
 * <p>NOTE(review): several statement openers and trailing arguments (apparently {@code shardManager.tell(}
 * and its sender argument) are not present in this copy of the file; restore them from the upstream source.
 */
1535 public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() {
1536 LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting");
1537 final TestKit kit = new TestKit(getSystem());
1538 TestActorRef<TestShardManager> shardManager = actorFactory
1539 .createTestActor(newPropsShardMgrWithMockShardActor(), shardMgrID);
1541 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1542 shardManager.tell(new ActorInitialized(), mockShardActor);
// The leader is created as a child of the ShardManager and answers AddServer with ALREADY_EXISTS.
1544 String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
1545 AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
1546 ActorRef leaderShardActor = shardManager.underlyingActor().getContext()
1547 .actorOf(Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId);
1549 MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
1551 String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
// NOTE(review): the next two lines are dangling arguments; the enclosing calls are missing from this copy.
// They describe the local replica as a Follower whose leader is leaderId.
1553 new RoleChangeNotification(newReplicaId, RaftState.Candidate.name(), RaftState.Follower.name()),
1556 new ShardLeaderStateChanged(newReplicaId, leaderId, DataStoreVersions.CURRENT_VERSION),
1559 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1561 MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
1563 Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
1564 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
// The pre-existing local shard must survive the failed request.
1566 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1567 kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
1569 // Send message again to verify the previous in-progress state is cleared.
1572 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1573 resp = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
1574 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1576 // Send message again with an AddServer timeout to verify the
1577 // pre-existing shard actor isn't terminated.
// NOTE(review): the next two lines are the arguments of a call whose opener is missing from this copy;
// they build a datastore context factory with a 100 ms shard leader election timeout.
1580 newDatastoreContextFactory(
1581 datastoreContextBuilder.shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), kit.getRef());
// Clear the leader's canned response; per the comment above this is meant to make AddServer time out.
1582 leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
1583 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1584 kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
1586 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1587 kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
1589 LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
/**
 * Verifies AddShardReplica for the default shard when the ShardManager has been told that its own local
 * replica is the leader: the requester must get a Failure caused by AlreadyExistsException and the local
 * shard must still be found afterwards.
 *
 * <p>NOTE(review): the statement that sends the RoleChangeNotification is incomplete in this copy of the
 * file (its opener and trailing argument are missing); restore it from the upstream source.
 */
1593 public void testAddShardReplicaWithPreExistingLocalReplicaLeader() {
1594 LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting");
1595 final TestKit kit = new TestKit(getSystem());
1596 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1597 ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1599 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1600 shardManager.tell(new ActorInitialized(), mockShardActor);
// Leader id and local member id are the same, i.e. the local replica is reported as leader.
1601 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1602 DataStoreVersions.CURRENT_VERSION), kit.getRef());
// NOTE(review): dangling argument; the enclosing call is missing from this copy.
1604 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1607 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1608 Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
1609 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
// The existing local shard must not have been removed by the failed request.
1611 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1612 kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
1614 LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
1618 public void testAddShardReplicaWithAddServerReplyFailure() {
1619 LOG.info("testAddShardReplicaWithAddServerReplyFailure starting");
1620 final TestKit kit = new TestKit(getSystem());
1621 final TestKit mockShardLeaderKit = new TestKit(getSystem());
1623 MockConfiguration mockConfig = new MockConfiguration(
1624 ImmutableMap.of("astronauts", Arrays.asList("member-2")));
1626 ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
1627 final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1628 newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props()
1629 .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1630 shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1632 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1634 TestKit terminateWatcher = new TestKit(getSystem());
1635 terminateWatcher.watch(mockNewReplicaShardActor);
1637 shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1639 AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
1640 assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
1641 addServerMsg.getNewServerId());
1642 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
1644 Failure failure = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
1645 assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
1647 shardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
1648 kit.expectMsgClass(kit.duration("5 seconds"), LocalShardNotFound.class);
1650 terminateWatcher.expectTerminated(mockNewReplicaShardActor);
1652 shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1653 mockShardLeaderKit.expectMsgClass(AddServer.class);
1654 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
1655 failure = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
1656 assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
1658 LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
1662 public void testAddShardReplicaWithAlreadyInProgress() {
1663 testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1664 AddServer.class, new AddShardReplica("astronauts"));
1668 public void testAddShardReplicaWithFindPrimaryTimeout() {
1669 LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting");
1670 datastoreContextBuilder.shardInitializationTimeout(100, TimeUnit.MILLISECONDS);
1671 final TestKit kit = new TestKit(getSystem());
1672 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.of("astronauts", Arrays.asList("member-2")));
1674 final ActorRef newReplicaShardManager = actorFactory
1675 .createActor(newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props()
1676 .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1678 newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1679 MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2",
1680 AddressFromURIString.parse("akka://non-existent@127.0.0.1:5").toString());
1682 newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1683 Status.Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Status.Failure.class);
1684 assertTrue("Failure obtained", resp.cause() instanceof RuntimeException);
1686 LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending");
1690 public void testRemoveShardReplicaForNonExistentShard() {
1691 final TestKit kit = new TestKit(getSystem());
1692 ActorRef shardManager = actorFactory
1693 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1694 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1696 shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), kit.getRef());
1697 Status.Failure resp = kit.expectMsgClass(kit.duration("10 seconds"), Status.Failure.class);
1698 assertTrue("Failure obtained", resp.cause() instanceof PrimaryNotFoundException);
/**
 * Verifies RemoveShardReplica for the default shard on MEMBER_1 when the local replica is reported as the
 * leader: the local shard actor (a MockRespondActor answering RemoveServer with OK) must receive a
 * RemoveServer naming member-1's default shard, and the requester must receive Success.
 *
 * <p>NOTE(review): the statement that sends the RoleChangeNotification is incomplete in this copy of the
 * file (its opener and trailing argument are missing); restore it from the upstream source.
 */
1705 public void testRemoveShardReplicaLocal() {
1706 final TestKit kit = new TestKit(getSystem());
1707 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
// The respond actor stands in for the local shard and is named after the local shard id.
1709 final ActorRef respondActor = actorFactory.createActor(Props.create(MockRespondActor.class,
1710 RemoveServer.class, new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
// NOTE(review): this ShardManager is created directly on the actor system, whereas sibling tests go through
// actorFactory; confirm that it is stopped at the end of the test.
1712 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
1714 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1715 shardManager.tell(new ActorInitialized(), respondActor);
// Leader id and local member id are the same, i.e. the local replica is reported as leader.
1716 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1717 DataStoreVersions.CURRENT_VERSION), kit.getRef());
// NOTE(review): dangling argument; the enclosing call is missing from this copy.
1719 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1722 shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), kit.getRef());
1723 final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor,
1724 RemoveServer.class);
// The server to remove must be member-1's replica of the default shard.
1725 assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(),
1726 removeServer.getServerId());
1727 kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
/**
 * Verifies RemoveShardReplica for the default shard on MEMBER_1 when the shard leader is on member-2, using
 * two separate actor systems ("Member1" and "Member2") that both join the same cluster address.
 *
 * <p>The leader is a MockRespondActor that answers RemoveServer with OK and is reached through a
 * ForwardingActor created under member-2's ShardManager. The test checks the server id carried by the
 * RemoveServer message the leader receives and expects Status.Success for the requester.
 *
 * <p>NOTE(review): the last argument line of several multi-line calls (both ShardManager
 * TestActorRef.create(...) calls and the two newReplicaShardManager.tell(...) calls) is not present in this
 * copy of the file; restore them from the upstream source before compiling.
 */
1731 public void testRemoveShardReplicaRemote() {
// "default" is configured on both members, "astronauts" only on member-1.
1732 MockConfiguration mockConfig = new MockConfiguration(
1733 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1734 .put("astronauts", Arrays.asList("member-1")).build());
1736 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1738 // Create an ActorSystem ShardManager actor for member-1.
1739 final ActorSystem system1 = newActorSystem("Member1");
1740 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1741 ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
// NOTE(review): this create(...) call is unterminated here; its final argument line is missing.
1743 final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1744 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster(
1745 new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1748 // Create an ActorSystem ShardManager actor for member-2.
1749 final ActorSystem system2 = newActorSystem("Member2");
1750 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
// `name` and memberId2 both identify member-2's replica of the default shard.
1752 String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString();
1753 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
// Mock leader: a top-level actor in system2 that answers RemoveServer with OK.
1754 final TestActorRef<MockRespondActor> mockShardLeaderActor =
1755 TestActorRef.create(system2, Props.create(MockRespondActor.class, RemoveServer.class,
1756 new RemoveServerReply(ServerChangeStatus.OK, memberId2)), name);
// NOTE(review): diagnostic output logged at error level although nothing has failed; consider info or debug.
1758 LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor);
// NOTE(review): this create(...) call is unterminated here; its final argument line is missing.
1760 final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1761 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster(
1762 new ClusterWrapperImpl(system2)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1765 // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so,
1766 // akka://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1
1767 // However when a shard manager has a local shard which is a follower and a leader that is remote it will
1768 // try to compute an address for the remote shard leader using the ShardPeerAddressResolver. This address will
// look like this:
1770 // akka://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1
1771 // In this specific case if we did a FindPrimary for shard default from member-1 we would come up
1772 // with the address of an actor which does not exist, therefore any message sent to that actor would go to
// dead letters.
1774 // To work around this problem we create a ForwardingActor with the right address and pass to it the
1775 // mockShardLeaderActor. The ForwardingActor simply forwards all messages to the mockShardLeaderActor and every
1776 // thing works as expected
1777 final ActorRef actorRef = leaderShardManager.underlyingActor().context()
1778 .actorOf(Props.create(ForwardingActor.class, mockShardLeaderActor),
1779 "member-2-shard-default-" + shardMrgIDSuffix);
// NOTE(review): diagnostic output logged at error level; consider info or debug.
1781 LOG.error("Forwarding actor : {}", actorRef);
1783 final TestKit kit = new TestKit(getSystem());
1784 newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1785 leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
// NOTE(review): both managers are sent ActorInitialized with mockShardLeaderActor as sender; confirm that
// this is intended for the member-1 manager as well.
1787 leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1788 newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
// The leader reports a payload version one below the current one.
1790 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
// Member-2's manager is told that its own replica is the leader.
1791 leaderShardManager.tell(
1792 new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1793 mockShardLeaderActor);
1794 leaderShardManager.tell(
1795 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1796 mockShardLeaderActor);
// Member-1's manager is told that its replica is a follower of memberId2.
// NOTE(review): both tell(...) calls below are unterminated here; their sender argument line is missing.
1798 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
1799 newReplicaShardManager.tell(
1800 new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class), leaderVersion),
1802 newReplicaShardManager.tell(
1803 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
// Both managers must have seen a MemberUp for the other member before the request is sent.
1806 newReplicaShardManager.underlyingActor().waitForMemberUp();
1807 leaderShardManager.underlyingActor().waitForMemberUp();
1809 // construct a mock response message
1810 newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), kit.getRef());
1811 RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1812 RemoveServer.class);
// The server to remove must be member-1's replica of the default shard.
1813 String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1814 assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
1815 kit.expectMsgClass(kit.duration("5 seconds"), Status.Success.class);
1819 public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() {
1820 testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", MEMBER_2),
1821 RemoveServer.class, new RemoveShardReplica("astronauts", MEMBER_3));
1825 public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() {
1826 testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1827 AddServer.class, new RemoveShardReplica("astronauts", MEMBER_2));
/**
 * Shared scenario for the "already in progress" tests. Sends {@code firstServerChange} to a ShardManager,
 * waits until the mocked shard leader has received a message of {@code firstForwardedServerChangeClass},
 * and, without the leader replying, sends {@code secondServerChange} from a separate test kit, which must
 * be answered with a Failure.
 *
 * @param shardName shard configured (on "member-2" only) for the ShardManager under test
 * @param firstServerChange request sent first; expected to be forwarded to the leader
 * @param firstForwardedServerChangeClass class of the message the leader must receive for the first request
 * @param secondServerChange request sent while the first has not been answered; must fail
 *
 * <p>NOTE(review): the last argument line of the TestActorRef.create(...) call is not present in this copy
 * of the file; restore it from the upstream source before compiling.
 */
1831 public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange,
1832 final Class<?> firstForwardedServerChangeClass,
1833 final Object secondServerChange) {
// Three probes: requester of the first change, the mocked shard leader, requester of the second change.
1834 final TestKit kit = new TestKit(getSystem());
1835 final TestKit mockShardLeaderKit = new TestKit(getSystem());
1836 final TestKit secondRequestKit = new TestKit(getSystem());
1838 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1839 .put(shardName, Arrays.asList("member-2")).build());
// NOTE(review): this create(...) call is unterminated here; its final argument line is missing.
1841 final TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
1842 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor)
1843 .cluster(new MockClusterWrapper()).props()
1844 .withDispatcher(Dispatchers.DefaultDispatcherId()),
// Intercept FindPrimary so that the leader-side test kit is what the ShardManager talks to.
1847 shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1849 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1851 shardManager.tell(firstServerChange, kit.getRef());
// The leader receives the forwarded change but never replies, so the first change stays unanswered.
1853 mockShardLeaderKit.expectMsgClass(firstForwardedServerChangeClass);
1855 shardManager.tell(secondServerChange, secondRequestKit.getRef());
1857 secondRequestKit.expectMsgClass(secondRequestKit.duration("5 seconds"), Failure.class);
/**
 * Verifies handling of ServerRemoved for member-1's default shard when the shards have not been
 * initialized (FindLocalShard answers NotInitializedException for both "people" and "default"): the
 * ShardManager must persist a snapshot whose shard set is just "people".
 *
 * <p>NOTE(review): the ShardIdentifier builder chain is unterminated in this copy of the file (the final
 * call of the chain is missing); restore it from the upstream source.
 */
1861 public void testServerRemovedShardActorNotRunning() {
1862 LOG.info("testServerRemovedShardActorNotRunning starting");
1863 final TestKit kit = new TestKit(getSystem());
// member-1 is configured for "default" and "people" but not for "astronauts".
1864 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1865 .put("default", Arrays.asList("member-1", "member-2"))
1866 .put("astronauts", Arrays.asList("member-2"))
1867 .put("people", Arrays.asList("member-1", "member-2")).build());
1869 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1870 newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1872 shardManager.underlyingActor().waitForRecoveryComplete();
// Both local shards are known to the manager but not initialized.
1873 shardManager.tell(new FindLocalShard("people", false), kit.getRef());
1874 kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
1876 shardManager.tell(new FindLocalShard("default", false), kit.getRef());
1877 kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
1879 // Removed the default shard replica from member-1
1880 ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
// NOTE(review): unterminated builder chain; the closing call is missing from this copy.
1881 ShardIdentifier shardId = builder.shardName("default").memberName(MEMBER_1).type(shardMrgIDSuffix)
1883 shardManager.tell(new ServerRemoved(shardId.toString()), kit.getRef());
// Only "people" may remain in the persisted snapshot.
1885 shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1887 LOG.info("testServerRemovedShardActorNotRunning ending");
1891 public void testServerRemovedShardActorRunning() {
1892 LOG.info("testServerRemovedShardActorRunning starting");
1893 final TestKit kit = new TestKit(getSystem());
1894 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1895 .put("default", Arrays.asList("member-1", "member-2"))
1896 .put("astronauts", Arrays.asList("member-2"))
1897 .put("people", Arrays.asList("member-1", "member-2")).build());
1899 String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1900 ActorRef shard = actorFactory.createActor(MessageCollectorActor.props(), shardId);
1902 TestActorRef<TestShardManager> shardManager = actorFactory
1903 .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props()
1904 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1906 shardManager.underlyingActor().waitForRecoveryComplete();
1908 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1909 shardManager.tell(new ActorInitialized(), shard);
1911 waitForShardInitialized(shardManager, "people", kit);
1912 waitForShardInitialized(shardManager, "default", kit);
1914 // Removed the default shard replica from member-1
1915 shardManager.tell(new ServerRemoved(shardId), kit.getRef());
1917 shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1919 MessageCollectorActor.expectFirstMatching(shard, Shutdown.class);
1921 LOG.info("testServerRemovedShardActorRunning ending");
1925 public void testShardPersistenceWithRestoredData() {
1926 LOG.info("testShardPersistenceWithRestoredData starting");
1927 final TestKit kit = new TestKit(getSystem());
1928 MockConfiguration mockConfig =
1929 new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1930 .put("default", Arrays.asList("member-1", "member-2"))
1931 .put("astronauts", Arrays.asList("member-2"))
1932 .put("people", Arrays.asList("member-1", "member-2")).build());
1933 String[] restoredShards = {"default", "astronauts"};
1934 ShardManagerSnapshot snapshot =
1935 new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
1936 InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
1938 // create shardManager to come up with restored data
1939 TestActorRef<TestShardManager> newRestoredShardManager = actorFactory.createTestActor(
1940 newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1942 newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
1944 newRestoredShardManager.tell(new FindLocalShard("people", false), kit.getRef());
1945 LocalShardNotFound notFound = kit.expectMsgClass(kit.duration("5 seconds"), LocalShardNotFound.class);
1946 assertEquals("for uninitialized shard", "people", notFound.getShardName());
1948 // Verify a local shard is created for the restored shards,
1949 // although we expect a NotInitializedException for the shards
1950 // as the actor initialization
1951 // message is not sent for them
1952 newRestoredShardManager.tell(new FindLocalShard("default", false), kit.getRef());
1953 kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
1955 newRestoredShardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
1956 kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
1958 LOG.info("testShardPersistenceWithRestoredData ending");
/**
 * Verifies graceful shutdown of the ShardManager: after Shutdown is sent, both shard actors must receive
 * Shutdown, the ShardManager must not have stopped within 500 ms while the shard actors are still alive,
 * and it must stop once both shard actors have been killed.
 *
 * <p>NOTE(review): the {@code try} opener before Await.ready(...) and the body/closer of the catch clause
 * are not present in this copy of the file; restore them from the upstream source.
 *
 * @throws Exception if waiting on the stop future fails
 */
1962 public void testShutDown() throws Exception {
1963 LOG.info("testShutDown starting");
1964 final TestKit kit = new TestKit(getSystem());
1965 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1966 .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1")).build());
// Both shards are MessageCollectorActors so the Shutdown messages they receive can be inspected.
1968 String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString();
1969 ActorRef shard1 = actorFactory.createActor(MessageCollectorActor.props(), shardId1);
1971 String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString();
1972 ActorRef shard2 = actorFactory.createActor(MessageCollectorActor.props(), shardId2);
1974 ActorRef shardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig)
1975 .addShardActor("shard1", shard1).addShardActor("shard2", shard2).props());
1977 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1978 shardManager.tell(new ActorInitialized(), shard1);
1979 shardManager.tell(new ActorInitialized(), shard2);
// gracefulStop sends Shutdown.INSTANCE and completes the future when the ShardManager has terminated.
1981 FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
1982 Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE);
1984 MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class);
1985 MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class);
// The shard actors are only killed further below, so the stop future is expected to time out here;
// completing within 500 ms is treated as a test failure.
1988 Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS));
1989 fail("ShardManager actor stopped without waiting for the Shards to be stopped");
1990 } catch (TimeoutException e) {
// With both shard actors gone the ShardManager must finish stopping within `duration`.
1994 actorFactory.killActor(shard1, kit);
1995 actorFactory.killActor(shard2, kit);
1997 Boolean stopped = Await.result(stopFuture, duration);
1998 assertEquals("Stopped", Boolean.TRUE, stopped);
2000 LOG.info("testShutDown ending");
/**
 * Verifies ChangeShardMembersVotingStatus for the default shard when the local replica is reported as the
 * leader: the local shard actor (a MockRespondActor answering ChangeServersVotingStatus with OK) must
 * receive a ChangeServersVotingStatus whose map is keyed by member-2's default shard id, and the requester
 * must receive Success.
 *
 * <p>NOTE(review): several statement openers and trailing arguments are not present in this copy of the
 * file (around the RoleChangeNotification, the ChangeShardMembersVotingStatus request and the end of the
 * expected map); restore them from the upstream source.
 */
2004 public void testChangeServersVotingStatus() {
2005 final TestKit kit = new TestKit(getSystem());
2006 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
// The respond actor stands in for the local shard and is named after the local shard id.
2008 ActorRef respondActor = actorFactory
2009 .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2010 new ServerChangeReply(ServerChangeStatus.OK, null)), memberId);
// NOTE(review): this ShardManager is created directly on the actor system, whereas sibling tests go through
// actorFactory; confirm that it is stopped at the end of the test.
2012 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2014 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2015 shardManager.tell(new ActorInitialized(), respondActor);
// Leader id and local member id are the same, i.e. the local replica is reported as leader.
2016 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
2017 DataStoreVersions.CURRENT_VERSION), kit.getRef());
// NOTE(review): the next two lines are dangling arguments; the enclosing calls are missing from this copy.
2019 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
2023 new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), kit.getRef());
2025 ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor
2026 .expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
// NOTE(review): the actual value is passed in assertEquals' "expected" position and the expected map in
// the "actual" position, so a failure message would label them the wrong way round; consider swapping.
2027 assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(),
2028 ImmutableMap.of(ShardIdentifier
2029 .create("default", MemberName.forName("member-2"), shardMrgIDSuffix).toString(),
2032 kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
/**
 * Verifies ChangeShardMembersVotingStatus for the default shard when the local shard actor answers
 * ChangeServersVotingStatus with NO_LEADER: the request must still reach the local shard actor and the
 * requester must receive a Status.Failure whose cause is a NoShardLeaderException.
 *
 * <p>NOTE(review): the opener of the statement that sends ChangeShardMembersVotingStatus is not present in
 * this copy of the file; restore it from the upstream source.
 */
2036 public void testChangeServersVotingStatusWithNoLeader() {
2037 final TestKit kit = new TestKit(getSystem());
2038 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
// The respond actor stands in for the local shard and is named after the local shard id.
2040 ActorRef respondActor = actorFactory
2041 .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2042 new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);
// NOTE(review): this ShardManager is created directly on the actor system, whereas sibling tests go through
// actorFactory; confirm that it is stopped at the end of the test.
2044 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2046 shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2047 shardManager.tell(new ActorInitialized(), respondActor);
// The local replica is reported as a Follower; no ShardLeaderStateChanged is sent in this test.
2048 shardManager.tell(new RoleChangeNotification(memberId, null, RaftState.Follower.name()), respondActor);
// NOTE(review): dangling arguments; the enclosing call is missing from this copy.
2051 new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), kit.getRef());
2053 MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2055 Status.Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Status.Failure.class);
// NOTE(review): the assertion message below misspells "response".
2056 assertTrue("Failure resposnse", resp.cause() instanceof NoShardLeaderException);
/**
 * ShardManager subclass used by these tests. It counts down latches when particular messages are
 * seen in handleRecover/handleCommand, can reply to messages through an optional
 * MessageInterceptor, records the object passed to saveSnapshot, and can return pre-supplied
 * shard actors from newShardActor.
 */
2059 public static class TestShardManager extends ShardManager {
// Counted down in handleRecover on RecoveryCompleted; awaited by waitForRecoveryComplete().
2060 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
// Counted down in saveSnapshot; awaited by verifySnapshotPersisted().
2061 private final CountDownLatch snapshotPersist = new CountDownLatch(1);
// Most recent object handed to saveSnapshot, cast to ShardManagerSnapshot.
2062 private ShardManagerSnapshot snapshot;
// Shard actors keyed by shard name, taken from the Builder; consulted first by newShardActor.
2063 private final Map<String, ActorRef> shardActors;
// Single shard actor taken from the Builder; newShardActor null-checks it.
2064 private final ActorRef shardActor;
// The latches below are not final: each wait/verify method replaces its latch with a fresh
// one after a successful wait so the same kind of event can be awaited again.
2065 private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
2066 private CountDownLatch memberUpReceived = new CountDownLatch(1);
2067 private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
2068 private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
2069 private CountDownLatch memberReachableReceived = new CountDownLatch(1);
// Optional interceptor consulted in handleCommand; set through setMessageInterceptor().
2070 private volatile MessageInterceptor messageInterceptor;
// Copies the mock shard actor(s) from the builder.
// NOTE(review): a line before these assignments is not shown here - presumably the
// super(builder) call; confirm against the full file.
2072 TestShardManager(final Builder builder) {
2074 shardActor = builder.shardActor;
2075 shardActors = builder.shardActors;
// Delegates to ShardManager.handleRecover and counts down recoveryComplete when the message is
// a RecoveryCompleted.
// NOTE(review): lines around the super call are not shown here (possibly a try/finally);
// confirm before relying on what happens if the super call throws.
2079 protected void handleRecover(final Object message) throws Exception {
2081 super.handleRecover(message);
2083 if (message instanceof RecoveryCompleted) {
2084 recoveryComplete.countDown();
// Guards on the member being a different node: the condition is true when this node's
// current member name does not equal memberToName(member).
// NOTE(review): the guarded statement is not shown here - presumably latch.countDown();
// confirm against the full file.
2089 private void countDownIfOther(final Member member, final CountDownLatch latch) {
2090 if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
// Handles a command and then signals the latch matching the message type.
// NOTE(review): lines between the visible statements are not shown here; it looks like the
// interceptor reply and super.handleCommand are alternative branches - confirm.
2096 public void handleCommand(final Object message) throws Exception {
// When an interceptor is installed and accepts the message, its result is sent back to the
// sender with this actor as the reply's sender.
2098 if (messageInterceptor != null && messageInterceptor.canIntercept(message)) {
2099 getSender().tell(messageInterceptor.apply(message), getSelf());
2101 super.handleCommand(message);
// FindPrimary counts down its latch directly; the cluster membership events go through
// countDownIfOther with the member carried by the event.
2104 if (message instanceof FindPrimary) {
2105 findPrimaryMessageReceived.countDown();
2106 } else if (message instanceof ClusterEvent.MemberUp) {
2107 countDownIfOther(((ClusterEvent.MemberUp) message).member(), memberUpReceived);
2108 } else if (message instanceof ClusterEvent.MemberRemoved) {
2109 countDownIfOther(((ClusterEvent.MemberRemoved) message).member(), memberRemovedReceived);
2110 } else if (message instanceof ClusterEvent.UnreachableMember) {
2111 countDownIfOther(((ClusterEvent.UnreachableMember) message).member(), memberUnreachableReceived);
2112 } else if (message instanceof ClusterEvent.ReachableMember) {
2113 countDownIfOther(((ClusterEvent.ReachableMember) message).member(), memberReachableReceived);
// Installs (or replaces) the interceptor consulted by handleCommand.
2118 void setMessageInterceptor(final MessageInterceptor messageInterceptor) {
2119 this.messageInterceptor = messageInterceptor;
// Asserts that recoveryComplete is counted down within 5 seconds. This latch is final and is
// not replaced afterwards.
2122 void waitForRecoveryComplete() {
2123 assertTrue("Recovery complete",
2124 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
// Asserts that memberUpReceived is counted down within 5 seconds, then installs a fresh latch.
2127 public void waitForMemberUp() {
2128 assertTrue("MemberUp received",
2129 Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
2130 memberUpReceived = new CountDownLatch(1);
// Asserts that memberRemovedReceived is counted down within 5 seconds, then installs a fresh
// latch.
2133 void waitForMemberRemoved() {
2134 assertTrue("MemberRemoved received",
2135 Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
2136 memberRemovedReceived = new CountDownLatch(1);
// Asserts that memberUnreachableReceived is counted down within 5 seconds, then installs a
// fresh latch.
2139 void waitForUnreachableMember() {
2140 assertTrue("UnreachableMember received",
2141 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS));
2142 memberUnreachableReceived = new CountDownLatch(1);
// Asserts that memberReachableReceived is counted down within 5 seconds, then installs a
// fresh latch.
2145 void waitForReachableMember() {
2146 assertTrue("ReachableMember received",
2147 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
2148 memberReachableReceived = new CountDownLatch(1);
// Asserts that findPrimaryMessageReceived is counted down within 5 seconds, then installs a
// fresh latch.
2151 void verifyFindPrimary() {
2152 assertTrue("FindPrimary received",
2153 Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
2154 findPrimaryMessageReceived = new CountDownLatch(1);
// Factory for a Builder seeded with the given datastore context builder.
2157 public static Builder builder(final DatastoreContext.Builder datastoreContextBuilder) {
2158 return new Builder(datastoreContextBuilder);
/**
 * Creator/builder for TestShardManager. Besides what AbstractGenericCreator configures, it
 * carries the mock shard actor(s) that the TestShardManager constructor copies.
 */
2161 public static class Builder extends AbstractGenericCreator<Builder, TestShardManager> {
2162 private ActorRef shardActor;
2163 private final Map<String, ActorRef> shardActors = new HashMap<>();
// Registers TestShardManager as the class to create and sets the datastore context factory
// from the built DatastoreContext.
2165 Builder(final DatastoreContext.Builder datastoreContextBuilder) {
2166 super(TestShardManager.class);
2167 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
// Sets the single mock shard actor.
// NOTE(review): the return statement is not shown here - presumably returns this.
2170 Builder shardActor(final ActorRef newShardActor) {
2171 this.shardActor = newShardActor;
// Registers a mock shard actor under the given shard name.
// NOTE(review): the return statement is not shown here - presumably returns this.
2175 Builder addShardActor(final String shardName, final ActorRef actorRef) {
2176 shardActors.put(shardName, actorRef);
// Records the snapshot object and signals snapshotPersist before delegating to the superclass.
// The cast means a non-ShardManagerSnapshot argument fails with ClassCastException.
2182 public void saveSnapshot(final Object obj) {
2183 snapshot = (ShardManagerSnapshot) obj;
2184 snapshotPersist.countDown();
2185 super.saveSnapshot(obj);
// Asserts that saveSnapshot was invoked within 5 seconds and that the recorded snapshot's
// shard list, compared as a set, equals shardList.
2188 void verifySnapshotPersisted(final Set<String> shardList) {
2189 assertTrue("saveSnapshot invoked",
2190 Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
2191 assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
// Chooses the actor for a shard: a per-name entry in shardActors wins; otherwise the
// superclass implementation is used.
// NOTE(review): the body of the shardActor != null branch is not shown here - presumably
// it returns shardActor; confirm against the full file.
2195 protected ActorRef newShardActor(final ShardInformation info) {
2196 if (shardActors.get(info.getShardName()) != null) {
2197 return shardActors.get(info.getShardName());
2200 if (shardActor != null) {
2204 return super.newShardActor(info);
/**
 * Base creator for the ShardManager subclasses used in these tests. It remembers which class to
 * instantiate and pre-configures mock collaborators in its constructor.
 *
 * @param <T> the concrete creator type (self type of the fluent setters)
 * @param <C> the ShardManager subclass that props() instantiates
 */
2208 private abstract static class AbstractGenericCreator<T extends AbstractGenericCreator<T, ?>, C extends ShardManager>
2209 extends AbstractShardManagerCreator<T> {
2210 private final Class<C> shardManagerClass;
// Stores the target class and configures a MockClusterWrapper, a MockConfiguration, the
// `ready` latch (declared outside this class) and a new PrimaryShardInfoFutureCache.
2212 AbstractGenericCreator(final Class<C> shardManagerClass) {
2213 this.shardManagerClass = shardManagerClass;
2214 cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).waitTillReadyCountDownLatch(ready)
2215 .primaryShardInfoCache(new PrimaryShardInfoFutureCache());
// Builds Props for shardManagerClass, passing this creator as the constructor argument.
// NOTE(review): a line before the return is not shown here; confirm against the full file.
2219 public Props props() {
2221 return Props.create(shardManagerClass, this);
/**
 * Concrete AbstractGenericCreator whose self type is GenericCreator itself; it adds nothing
 * beyond forwarding the ShardManager class to the base constructor.
 *
 * @param <C> the ShardManager subclass to instantiate
 */
2225 private static class GenericCreator<C extends ShardManager> extends AbstractGenericCreator<GenericCreator<C>, C> {
2226 GenericCreator(final Class<C> shardManagerClass) {
2227 super(shardManagerClass);
/**
 * Creator of ShardManager that forwards create() to a wrapped Creator supplied at construction.
 */
2231 private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
2232 private static final long serialVersionUID = 1L;
// The Creator that actually produces the ShardManager instance.
2233 private final Creator<ShardManager> delegate;
2235 DelegatingShardManagerCreator(final Creator<ShardManager> delegate) {
2236 this.delegate = delegate;
// Returns whatever the delegate creates; any exception from the delegate propagates.
2240 public ShardManager create() throws Exception {
2241 return delegate.create();
/**
 * Hook used by TestShardManager.handleCommand: when canIntercept(message) is true, the result of
 * apply(message) is sent back to the message's sender.
 */
2245 interface MessageInterceptor extends Function<Object, Object> {
// Returns true if this interceptor wants to produce the reply for the given message.
2246 boolean canIntercept(Object message);
// Creates an interceptor that accepts FindPrimary messages and answers them with a
// RemotePrimaryShardFound pointing at primaryActor.
2249 private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
2250 return new MessageInterceptor() {
// The reply carries the serialized actor path of primaryActor and the short value 1
// (presumably the primary version - confirm against RemotePrimaryShardFound).
2252 public Object apply(final Object message) {
2253 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
// Only FindPrimary messages are intercepted.
2257 public boolean canIntercept(final Object message) {
2258 return message instanceof FindPrimary;
/**
 * MessageCollectorActor that additionally replies with a canned response: after passing a message
 * to the superclass, it sends responseMsg to the sender when the message's class equals
 * requestClass and responseMsg is not null.
 */
2263 private static class MockRespondActor extends MessageCollectorActor {
// Control message compared against every incoming message in onReceive.
2264 static final String CLEAR_RESPONSE = "clear-response";
// Canned reply; not final, so it can change after construction.
2266 private Object responseMsg;
// Exact class (compared with equals, not instanceof) of the requests that get a reply.
2267 private final Class<?> requestClass;
// The tests instantiate this actor through Props.create(MockRespondActor.class, ...), so the
// constructor has no direct call sites - hence the "unused" suppression.
2269 @SuppressWarnings("unused")
2270 MockRespondActor(final Class<?> requestClass, final Object responseMsg) {
2271 this.requestClass = requestClass;
2272 this.responseMsg = responseMsg;
2276 public void onReceive(final Object message) throws Exception {
// NOTE(review): the body of the CLEAR_RESPONSE branch is not shown here - presumably it
// sets responseMsg to null and skips the handling below; confirm against the full file.
2277 if (message.equals(CLEAR_RESPONSE)) {
2280 super.onReceive(message);
2281 if (message.getClass().equals(requestClass) && responseMsg != null) {
2282 getSender().tell(responseMsg, getSelf());