2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.times;
20 import static org.mockito.Mockito.verify;
22 import akka.actor.ActorRef;
23 import akka.actor.ActorSystem;
24 import akka.actor.AddressFromURIString;
25 import akka.actor.Props;
26 import akka.actor.Status;
27 import akka.actor.Status.Failure;
28 import akka.actor.Status.Success;
29 import akka.cluster.Cluster;
30 import akka.cluster.ClusterEvent;
31 import akka.cluster.Member;
32 import akka.dispatch.Dispatchers;
33 import akka.japi.Creator;
34 import akka.pattern.Patterns;
35 import akka.persistence.RecoveryCompleted;
36 import akka.serialization.Serialization;
37 import akka.testkit.JavaTestKit;
38 import akka.testkit.TestActorRef;
39 import akka.util.Timeout;
40 import com.google.common.base.Function;
41 import com.google.common.base.Stopwatch;
42 import com.google.common.collect.ImmutableMap;
43 import com.google.common.collect.Lists;
44 import com.google.common.collect.Sets;
45 import com.google.common.util.concurrent.Uninterruptibles;
47 import java.util.AbstractMap;
48 import java.util.Arrays;
49 import java.util.Collection;
50 import java.util.Collections;
51 import java.util.HashMap;
52 import java.util.List;
54 import java.util.Map.Entry;
56 import java.util.concurrent.CountDownLatch;
57 import java.util.concurrent.TimeUnit;
58 import java.util.concurrent.TimeoutException;
59 import java.util.stream.Collectors;
60 import org.junit.Test;
61 import org.mockito.Mockito;
62 import org.opendaylight.controller.cluster.access.concepts.MemberName;
63 import org.opendaylight.controller.cluster.datastore.AbstractShardManagerTest;
64 import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
65 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
66 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
67 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
68 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
69 import org.opendaylight.controller.cluster.datastore.Shard;
70 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
71 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
72 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
73 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
74 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
75 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
76 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
77 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
78 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
79 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
80 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
81 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
82 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
83 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
84 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
85 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
86 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
87 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
88 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
89 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
90 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
91 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
92 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
93 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
94 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
95 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
96 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
97 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
98 import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor;
99 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
100 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
101 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
102 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
103 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
104 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
105 import org.opendaylight.controller.cluster.raft.RaftState;
106 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
107 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
108 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
109 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
110 import org.opendaylight.controller.cluster.raft.messages.AddServer;
111 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
112 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
113 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
114 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
115 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
116 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
117 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
118 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
119 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
120 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
121 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
122 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
123 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
124 import org.slf4j.Logger;
125 import org.slf4j.LoggerFactory;
126 import scala.concurrent.Await;
127 import scala.concurrent.Future;
128 import scala.concurrent.duration.FiniteDuration;
130 public class ShardManagerTest extends AbstractShardManagerTest {
131 private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
132 private static final MemberName MEMBER_2 = MemberName.forName("member-2");
133 private static final MemberName MEMBER_3 = MemberName.forName("member-3");
135 private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
137 private ActorSystem newActorSystem(String config) {
138 return newActorSystem("cluster-test", config);
141 private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
142 String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
143 if (system == getSystem()) {
144 return actorFactory.createActor(Props.create(MessageCollectorActor.class), name);
147 return system.actorOf(Props.create(MessageCollectorActor.class), name);
150 private Props newShardMgrProps() {
151 return newShardMgrProps(new MockConfiguration());
154 private static DatastoreContextFactory newDatastoreContextFactory(DatastoreContext datastoreContext) {
155 DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
156 Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
157 Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString());
161 private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() {
162 return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
165 private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(ActorRef shardActor) {
166 return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor)
167 .distributedDataStore(mock(DistributedDataStore.class));
171 private Props newPropsShardMgrWithMockShardActor() {
172 return newTestShardMgrBuilderWithMockShardActor().props().withDispatcher(
173 Dispatchers.DefaultDispatcherId());
176 private Props newPropsShardMgrWithMockShardActor(ActorRef shardActor) {
177 return newTestShardMgrBuilderWithMockShardActor(shardActor).props()
178 .withDispatcher(Dispatchers.DefaultDispatcherId());
182 private TestShardManager newTestShardManager() {
183 return newTestShardManager(newShardMgrProps());
186 private TestShardManager newTestShardManager(Props props) {
187 TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
188 TestShardManager shardManager = shardManagerActor.underlyingActor();
189 shardManager.waitForRecoveryComplete();
193 private static void waitForShardInitialized(ActorRef shardManager, String shardName, JavaTestKit kit) {
194 AssertionError last = null;
195 Stopwatch sw = Stopwatch.createStarted();
196 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
198 shardManager.tell(new FindLocalShard(shardName, true), kit.getRef());
199 kit.expectMsgClass(LocalShardFound.class);
201 } catch (AssertionError e) {
205 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
211 @SuppressWarnings("unchecked")
212 private static <T> T expectMsgClassOrFailure(Class<T> msgClass, JavaTestKit kit, String msg) {
213 Object reply = kit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), msgClass, Failure.class);
214 if (reply instanceof Failure) {
215 throw new AssertionError(msg + " failed", ((Failure)reply).cause());
222 public void testPerShardDatastoreContext() throws Exception {
223 LOG.info("testPerShardDatastoreContext starting");
224 final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
225 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
228 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(6).build())
229 .when(mockFactory).getShardDatastoreContext("default");
232 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(7).build())
233 .when(mockFactory).getShardDatastoreContext("topology");
235 final MockConfiguration mockConfig = new MockConfiguration() {
237 public Collection<String> getMemberShardNames(MemberName memberName) {
238 return Arrays.asList("default", "topology");
242 public Collection<MemberName> getMembersFromShardName(String shardName) {
243 return members("member-1");
247 final ActorRef defaultShardActor = actorFactory.createActor(
248 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("default"));
249 final ActorRef topologyShardActor = actorFactory.createActor(
250 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("topology"));
252 final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
253 new HashMap<String, Entry<ActorRef, DatastoreContext>>());
254 shardInfoMap.put("default", new AbstractMap.SimpleEntry<>(defaultShardActor, null));
255 shardInfoMap.put("topology", new AbstractMap.SimpleEntry<>(topologyShardActor, null));
257 final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
258 final CountDownLatch newShardActorLatch = new CountDownLatch(2);
259 class LocalShardManager extends ShardManager {
260 LocalShardManager(AbstractShardManagerCreator<?> creator) {
265 protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
266 Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
269 ref = entry.getKey();
270 entry.setValue(info.getDatastoreContext());
273 newShardActorLatch.countDown();
278 final Creator<ShardManager> creator = new Creator<ShardManager>() {
279 private static final long serialVersionUID = 1L;
281 public ShardManager create() throws Exception {
282 return new LocalShardManager(
283 new GenericCreator<>(LocalShardManager.class).datastoreContextFactory(mockFactory)
284 .primaryShardInfoCache(primaryShardInfoCache).configuration(mockConfig));
288 JavaTestKit kit = new JavaTestKit(getSystem());
290 final ActorRef shardManager = actorFactory.createActor(Props.create(
291 new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()));
293 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), kit.getRef());
295 assertEquals("Shard actors created", true, newShardActorLatch.await(5, TimeUnit.SECONDS));
296 assertEquals("getShardElectionTimeoutFactor", 6,
297 shardInfoMap.get("default").getValue().getShardElectionTimeoutFactor());
298 assertEquals("getShardElectionTimeoutFactor", 7,
299 shardInfoMap.get("topology").getValue().getShardElectionTimeoutFactor());
301 DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
302 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
304 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(66).build())
305 .when(newMockFactory).getShardDatastoreContext("default");
308 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(77).build())
309 .when(newMockFactory).getShardDatastoreContext("topology");
311 shardManager.tell(newMockFactory, kit.getRef());
313 DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor,
314 DatastoreContext.class);
315 assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
317 newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
318 assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
320 LOG.info("testPerShardDatastoreContext ending");
324 public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
325 new JavaTestKit(getSystem()) {
327 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
329 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
331 shardManager.tell(new FindPrimary("non-existent", false), getRef());
333 expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
339 public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
340 LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting");
341 new JavaTestKit(getSystem()) {
343 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
345 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
347 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
348 shardManager.tell(new ActorInitialized(), mockShardActor);
350 DataTree mockDataTree = mock(DataTree.class);
351 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
352 DataStoreVersions.CURRENT_VERSION), getRef());
354 MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
356 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
359 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
361 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"),
362 LocalPrimaryShardFound.class);
363 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
364 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
365 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
369 LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending");
373 public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
374 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp starting");
375 new JavaTestKit(getSystem()) {
377 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
379 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
380 shardManager.tell(new ActorInitialized(), mockShardActor);
382 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
383 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
385 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
387 shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION),
390 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
392 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
396 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending");
400 public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
401 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard starting");
402 new JavaTestKit(getSystem()) {
404 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
406 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
407 shardManager.tell(new ActorInitialized(), mockShardActor);
409 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
410 MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
412 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
414 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
416 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
417 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, leaderVersion), mockShardActor);
419 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
421 RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"),
422 RemotePrimaryShardFound.class);
423 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
424 primaryFound.getPrimaryPath().contains("member-2-shard-default"));
425 assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
429 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard ending");
433 public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
434 new JavaTestKit(getSystem()) {
436 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
438 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
440 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
446 public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
447 new JavaTestKit(getSystem()) {
449 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
451 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
452 shardManager.tell(new ActorInitialized(), mockShardActor);
454 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
456 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
462 public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
463 LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
464 new JavaTestKit(getSystem()) {
466 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
468 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
469 shardManager.tell(new ActorInitialized(), mockShardActor);
471 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
473 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Follower.name()),
476 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
478 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
480 DataTree mockDataTree = mock(DataTree.class);
481 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
482 DataStoreVersions.CURRENT_VERSION), mockShardActor);
484 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
486 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"),
487 LocalPrimaryShardFound.class);
488 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
489 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
490 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
494 LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
498 public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
499 LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting");
500 datastoreContextBuilder.shardInitializationTimeout(10, TimeUnit.SECONDS);
501 new JavaTestKit(getSystem()) {
503 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
505 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
507 // We're passing waitUntilInitialized = true to FindPrimary so
508 // the response should be
509 // delayed until we send ActorInitialized and
510 // RoleChangeNotification.
511 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
513 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
515 shardManager.tell(new ActorInitialized(), mockShardActor);
517 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
519 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
521 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
524 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
526 DataTree mockDataTree = mock(DataTree.class);
527 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
528 DataStoreVersions.CURRENT_VERSION), mockShardActor);
530 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"),
531 LocalPrimaryShardFound.class);
532 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
533 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
534 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
536 expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
540 LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending");
544 public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
545 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard starting");
546 new JavaTestKit(getSystem()) {
548 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
550 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
552 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
554 expectMsgClass(duration("2 seconds"), NotInitializedException.class);
556 shardManager.tell(new ActorInitialized(), mockShardActor);
558 expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
562 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending");
566 public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
567 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard starting");
568 new JavaTestKit(getSystem()) {
570 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
572 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
573 shardManager.tell(new ActorInitialized(), mockShardActor);
574 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
575 RaftState.Candidate.name()), mockShardActor);
577 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
579 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
583 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending");
587 public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception {
588 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard starting");
589 new JavaTestKit(getSystem()) {
591 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
593 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
594 shardManager.tell(new ActorInitialized(), mockShardActor);
595 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
596 RaftState.IsolatedLeader.name()), mockShardActor);
598 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
600 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
604 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending");
608 public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
609 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard starting");
610 new JavaTestKit(getSystem()) {
612 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
614 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
615 shardManager.tell(new ActorInitialized(), mockShardActor);
617 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
619 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
623 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending");
627 public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
628 LOG.info("testOnReceiveFindPrimaryForRemoteShard starting");
629 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
631 // Create an ActorSystem ShardManager actor for member-1.
633 final ActorSystem system1 = newActorSystem("Member1");
634 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
636 final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
637 newTestShardMgrBuilderWithMockShardActor().cluster(
638 new ClusterWrapperImpl(system1)).props().withDispatcher(
639 Dispatchers.DefaultDispatcherId()), shardManagerID);
641 // Create an ActorSystem ShardManager actor for member-2.
643 final ActorSystem system2 = newActorSystem("Member2");
645 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
647 final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
649 MockConfiguration mockConfig2 = new MockConfiguration(
650 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
651 .put("astronauts", Arrays.asList("member-2")).build());
653 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
654 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
655 new ClusterWrapperImpl(system2)).props().withDispatcher(
656 Dispatchers.DefaultDispatcherId()), shardManagerID);
658 new JavaTestKit(system1) {
660 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
661 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
663 shardManager2.tell(new ActorInitialized(), mockShardActor2);
665 String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
666 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
668 new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
671 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
674 shardManager1.underlyingActor().waitForMemberUp();
675 shardManager1.tell(new FindPrimary("astronauts", false), getRef());
677 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
678 String path = found.getPrimaryPath();
679 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
680 assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
682 shardManager2.underlyingActor().verifyFindPrimary();
684 Cluster.get(system2).down(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
686 shardManager1.underlyingActor().waitForMemberRemoved();
688 shardManager1.tell(new FindPrimary("astronauts", false), getRef());
690 expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
694 LOG.info("testOnReceiveFindPrimaryForRemoteShard ending");
698 public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
699 LOG.info("testShardAvailabilityOnChangeOfMemberReachability starting");
700 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
702 // Create an ActorSystem ShardManager actor for member-1.
704 final ActorSystem system1 = newActorSystem("Member1");
705 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
707 final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
709 final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
710 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
711 new ClusterWrapperImpl(system1)).props().withDispatcher(
712 Dispatchers.DefaultDispatcherId()), shardManagerID);
714 // Create an ActorSystem ShardManager actor for member-2.
716 final ActorSystem system2 = newActorSystem("Member2");
718 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
720 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
722 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
723 .put("default", Arrays.asList("member-1", "member-2")).build());
725 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
726 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
727 new ClusterWrapperImpl(system2)).props().withDispatcher(
728 Dispatchers.DefaultDispatcherId()), shardManagerID);
730 new JavaTestKit(system1) {
732 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
733 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
734 shardManager1.tell(new ActorInitialized(), mockShardActor1);
735 shardManager2.tell(new ActorInitialized(), mockShardActor2);
737 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
738 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
739 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
740 DataStoreVersions.CURRENT_VERSION), mockShardActor1);
742 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
744 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
745 DataStoreVersions.CURRENT_VERSION), mockShardActor2);
747 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
749 shardManager1.underlyingActor().waitForMemberUp();
751 shardManager1.tell(new FindPrimary("default", true), getRef());
753 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
754 String path = found.getPrimaryPath();
755 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
757 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
758 "akka://cluster-test@127.0.0.1:2558"), getRef());
760 shardManager1.underlyingActor().waitForUnreachableMember();
762 PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
763 assertEquals("getMemberName", MEMBER_2, peerDown.getMemberName());
764 MessageCollectorActor.clearMessages(mockShardActor1);
767 MockClusterWrapper.createMemberRemoved("member-2", "akka://cluster-test@127.0.0.1:2558"),
770 MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
772 shardManager1.tell(new FindPrimary("default", true), getRef());
774 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
777 MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
780 shardManager1.underlyingActor().waitForReachableMember();
782 PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
783 assertEquals("getMemberName", MEMBER_2, peerUp.getMemberName());
784 MessageCollectorActor.clearMessages(mockShardActor1);
786 shardManager1.tell(new FindPrimary("default", true), getRef());
788 RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
789 String path1 = found1.getPrimaryPath();
790 assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
793 MockClusterWrapper.createMemberUp("member-2", "akka://cluster-test@127.0.0.1:2558"),
796 MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
798 // Test FindPrimary wait succeeds after reachable member event.
800 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
801 "akka://cluster-test@127.0.0.1:2558"), getRef());
802 shardManager1.underlyingActor().waitForUnreachableMember();
804 shardManager1.tell(new FindPrimary("default", true), getRef());
807 MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
810 RemotePrimaryShardFound found2 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
811 String path2 = found2.getPrimaryPath();
812 assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config"));
816 LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending");
// Two-actor-system scenario for the "default" shard: member-2 is first reported as leader,
// then member-2 is marked unreachable, and finally member-1 is reported as leader.
// Checks the FindPrimary reply at each stage and that the seeded primaryShardInfoCache
// entry is gone after the unreachable event.
820 public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
821 LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange starting");
822 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
824 // Create an ActorSystem ShardManager actor for member-1.
826 final ActorSystem system1 = newActorSystem("Member1");
827 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
829 final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
// The cache is created here (rather than inside the builder) so the test can seed and inspect it.
831 final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
832 final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
833 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(new ClusterWrapperImpl(system1))
834 .primaryShardInfoCache(primaryShardInfoCache).props()
835 .withDispatcher(Dispatchers.DefaultDispatcherId()),
838 // Create an ActorSystem ShardManager actor for member-2.
840 final ActorSystem system2 = newActorSystem("Member2");
842 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
844 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
// Configuration for member-2 lists both members as hosting the "default" shard.
846 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
847 .put("default", Arrays.asList("member-1", "member-2")).build());
849 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
850 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
851 new ClusterWrapperImpl(system2)).props().withDispatcher(
852 Dispatchers.DefaultDispatcherId()), shardManagerID);
854 new JavaTestKit(system1) {
// Give both managers a schema context and report their mock shard actors as initialized.
856 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
857 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
858 shardManager1.tell(new ActorInitialized(), mockShardActor1);
859 shardManager2.tell(new ActorInitialized(), mockShardActor2);
861 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
862 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
// Both managers are told that member-2's shard is the leader.
// NOTE(review): the statements that send the RoleChangeNotification instances below are
// not fully visible in this excerpt.
863 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
864 DataStoreVersions.CURRENT_VERSION), mockShardActor1);
866 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
868 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
869 DataStoreVersions.CURRENT_VERSION), mockShardActor2);
871 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
873 shardManager1.underlyingActor().waitForMemberUp();
// Stage 1: member-1 should resolve the primary to the remote member-2 shard.
875 shardManager1.tell(new FindPrimary("default", true), getRef());
877 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
878 String path = found.getPrimaryPath();
879 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
// Seed the cache so the later assertNull can show the entry was removed.
881 primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(
882 system1.actorSelection(mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION));
// Stage 2: mark member-2 unreachable; FindPrimary must now fail with NoShardLeaderException.
884 shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
885 "akka://cluster-test@127.0.0.1:2558"), getRef());
887 shardManager1.underlyingActor().waitForUnreachableMember();
889 shardManager1.tell(new FindPrimary("default", true), getRef());
891 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
893 assertNull("Expected primaryShardInfoCache entry removed",
894 primaryShardInfoCache.getIfPresent("default"));
// Stage 3: report member-1's own shard as leader; FindPrimary must now return the local shard.
896 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, mock(DataTree.class),
897 DataStoreVersions.CURRENT_VERSION), mockShardActor1);
899 new RoleChangeNotification(memberId1, RaftState.Follower.name(), RaftState.Leader.name()),
902 shardManager1.tell(new FindPrimary("default", true), getRef());
904 LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
905 String path1 = found1.getPrimaryPath();
906 assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
911 LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange ending");
// Sends FindLocalShard for the name "non-existent" and expects a LocalShardNotFound reply
// that echoes the requested shard name.
916 public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
917 new JavaTestKit(getSystem()) {
919 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
921 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
// Second argument (false) is the wait flag; see testOnReceiveFindLocalShardWaitForShardInitialized
// for the waiting variant.
923 shardManager.tell(new FindLocalShard("non-existent", false), getRef());
925 LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
927 assertEquals("getShardName", "non-existent", notFound.getShardName());
// After the schema context is set and the mock shard actor reports ActorInitialized,
// FindLocalShard for the default shard must yield LocalShardFound whose actor path
// contains "member-1-shard-default-config".
933 public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
934 new JavaTestKit(getSystem()) {
936 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
938 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
939 shardManager.tell(new ActorInitialized(), mockShardActor);
941 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
943 LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
945 assertTrue("Found path contains " + found.getPath().path().toString(),
946 found.getPath().path().toString().contains("member-1-shard-default-config"));
// Sends FindLocalShard (wait flag false) without any prior UpdateSchemaContext or
// ActorInitialized message and expects a NotInitializedException reply.
952 public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
953 new JavaTestKit(getSystem()) {
955 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
957 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
959 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
// Asks for the default shard with the wait flag set before the shard has reported
// ActorInitialized, then sends ActorInitialized and expects the pending ask to complete
// with LocalShardFound.
965 public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
966 LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
967 new JavaTestKit(getSystem()) {
969 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
971 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
973 // We're passing waitUntilInitialized = true to FindLocalShard
974 // so the response should be
975 // delayed until we send ActorInitialized.
976 Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
977 new Timeout(5, TimeUnit.SECONDS));
979 shardManager.tell(new ActorInitialized(), mockShardActor);
// Block for at most 5 seconds on the ask future; the reply type is checked explicitly below.
981 Object resp = Await.result(future, duration("5 seconds"));
982 assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
// End-of-test marker: previously logged "starting" a second time, which made the log misleading.
986 LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized ending");
// Drives the shard manager directly via onReceiveCommand: a RoleChangeNotification to Leader
// alone must not count down `ready`; the following ShardLeaderStateChanged (leader == self)
// must count it down exactly once.
// NOTE(review): `ready` is declared outside this excerpt - presumably a mocked latch; confirm.
990 public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
991 TestShardManager shardManager = newTestShardManager();
993 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
994 shardManager.onReceiveCommand(new RoleChangeNotification(
995 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
997 verify(ready, never()).countDown();
999 shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
1000 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1002 verify(ready, times(1)).countDown();
// Follower variant: after a RoleChangeNotification to Follower, `ready` is untouched; once a
// member-up event for member-2 and a ShardLeaderStateChanged naming member-2's shard as leader
// have been delivered (in that order), `ready` must have been counted down exactly once.
1006 public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
1007 new JavaTestKit(getSystem()) {
1009 TestShardManager shardManager = newTestShardManager();
1011 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
// The previous role is passed as null here.
1012 shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1014 verify(ready, never()).countDown();
// NOTE(review): the receiver of this call is on a line not visible in this excerpt.
1017 .onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
1019 shardManager.onReceiveCommand(
1020 new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1021 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1023 verify(ready, times(1)).countDown();
// Same expectation as the follower test above but with the events reversed: the
// ShardLeaderStateChanged naming member-2's shard arrives first, the member-up event for
// member-2 second. `ready` must still be counted down exactly once at the end.
1029 public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
1030 new JavaTestKit(getSystem()) {
1032 TestShardManager shardManager = newTestShardManager();
1034 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1035 shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1037 verify(ready, never()).countDown();
1039 shardManager.onReceiveCommand(
1040 new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1041 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
// NOTE(review): the receiver of this call is on a line not visible in this excerpt.
1044 .onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
1046 verify(ready, times(1)).countDown();
// A RoleChangeNotification whose member id is the literal "unknown" must not count down `ready`.
1052 public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
1053 TestShardManager shardManager = newTestShardManager();
1055 shardManager.onReceiveCommand(new RoleChangeNotification(
1056 "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
1058 verify(ready, never()).countDown();
// A freshly created test shard manager must report sync status false through its MBean
// before any message has been delivered.
1062 public void testByDefaultSyncStatusIsFalse() throws Exception {
1063 TestShardManager shardManager = newTestShardManager();
1065 assertEquals(false, shardManager.getMBean().getSyncStatus());
// After a Follower -> Leader RoleChangeNotification for the default shard, the MBean must
// report sync status true.
1069 public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception {
1070 TestShardManager shardManager = newTestShardManager();
1072 shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
1073 RaftState.Follower.name(), RaftState.Leader.name()));
1075 assertEquals(true, shardManager.getMBean().getSyncStatus());
// After a Follower -> Candidate RoleChangeNotification the MBean must report sync status false,
// and it must still report false after a FollowerInitialSyncUpStatus message is delivered.
1079 public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception {
1080 TestShardManager shardManager = newTestShardManager();
1082 String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1083 shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1084 RaftState.Follower.name(), RaftState.Candidate.name()));
1086 assertEquals(false, shardManager.getMBean().getSyncStatus());
1088 // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
// NOTE(review): the constructor arguments of this message are on a line not visible in this excerpt.
1089 shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(
1092 assertEquals(false, shardManager.getMBean().getSyncStatus());
// For a shard that has become Follower, the MBean sync status must track the most recent
// FollowerInitialSyncUpStatus value: false initially, true after status=true, false again
// after status=false.
1096 public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception {
1097 TestShardManager shardManager = newTestShardManager();
1099 String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1100 shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1101 RaftState.Candidate.name(), RaftState.Follower.name()));
1103 // Initially will be false
1104 assertEquals(false, shardManager.getMBean().getSyncStatus());
1106 // Send status true will make sync status true
1107 shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
1109 assertEquals(true, shardManager.getMBean().getSyncStatus());
1111 // Send status false will make sync status false
1112 shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
1114 assertEquals(false, shardManager.getMBean().getSyncStatus());
// With two local shards ("default" and "astronauts"), the MBean sync status must be true only
// when every shard is in sync: either Leader, or Follower with FollowerInitialSyncUpStatus true.
1119 public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception {
1120 LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
// Anonymous MockConfiguration subclass that reports both shards for any member name.
1121 TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
1123 public List<String> getMemberShardNames(MemberName memberName) {
1124 return Arrays.asList("default", "astronauts");
1128 // Initially will be false
1129 assertEquals(false, shardManager.getMBean().getSyncStatus());
1131 // Make default shard leader
1132 String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
1133 shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId,
1134 RaftState.Follower.name(), RaftState.Leader.name()));
1136 // default = Leader, astronauts is unknown so sync status remains false
1137 assertEquals(false, shardManager.getMBean().getSyncStatus());
1139 // Make astronauts shard leader as well
1140 String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1141 shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1142 RaftState.Follower.name(), RaftState.Leader.name()));
1144 // Now sync status should be true
1145 assertEquals(true, shardManager.getMBean().getSyncStatus());
1147 // Make astronauts a Follower
1148 shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1149 RaftState.Leader.name(), RaftState.Follower.name()));
1151 // Sync status is not true
1152 assertEquals(false, shardManager.getMBean().getSyncStatus());
1154 // Make the astronauts follower sync status true
1155 shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
1157 // Sync status is now true
1158 assertEquals(true, shardManager.getMBean().getSyncStatus());
1160 LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending");
// Sends SwitchShardBehavior(mockShardName, Leader, 1000) to the shard manager and expects the
// mock shard actor to have collected a SwitchBehavior message carrying the same state and term.
1164 public void testOnReceiveSwitchShardBehavior() throws Exception {
1165 new JavaTestKit(getSystem()) {
1167 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1169 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1170 shardManager.tell(new ActorInitialized(), mockShardActor);
1172 shardManager.tell(new SwitchShardBehavior(mockShardName, RaftState.Leader, 1000), getRef());
// The assertion reads from the mock shard actor's collected messages, not from the test kit's mailbox.
1174 SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor,
1175 SwitchBehavior.class);
1177 assertEquals(RaftState.Leader, switchBehavior.getNewState());
1178 assertEquals(1000, switchBehavior.getNewTerm());
// Converts the given member-name strings to MemberName instances via MemberName.forName,
// returning them as a list in argument order.
1183 private static List<MemberName> members(String... names) {
// Stream the varargs array directly; no intermediate List view is needed.
1184 return Arrays.stream(names).map(MemberName::forName).collect(Collectors.toList());
// Creates shard "foo" through a CreateShard message on a manager that starts with an empty
// module-shard configuration, then inspects the Shard.Builder passed in the message to check
// what the manager populated on it: datastore context, peer addresses, id and schema context.
// A second CreateShard for the same configuration must still be answered with Success.
1188 public void testOnCreateShard() {
1189 LOG.info("testOnCreateShard starting");
1190 new JavaTestKit(getSystem()) {
// The manager-wide context is persistent; the per-shard context sent below is not.
1192 datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1194 ActorRef shardManager = actorFactory
1195 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1196 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1198 SchemaContext schemaContext = TestModel.createTestContext();
1199 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1201 DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100)
1202 .persistent(false).build();
1203 Shard.Builder shardBuilder = Shard.builder();
1205 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1206 "foo", null, members("member-1", "member-5", "member-6"));
1207 shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef());
1209 expectMsgClass(duration("5 seconds"), Success.class);
1211 shardManager.tell(new FindLocalShard("foo", true), getRef());
1213 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
// The builder must carry the non-persistent context supplied with CreateShard.
1215 assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent());
1216 assertTrue("Expected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig()
1217 .getPeerAddressResolver() instanceof ShardPeerAddressResolver);
// Peers are the configured members other than member-1.
1218 assertEquals("peerMembers", Sets.newHashSet(
1219 ShardIdentifier.create("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(),
1220 ShardIdentifier.create("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()),
1221 shardBuilder.getPeerAddresses().keySet());
1222 assertEquals("ShardIdentifier", ShardIdentifier.create("foo", MEMBER_1, shardMrgIDSuffix),
1223 shardBuilder.getId());
1224 assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1226 // Send CreateShard with same name - should return Success with
1229 shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1231 Success success = expectMsgClass(duration("5 seconds"), Success.class);
1232 assertNotNull("Success status is null", success.status());
1236 LOG.info("testOnCreateShard ending");
1240 public void testOnCreateShardWithLocalMemberNotInShardConfig() {
1241 LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting");
1242 new JavaTestKit(getSystem()) {
1244 datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1246 ActorRef shardManager = actorFactory
1247 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1248 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1250 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1252 Shard.Builder shardBuilder = Shard.builder();
1253 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1254 "foo", null, members("member-5", "member-6"));
1256 shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1257 expectMsgClass(duration("5 seconds"), Success.class);
1259 shardManager.tell(new FindLocalShard("foo", true), getRef());
1260 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1262 assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
1263 assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(), shardBuilder
1264 .getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
1268 LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending");
// Sends CreateShard before any schema context has been supplied, then delivers
// UpdateSchemaContext. The shard must become findable afterwards, and the builder must hold
// that schema context and a non-null datastore context.
1272 public void testOnCreateShardWithNoInitialSchemaContext() {
1273 LOG.info("testOnCreateShardWithNoInitialSchemaContext starting");
1274 new JavaTestKit(getSystem()) {
1276 ActorRef shardManager = actorFactory
1277 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1278 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1280 Shard.Builder shardBuilder = Shard.builder();
1282 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1283 "foo", null, members("member-1"));
1284 shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
// CreateShard is acknowledged even though no schema context exists yet.
1286 expectMsgClass(duration("5 seconds"), Success.class);
1288 SchemaContext schemaContext = TestModel.createTestContext();
1289 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1291 shardManager.tell(new FindLocalShard("foo", true), getRef());
1293 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1295 assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
// Message names the value actually checked; it previously read "schemaContext is null".
1296 assertNotNull("datastoreContext is null", shardBuilder.getDatastoreContext());
1300 LOG.info("testOnCreateShardWithNoInitialSchemaContext ending");
// Exercises GetSnapshot in three states:
//   1. before UpdateSchemaContext - expects a Failure caused by IllegalStateException;
//   2. after shard1 and shard2 are initialized - expects a DatastoreSnapshot listing both
//      shards and a null ShardManagerSnapshot;
//   3. after a replica of "astronauts" is added - expects all three shards in both the shard
//      snapshots and the (now non-null) ShardManagerSnapshot.
1304 public void testGetSnapshot() throws Exception {
1305 LOG.info("testGetSnapshot starting");
1306 JavaTestKit kit = new JavaTestKit(getSystem());
// shard1 and shard2 are assigned to member-1; astronauts has no members configured.
1308 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1309 .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1"))
1310 .put("astronauts", Collections.<String>emptyList()).build());
1312 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig)
1313 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1315 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1316 Failure failure = kit.expectMsgClass(Failure.class);
1317 assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
1319 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1321 waitForShardInitialized(shardManager, "shard1", kit);
1322 waitForShardInitialized(shardManager, "shard2", kit);
1324 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1326 DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1328 assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1329 assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
// Guava Function used with Lists.transform to project shard snapshots to their names.
1331 Function<ShardSnapshot, String> shardNameTransformer = s -> s.getName();
1333 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
1334 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
1336 // Add a new replica
// A second test kit stands in for the shard leader: it receives the AddServer request and
// replies with AddServerReply OK.
// NOTE(review): newFindPrimaryInterceptor is defined elsewhere - presumably it answers
// FindPrimary with the given ref; confirm.
1338 JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1340 TestShardManager shardManagerInstance = shardManager.underlyingActor();
1341 shardManagerInstance.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1343 shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1344 mockShardLeaderKit.expectMsgClass(AddServer.class);
1345 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.OK, ""));
1346 kit.expectMsgClass(Status.Success.class);
1347 waitForShardInitialized(shardManager, "astronauts", kit);
1349 // Send another GetSnapshot and verify
1351 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1352 datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1354 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
1355 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
1357 ShardManagerSnapshot snapshot = datastoreSnapshot.getShardManagerSnapshot();
1358 assertNotNull("Expected ShardManagerSnapshot", snapshot);
1359 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1360 Sets.newHashSet(snapshot.getShardList()));
1362 LOG.info("testGetSnapshot ending");
// Starts a shard manager with a DatastoreSnapshot to restore from. The configuration assigns
// no members to any shard, while the restored ShardManagerSnapshot lists shard1, shard2 and
// astronauts. All three must initialize and be reported by a subsequent GetSnapshot.
1366 public void testRestoreFromSnapshot() throws Exception {
1367 LOG.info("testRestoreFromSnapshot starting");
1369 datastoreContextBuilder.shardInitializationTimeout(3, TimeUnit.SECONDS);
1371 JavaTestKit kit = new JavaTestKit(getSystem());
1373 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1374 .put("shard1", Collections.<String>emptyList()).put("shard2", Collections.<String>emptyList())
1375 .put("astronauts", Collections.<String>emptyList()).build());
// The snapshot carries only the shard-manager part; the list of shard snapshots is empty.
1377 ShardManagerSnapshot snapshot =
1378 new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"), Collections.emptyMap());
1379 DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix, snapshot,
1380 Collections.<ShardSnapshot>emptyList());
1381 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig)
1382 .restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
// Wait for recovery to finish before supplying the schema context.
1384 shardManager.underlyingActor().waitForRecoveryComplete();
1386 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1388 waitForShardInitialized(shardManager, "shard1", kit);
1389 waitForShardInitialized(shardManager, "shard2", kit);
1390 waitForShardInitialized(shardManager, "astronauts", kit);
1392 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1394 DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1396 assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1398 assertNotNull("Expected ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1399 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1400 Sets.newHashSet(datastoreSnapshot.getShardManagerSnapshot().getShardList()));
1402 LOG.info("testRestoreFromSnapshot ending");
// On a manager built from an empty module-shard configuration, AddShardReplica for
// "model-inventory" must be answered with a Status.Failure whose cause is an
// IllegalArgumentException.
1406 public void testAddShardReplicaForNonExistentShardConfig() throws Exception {
1407 new JavaTestKit(getSystem()) {
1409 ActorRef shardManager = actorFactory
1410 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1411 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1413 shardManager.tell(new AddShardReplica("model-inventory"), getRef());
1414 Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
1416 assertEquals("Failure obtained", true, resp.cause() instanceof IllegalArgumentException);
// Two-actor-system scenario: member-2 hosts the leader of the "astronauts" shard and member-1
// asks to add a local replica of it. Verifies the AddServer request received by the leader,
// the Success reply to the caller, and that exactly one ShardManagerSnapshot listing
// "default" and "astronauts" remains persisted afterwards.
1422 public void testAddShardReplica() throws Exception {
1423 LOG.info("testAddShardReplica starting");
// "default" lives on both members; "astronauts" is initially only on member-2.
1424 MockConfiguration mockConfig = new MockConfiguration(
1425 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1426 .put("astronauts", Arrays.asList("member-2")).build());
1428 final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1429 datastoreContextBuilder.shardManagerPersistenceId(shardManagerID);
1431 // Create an ActorSystem ShardManager actor for member-1.
1432 final ActorSystem system1 = newActorSystem("Member1");
1433 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1434 ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1435 final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1436 newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor)
1437 .cluster(new ClusterWrapperImpl(system1)).props()
1438 .withDispatcher(Dispatchers.DefaultDispatcherId()),
1441 // Create an ActorSystem ShardManager actor for member-2.
1442 final ActorSystem system2 = newActorSystem("Member2");
1443 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1445 String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
1446 String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString();
// The leader's shard actor is a MockRespondActor constructed with AddServer.class and an
// AddServerReply(OK).
// NOTE(review): presumably it answers AddServer messages with that reply; confirm.
1447 final TestActorRef<MockRespondActor> mockShardLeaderActor = TestActorRef.create(system2,
1448 Props.create(MockRespondActor.class, AddServer.class,
1449 new AddServerReply(ServerChangeStatus.OK, memberId2))
1450 .withDispatcher(Dispatchers.DefaultDispatcherId()),
1452 final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1453 newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor)
1454 .cluster(new ClusterWrapperImpl(system2)).props()
1455 .withDispatcher(Dispatchers.DefaultDispatcherId()),
1458 new JavaTestKit(system1) {
1460 newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1461 leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1463 leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
// The leader is reported with a version one below the current data store version.
1465 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1466 leaderShardManager.tell(
1467 new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1468 mockShardLeaderActor);
1469 leaderShardManager.tell(
1470 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1471 mockShardLeaderActor);
1473 newReplicaShardManager.underlyingActor().waitForMemberUp();
1474 leaderShardManager.underlyingActor().waitForMemberUp();
1476 // Have a dummy snapshot to be overwritten by the new data
1478 String[] restoredShards = { "default", "people" };
1479 ShardManagerSnapshot snapshot =
1480 new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
1481 InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
// NOTE(review): the purpose of this 2 ms pause is not evident from this excerpt - presumably it
// separates the dummy snapshot's timestamp from the one saved later; confirm.
1482 Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
// Register latches before triggering the change so the save/delete events can be awaited below.
1484 InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID);
1485 InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
1487 // construct a mock response message
1488 newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1489 AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1491 String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1492 assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1493 expectMsgClass(duration("5 seconds"), Status.Success.class);
// After the save and delete have both happened, only the new snapshot should remain.
1495 InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
1496 InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
1497 List<ShardManagerSnapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(shardManagerID,
1498 ShardManagerSnapshot.class);
1499 assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
1500 ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
1501 assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
1502 Sets.newHashSet(shardManagerSnapshot.getShardList()));
1505 LOG.info("testAddShardReplica ending");
// Verifies AddShardReplica when the shard leader answers AddServer with ALREADY_EXISTS: the caller
// receives a Failure caused by AlreadyExistsException and the pre-existing local shard is still found.
1509 public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() throws Exception {
1510 LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting");
1511 new JavaTestKit(getSystem()) {
1513 TestActorRef<TestShardManager> shardManager = actorFactory
1514 .createTestActor(newPropsShardMgrWithMockShardActor(), shardMgrID);
// Supply the schema context and report the mock shard actor as initialized.
1516 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1517 shardManager.tell(new ActorInitialized(), mockShardActor);
// Mock leader: a MockRespondActor that replies to every AddServer with the ALREADY_EXISTS reply.
1519 String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
1520 AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
1521 ActorRef leaderShardActor = shardManager.underlyingActor().getContext()
1522 .actorOf(Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId);
// Announce the leader's member as up, using the mock leader actor's path as its address.
1524 MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
1526 String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
1528 new RoleChangeNotification(newReplicaId, RaftState.Candidate.name(), RaftState.Follower.name()),
1531 new ShardLeaderStateChanged(newReplicaId, leaderId, DataStoreVersions.CURRENT_VERSION),
// First attempt: the AddServer must reach the leader and the request must fail with AlreadyExists.
1534 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1536 MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
1538 Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1539 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
// The local shard must still be registered after the failed add.
1541 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1542 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1544 // Send message again to verify previous in progress state is cleared.
1547 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1548 resp = expectMsgClass(duration("5 seconds"), Failure.class);
1549 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1551 // Send message again with an AddServer timeout to verify the
1552 // pre-existing shard actor isn't terminated.
1555 newDatastoreContextFactory(
1556 datastoreContextBuilder.shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()),
// CLEAR_RESPONSE is sent so the mock leader no longer answers the next AddServer.
1558 leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
1559 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1560 expectMsgClass(duration("5 seconds"), Failure.class);
1562 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1563 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1567 LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
// Verifies that AddShardReplica for a shard whose local replica reports itself as leader is rejected
// with AlreadyExistsException, and that the local shard is still found afterwards.
1571 public void testAddShardReplicaWithPreExistingLocalReplicaLeader() throws Exception {
1572 LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting");
1573 new JavaTestKit(getSystem()) {
1575 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1576 ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
// Initialize the manager, then report the local replica (memberId) as its own leader.
1578 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1579 shardManager.tell(new ActorInitialized(), mockShardActor);
1580 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1581 DataStoreVersions.CURRENT_VERSION), getRef());
1583 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
// Adding a replica that already exists locally must fail.
1586 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1587 Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1588 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
// The existing local shard must remain available.
1590 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1591 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1595 LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
// Verifies that non-OK AddServerReply statuses are surfaced to the AddShardReplica caller as
// Failures (TIMEOUT -> TimeoutException, NO_LEADER -> NoShardLeaderException) and that the newly
// created replica shard actor is terminated and no longer found after the failed add.
1599 public void testAddShardReplicaWithAddServerReplyFailure() throws Exception {
1600 LOG.info("testAddShardReplicaWithAddServerReplyFailure starting");
1601 new JavaTestKit(getSystem()) {
// Separate test kit standing in for the shard leader so the test can script its replies.
1603 JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
// "astronauts" is configured on member-2 only.
1605 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1606 .put("astronauts", Arrays.asList("member-2")).build());
1608 ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
1609 final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1610 newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props()
1611 .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
// Intercept FindPrimary so that the mock leader kit is returned as the remote primary.
1612 shardManager.underlyingActor()
1613 .setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1615 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
// Watch the replica shard actor so its termination can be asserted below.
1617 JavaTestKit terminateWatcher = new JavaTestKit(getSystem());
1618 terminateWatcher.watch(mockNewReplicaShardActor);
1620 shardManager.tell(new AddShardReplica("astronauts"), getRef());
// The leader receives AddServer for the new member-1 replica and answers with TIMEOUT.
1622 AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
1623 assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
1624 addServerMsg.getNewServerId());
1625 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
1627 Failure failure = expectMsgClass(duration("5 seconds"), Failure.class);
1628 assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
// After the failure the shard must not be registered locally and its actor must be stopped.
1630 shardManager.tell(new FindLocalShard("astronauts", false), getRef());
1631 expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1633 terminateWatcher.expectTerminated(mockNewReplicaShardActor);
// Second attempt: a NO_LEADER reply must map to NoShardLeaderException.
1635 shardManager.tell(new AddShardReplica("astronauts"), getRef());
1636 mockShardLeaderKit.expectMsgClass(AddServer.class);
1637 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
1638 failure = expectMsgClass(duration("5 seconds"), Failure.class);
1639 assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
1643 LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
// Issues a second AddShardReplica for "astronauts" while the first one is still outstanding;
// the shared helper asserts that the second request is answered with a Failure.
1647 public void testAddShardReplicaWithAlreadyInProgress() throws Exception {
1648 testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1649 AddServer.class, new AddShardReplica("astronauts"));
// Verifies that AddShardReplica fails with a RuntimeException-derived cause when the primary for
// the shard cannot be located: member-2 is announced at an address in a non-existent actor system.
1653 public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception {
1654 LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting");
// Shorten the shard initialization timeout to 100 ms for this test.
1655 datastoreContextBuilder.shardInitializationTimeout(100, TimeUnit.MILLISECONDS);
1656 new JavaTestKit(getSystem()) {
// "astronauts" is configured on member-2 only.
1658 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1659 .put("astronauts", Arrays.asList("member-2")).build());
1661 final ActorRef newReplicaShardManager = actorFactory
1662 .createActor(newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props()
1663 .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1665 newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
// member-2 is reported up, but at an address that does not correspond to a running actor system.
1666 MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2",
1667 AddressFromURIString.parse("akka://non-existent@127.0.0.1:5").toString());
1669 newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1670 Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
// assertTrue states the boolean expectation directly instead of comparing against a literal true.
1671 assertTrue("Failure obtained", resp.cause() instanceof RuntimeException);
1675 LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending");
// Verifies that RemoveShardReplica for a shard unknown to the configuration (an empty module shard
// config is used) is answered with a Failure whose cause is PrimaryNotFoundException.
1679 public void testRemoveShardReplicaForNonExistentShard() throws Exception {
1680 new JavaTestKit(getSystem()) {
1682 ActorRef shardManager = actorFactory
1683 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1684 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1686 shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), getRef());
1687 Status.Failure resp = expectMsgClass(duration("10 seconds"), Status.Failure.class);
// assertTrue states the boolean expectation directly instead of comparing against a literal true.
1688 assertTrue("Failure obtained", resp.cause() instanceof PrimaryNotFoundException);
// Verifies RemoveShardReplica when the local shard is the leader: a RemoveServer carrying the
// member-1 shard id is delivered to the local shard actor and the caller receives Success.
1697 public void testRemoveShardReplicaLocal() throws Exception {
1698 new JavaTestKit(getSystem()) {
1700 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
// Mock shard actor that answers every RemoveServer with an OK RemoveServerReply.
1702 final ActorRef respondActor = actorFactory.createActor(Props.create(MockRespondActor.class,
1703 RemoveServer.class, new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
1705 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
// Initialize the manager and report the local replica as its own leader.
1707 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1708 shardManager.tell(new ActorInitialized(), respondActor);
1709 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1710 DataStoreVersions.CURRENT_VERSION), getRef());
1712 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
// The removal request must be forwarded as RemoveServer with the expected server id.
1715 shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), getRef());
1716 final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor,
1717 RemoveServer.class);
1718 assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(),
1719 removeServer.getServerId());
1720 expectMsgClass(duration("5 seconds"), Success.class);
// Verifies RemoveShardReplica across two clustered actor systems: member-1 holds a follower of
// the "default" shard and member-2 holds the leader. The RemoveServer for member-1's replica must
// reach the leader on member-2 and the requester must receive Success.
1726 public void testRemoveShardReplicaRemote() throws Exception {
1727 MockConfiguration mockConfig = new MockConfiguration(
1728 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1729 .put("astronauts", Arrays.asList("member-1")).build());
1731 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1733 // Create an ActorSystem ShardManager actor for member-1.
1734 final ActorSystem system1 = newActorSystem("Member1");
1735 Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1736 ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1738 final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1739 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster(
1740 new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1743 // Create an ActorSystem ShardManager actor for member-2.
1744 final ActorSystem system2 = newActorSystem("Member2");
1745 Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
// Mock leader shard on member-2: answers every RemoveServer with an OK reply.
1747 String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString();
1748 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
1749 final TestActorRef<MockRespondActor> mockShardLeaderActor =
1750 TestActorRef.create(system2, Props.create(MockRespondActor.class, RemoveServer.class,
1751 new RemoveServerReply(ServerChangeStatus.OK, memberId2)), name);
// Purely informational output: logged at info level so it is not mistaken for a test error.
1753 LOG.info("Mock Shard Leader Actor : {}", mockShardLeaderActor);
1755 final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1756 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster(
1757 new ClusterWrapperImpl(system2)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1760 // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so,
1761 // akka://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1
1762 // However when a shard manager has a local shard which is a follower and a leader that is remote it will
1763 // try to compute an address for the remote shard leader using the ShardPeerAddressResolver. This address will
// look like so,
1765 // akka://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1
1766 // In this specific case if we did a FindPrimary for shard default from member-1 we would come up
1767 // with the address of an actor which does not exist, therefore any message sent to that actor would go to
// dead letters.
1769 // To work around this problem we create a ForwardingActor with the right address and pass to it the
1770 // mockShardLeaderActor. The ForwardingActor simply forwards all messages to the mockShardLeaderActor and every
1771 // thing works as expected
1772 final ActorRef actorRef = leaderShardManager.underlyingActor().context()
1773 .actorOf(Props.create(ForwardingActor.class, mockShardLeaderActor),
1774 "member-2-shard-default-" + shardMrgIDSuffix);
// Purely informational output: logged at info level so it is not mistaken for a test error.
1776 LOG.info("Forwarding actor : {}", actorRef);
1778 new JavaTestKit(system1) {
// Initialize both shard managers.
1780 newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1781 leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1783 leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1784 newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
// member-2 reports itself as leader, using a version one below the current one.
1786 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1787 leaderShardManager.tell(
1788 new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1789 mockShardLeaderActor);
1790 leaderShardManager.tell(
1791 new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1792 mockShardLeaderActor);
// member-1 reports itself as follower of the member-2 leader.
1794 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
1795 newReplicaShardManager.tell(
1796 new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class), leaderVersion),
1798 newReplicaShardManager.tell(
1799 new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
// Wait until each manager has seen the other member come up.
1802 newReplicaShardManager.underlyingActor().waitForMemberUp();
1803 leaderShardManager.underlyingActor().waitForMemberUp();
1805 // construct a mock response message
1806 newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), getRef());
1807 RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1808 RemoveServer.class);
1809 String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1810 assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
1811 expectMsgClass(duration("5 seconds"), Status.Success.class);
// Issues a RemoveShardReplica (MEMBER_3) while another RemoveShardReplica (MEMBER_2) for the same
// shard is still outstanding; the shared helper asserts that the second request gets a Failure.
1817 public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() throws Exception {
1818 testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", MEMBER_2),
1819 RemoveServer.class, new RemoveShardReplica("astronauts", MEMBER_3));
// Issues a RemoveShardReplica while an AddShardReplica for the same shard is still outstanding;
// the shared helper asserts that the second request gets a Failure.
1823 public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() throws Exception {
1824 testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1825 AddServer.class, new RemoveShardReplica("astronauts", MEMBER_2));
// Shared driver for the "already in progress" tests.
//   shardName                       - shard to operate on; configured on member-2 only
//   firstServerChange               - request sent first; it is left unanswered by the mock leader
//   firstForwardedServerChangeClass - message class the mock leader is expected to receive for it
//   secondServerChange              - request sent while the first is pending; must yield a Failure
1829 public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange,
1830 final Class<?> firstForwardedServerChangeClass,
1831 final Object secondServerChange) throws Exception {
1832 new JavaTestKit(getSystem()) {
// One kit plays the shard leader, another receives the reply to the second request.
1834 JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1835 final JavaTestKit secondRequestKit = new JavaTestKit(getSystem());
1837 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1838 .put(shardName, Arrays.asList("member-2")).build());
1840 final TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
1841 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor)
1842 .cluster(new MockClusterWrapper()).props()
1843 .withDispatcher(Dispatchers.DefaultDispatcherId()),
// Intercept FindPrimary so that the mock leader kit is returned as the remote primary.
1846 shardManager.underlyingActor()
1847 .setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1849 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
// First request: wait until it has been forwarded to the leader, but never reply to it.
1851 shardManager.tell(firstServerChange, getRef());
1853 mockShardLeaderKit.expectMsgClass(firstForwardedServerChangeClass);
// Second request arrives while the first is still pending and must be rejected.
1855 shardManager.tell(secondServerChange, secondRequestKit.getRef());
1857 secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class);
// Verifies ServerRemoved handling before any shard actor has been initialized: after the
// "default" replica of member-1 is removed, the persisted snapshot lists only "people".
1863 public void testServerRemovedShardActorNotRunning() throws Exception {
1864 LOG.info("testServerRemovedShardActorNotRunning starting");
1865 new JavaTestKit(getSystem()) {
// member-1 hosts "default" and "people"; "astronauts" lives on member-2 only.
1867 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1868 .put("default", Arrays.asList("member-1", "member-2"))
1869 .put("astronauts", Arrays.asList("member-2"))
1870 .put("people", Arrays.asList("member-1", "member-2")).build());
1872 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1873 newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
// No schema context / ActorInitialized is sent, so both local shards report NotInitialized.
1875 shardManager.underlyingActor().waitForRecoveryComplete();
1876 shardManager.tell(new FindLocalShard("people", false), getRef());
1877 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1879 shardManager.tell(new FindLocalShard("default", false), getRef());
1880 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1882 // Removed the default shard replica from member-1
1883 ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
1884 ShardIdentifier shardId = builder.shardName("default").memberName(MEMBER_1).type(shardMrgIDSuffix)
1886 shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
// Only "people" must remain in the saved snapshot.
1888 shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1892 LOG.info("testServerRemovedShardActorNotRunning ending");
// Verifies ServerRemoved handling for a running shard actor: the persisted snapshot lists only
// "people" and the removed "default" shard actor receives a Shutdown message.
1896 public void testServerRemovedShardActorRunning() throws Exception {
1897 LOG.info("testServerRemovedShardActorRunning starting");
1898 new JavaTestKit(getSystem()) {
// member-1 hosts "default" and "people"; "astronauts" lives on member-2 only.
1900 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1901 .put("default", Arrays.asList("member-1", "member-2"))
1902 .put("astronauts", Arrays.asList("member-2"))
1903 .put("people", Arrays.asList("member-1", "member-2")).build());
// A message-collecting actor stands in for the "default" shard so Shutdown can be observed.
1905 String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1906 ActorRef shard = actorFactory.createActor(MessageCollectorActor.props(), shardId);
1908 TestActorRef<TestShardManager> shardManager = actorFactory
1909 .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props()
1910 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1912 shardManager.underlyingActor().waitForRecoveryComplete();
1914 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1915 shardManager.tell(new ActorInitialized(), shard);
1917 waitForShardInitialized(shardManager, "people", this);
1918 waitForShardInitialized(shardManager, "default", this);
1920 // Removed the default shard replica from member-1
1921 shardManager.tell(new ServerRemoved(shardId), getRef());
// Only "people" must remain in the saved snapshot.
1923 shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
// The removed shard's actor must have been told to shut down.
1925 MessageCollectorActor.expectFirstMatching(shard, Shutdown.class);
1929 LOG.info("testServerRemovedShardActorRunning ending");
// Verifies that a ShardManager started with a pre-existing snapshot creates local shards from the
// snapshot's shard list ("default", "astronauts"): "people", which is absent from the snapshot, is
// reported as LocalShardNotFound even though the configuration places it on member-1.
1933 public void testShardPersistenceWithRestoredData() throws Exception {
1934 LOG.info("testShardPersistenceWithRestoredData starting");
1935 new JavaTestKit(getSystem()) {
1937 MockConfiguration mockConfig =
1938 new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1939 .put("default", Arrays.asList("member-1", "member-2"))
1940 .put("astronauts", Arrays.asList("member-2"))
1941 .put("people", Arrays.asList("member-1", "member-2")).build());
// Seed the in-memory snapshot store under the shard manager's persistence id.
1942 String[] restoredShards = {"default", "astronauts"};
1943 ShardManagerSnapshot snapshot =
1944 new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
1945 InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
1947 // create shardManager to come up with restored data
1948 TestActorRef<TestShardManager> newRestoredShardManager = actorFactory.createTestActor(
1949 newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1951 newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
// "people" is not in the snapshot, so no local shard exists for it.
1953 newRestoredShardManager.tell(new FindLocalShard("people", false), getRef());
1954 LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1955 assertEquals("for uninitialized shard", "people", notFound.getShardName());
1957 // Verify a local shard is created for the restored shards,
1958 // although we expect a NotInitializedException for the shards
1959 // as the actor initialization
1960 // message is not sent for them
1961 newRestoredShardManager.tell(new FindLocalShard("default", false), getRef());
1962 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1964 newRestoredShardManager.tell(new FindLocalShard("astronauts", false), getRef());
1965 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1969 LOG.info("testShardPersistenceWithRestoredData ending");
// Verifies graceful shutdown ordering: on Shutdown the ShardManager forwards Shutdown to each
// shard and does not stop itself until the shard actors have stopped.
1973 public void testShutDown() throws Exception {
1974 LOG.info("testShutDown starting");
1975 new JavaTestKit(getSystem()) {
1977 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1978 .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1")).build());
// Message-collecting actors stand in for the two shards; they record Shutdown but do not stop.
1980 String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString();
1981 ActorRef shard1 = actorFactory.createActor(MessageCollectorActor.props(), shardId1);
1983 String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString();
1984 ActorRef shard2 = actorFactory.createActor(MessageCollectorActor.props(), shardId2);
1986 ActorRef shardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig)
1987 .addShardActor("shard1", shard1).addShardActor("shard2", shard2).props());
1989 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1990 shardManager.tell(new ActorInitialized(), shard1);
1991 shardManager.tell(new ActorInitialized(), shard2);
// Ask the manager to stop; the future completes once the manager actor has terminated.
1993 FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
1994 Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE);
1996 MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class);
1997 MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class);
// While the shards are still alive the manager must not have stopped: a timeout is expected here.
2000 Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS));
2001 fail("ShardManager actor stopped without waiting for the Shards to be stopped");
2002 } catch (TimeoutException e) {
// Once both shards are killed the manager is expected to complete its own stop.
2006 actorFactory.killActor(shard1, this);
2007 actorFactory.killActor(shard2, this);
2009 Boolean stopped = Await.result(stopFuture, duration);
2010 assertEquals("Stopped", Boolean.TRUE, stopped);
2014 LOG.info("testShutDown ending");
// Verifies that ChangeShardMembersVotingStatus is translated into a ChangeServersVotingStatus
// keyed by full shard id and delivered to the local leader shard, and that the caller gets Success.
2018 public void testChangeServersVotingStatus() throws Exception {
2019 new JavaTestKit(getSystem()) {
2021 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
// Mock shard actor that answers every ChangeServersVotingStatus with an OK ServerChangeReply.
2023 ActorRef respondActor = actorFactory
2024 .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2025 new ServerChangeReply(ServerChangeStatus.OK, null)), memberId);
2027 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
// Initialize the manager and report the local replica as its own leader.
2029 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
2030 shardManager.tell(new ActorInitialized(), respondActor);
2031 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
2032 DataStoreVersions.CURRENT_VERSION), getRef());
2034 new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
2038 new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)),
// The member name key must have been expanded to the member-2 shard identifier string.
2041 ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor
2042 .expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2043 assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(),
2044 ImmutableMap.of(ShardIdentifier
2045 .create("default", MemberName.forName("member-2"), shardMrgIDSuffix).toString(),
2048 expectMsgClass(duration("5 seconds"), Success.class);
// Verifies that a NO_LEADER reply to ChangeServersVotingStatus is surfaced to the caller as a
// Failure whose cause is NoShardLeaderException.
2054 public void testChangeServersVotingStatusWithNoLeader() throws Exception {
2055 new JavaTestKit(getSystem()) {
2057 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
// Mock shard actor that answers every ChangeServersVotingStatus with a NO_LEADER reply.
2059 ActorRef respondActor = actorFactory
2060 .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2061 new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);
2063 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
// Initialize the manager; the local replica only reports the Follower role.
2065 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
2066 shardManager.tell(new ActorInitialized(), respondActor);
2067 shardManager.tell(new RoleChangeNotification(memberId, null, RaftState.Follower.name()), respondActor);
2070 new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)),
2073 MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2075 Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
// Assertion message typo fixed; assertTrue states the boolean expectation directly.
2076 assertTrue("Failure response", resp.cause() instanceof NoShardLeaderException);
// ShardManager subclass instrumented for tests: it counts down latches when recovery completes,
// when a snapshot is saved and when selected messages are handled, lets a MessageInterceptor
// answer messages in place of the real handler, and can substitute pre-built shard actors.
2081 public static class TestShardManager extends ShardManager {
2082 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
2083 private final CountDownLatch snapshotPersist = new CountDownLatch(1);
2084 private ShardManagerSnapshot snapshot;
2085 private final Map<String, ActorRef> shardActors;
2086 private final ActorRef shardActor;
// These latches are replaced by the waitFor*/verify* methods (called from the test thread) and
// read in handleCommand (called on the actor's dispatcher thread), so the fields are volatile to
// guarantee that the actor observes the freshly installed latch.
2087 private volatile CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
2088 private volatile CountDownLatch memberUpReceived = new CountDownLatch(1);
2089 private volatile CountDownLatch memberRemovedReceived = new CountDownLatch(1);
2090 private volatile CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
2091 private volatile CountDownLatch memberReachableReceived = new CountDownLatch(1);
2092 private volatile MessageInterceptor messageInterceptor;
2094 private TestShardManager(Builder builder) {
2096 shardActor = builder.shardActor;
2097 shardActors = builder.shardActors;
// Delegates to the real recovery handling, then signals once RecoveryCompleted has been seen.
2101 protected void handleRecover(Object message) throws Exception {
2103 super.handleRecover(message);
2105 if (message instanceof RecoveryCompleted) {
2106 recoveryComplete.countDown();
// Guards on the event's member differing from this node's own member name.
2111 private void countDownIfOther(final Member member, CountDownLatch latch) {
2112 if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
// Either lets the interceptor answer the sender directly or falls through to the real handler,
// then counts down the latch matching the message type.
2118 public void handleCommand(Object message) throws Exception {
2120 if (messageInterceptor != null && messageInterceptor.canIntercept(message)) {
2121 getSender().tell(messageInterceptor.apply(message), getSelf());
2123 super.handleCommand(message);
2126 if (message instanceof FindPrimary) {
2127 findPrimaryMessageReceived.countDown();
2128 } else if (message instanceof ClusterEvent.MemberUp) {
2129 countDownIfOther(((ClusterEvent.MemberUp) message).member(), memberUpReceived);
2130 } else if (message instanceof ClusterEvent.MemberRemoved) {
2131 countDownIfOther(((ClusterEvent.MemberRemoved) message).member(), memberRemovedReceived);
2132 } else if (message instanceof ClusterEvent.UnreachableMember) {
2133 countDownIfOther(((ClusterEvent.UnreachableMember) message).member(), memberUnreachableReceived);
2134 } else if (message instanceof ClusterEvent.ReachableMember) {
2135 countDownIfOther(((ClusterEvent.ReachableMember) message).member(), memberReachableReceived);
2140 void setMessageInterceptor(MessageInterceptor messageInterceptor) {
2141 this.messageInterceptor = messageInterceptor;
// Each wait/verify method below blocks for up to 5 seconds on its latch, fails the test if the
// latch was not released, and (except for recovery) installs a fresh latch for the next wait.
2144 void waitForRecoveryComplete() {
2145 assertEquals("Recovery complete", true,
2146 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
2149 public void waitForMemberUp() {
2150 assertEquals("MemberUp received", true,
2151 Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
2152 memberUpReceived = new CountDownLatch(1);
2155 void waitForMemberRemoved() {
2156 assertEquals("MemberRemoved received", true,
2157 Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
2158 memberRemovedReceived = new CountDownLatch(1);
2161 void waitForUnreachableMember() {
2162 assertEquals("UnreachableMember received", true,
2163 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
2165 memberUnreachableReceived = new CountDownLatch(1);
2168 void waitForReachableMember() {
2169 assertEquals("ReachableMember received", true,
2170 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
2171 memberReachableReceived = new CountDownLatch(1);
2174 void verifyFindPrimary() {
2175 assertEquals("FindPrimary received", true,
2176 Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
2177 findPrimaryMessageReceived = new CountDownLatch(1);
2180 public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) {
2181 return new Builder(datastoreContextBuilder);
// Creator for TestShardManager that additionally carries the substitute shard actors.
2184 public static class Builder extends AbstractGenericCreator<Builder, TestShardManager> {
2185 private ActorRef shardActor;
2186 private final Map<String, ActorRef> shardActors = new HashMap<>();
2188 Builder(DatastoreContext.Builder datastoreContextBuilder) {
2189 super(TestShardManager.class);
2190 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
2193 Builder shardActor(ActorRef newShardActor) {
2194 this.shardActor = newShardActor;
2198 Builder addShardActor(String shardName, ActorRef actorRef) {
2199 shardActors.put(shardName, actorRef);
// Records the snapshot and releases the latch before delegating to the real save.
2205 public void saveSnapshot(Object obj) {
2206 snapshot = (ShardManagerSnapshot) obj;
2207 snapshotPersist.countDown();
2208 super.saveSnapshot(obj);
// Waits up to 5 seconds for saveSnapshot and compares the recorded shard list as a set.
2211 void verifySnapshotPersisted(Set<String> shardList) {
2212 assertEquals("saveSnapshot invoked", true,
2213 Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
2214 assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
// Prefers a per-shard-name substitute actor, then the shared substitute, then the real factory.
2218 protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
2219 if (shardActors.get(info.getShardName()) != null) {
2220 return shardActors.get(info.getShardName());
2223 if (shardActor != null) {
2227 return super.newShardActor(schemaContext, info);
// Base creator for test ShardManager subclasses. The constructor pre-populates the creator with a
// MockClusterWrapper, a MockConfiguration, the enclosing test's `ready` latch and a new
// PrimaryShardInfoFutureCache; props() builds Props for the given ShardManager class, passing
// this creator as the constructor argument.
2231 private abstract static class AbstractGenericCreator<T extends AbstractGenericCreator<T, ?>, C extends ShardManager>
2232 extends AbstractShardManagerCreator<T> {
2233 private final Class<C> shardManagerClass;
2235 AbstractGenericCreator(Class<C> shardManagerClass) {
2236 this.shardManagerClass = shardManagerClass;
2237 cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).waitTillReadyCountDownLatch(ready)
2238 .primaryShardInfoCache(new PrimaryShardInfoFutureCache());
2242 public Props props() {
2244 return Props.create(shardManagerClass, this);
// Concrete AbstractGenericCreator for an arbitrary ShardManager subclass; adds no state of its own.
2248 private static class GenericCreator<C extends ShardManager> extends AbstractGenericCreator<GenericCreator<C>, C> {
2249 GenericCreator(Class<C> shardManagerClass) {
2250 super(shardManagerClass);
// Akka Creator that forwards create() to another Creator<ShardManager> supplied at construction.
2254 private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
2255 private static final long serialVersionUID = 1L;
2256 private final Creator<ShardManager> delegate;
2258 DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
2259 this.delegate = delegate;
2263 public ShardManager create() throws Exception {
2264 return delegate.create();
// Hook used by TestShardManager.handleCommand: when canIntercept(message) is true, the result of
// apply(message) is sent back to the sender instead of running the real command handler.
2268 interface MessageInterceptor extends Function<Object, Object> {
2269 boolean canIntercept(Object message);
// Builds an interceptor that matches FindPrimary messages and answers them with a
// RemotePrimaryShardFound pointing at the serialized path of primaryActor, with version (short) 1.
2272 private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
2273 return new MessageInterceptor() {
2275 public Object apply(Object message) {
2276 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
2280 public boolean canIntercept(Object message) {
2281 return message instanceof FindPrimary;
2286 private static class MockRespondActor extends MessageCollectorActor {
2287 static final String CLEAR_RESPONSE = "clear-response";
2289 private Object responseMsg;
2290 private final Class<?> requestClass;
2292 @SuppressWarnings("unused")
2293 MockRespondActor(Class<?> requestClass, Object responseMsg) {
2294 this.requestClass = requestClass;
2295 this.responseMsg = responseMsg;
2299 public void onReceive(Object message) throws Exception {
2300 if (message.equals(CLEAR_RESPONSE)) {
2303 super.onReceive(message);
2304 if (message.getClass().equals(requestClass) && responseMsg != null) {
2305 getSender().tell(responseMsg, getSelf());