2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.times;
20 import static org.mockito.Mockito.verify;
21 import akka.actor.ActorRef;
22 import akka.actor.ActorSystem;
23 import akka.actor.AddressFromURIString;
24 import akka.actor.Props;
25 import akka.actor.Status;
26 import akka.actor.Status.Failure;
27 import akka.actor.Status.Success;
28 import akka.cluster.Cluster;
29 import akka.cluster.ClusterEvent;
30 import akka.cluster.Member;
31 import akka.dispatch.Dispatchers;
32 import akka.japi.Creator;
33 import akka.pattern.Patterns;
34 import akka.persistence.RecoveryCompleted;
35 import akka.serialization.Serialization;
36 import akka.testkit.JavaTestKit;
37 import akka.testkit.TestActorRef;
38 import akka.util.Timeout;
39 import com.google.common.base.Function;
40 import com.google.common.base.Stopwatch;
41 import com.google.common.collect.ImmutableMap;
42 import com.google.common.collect.ImmutableSet;
43 import com.google.common.collect.Lists;
44 import com.google.common.collect.Sets;
45 import com.google.common.util.concurrent.Uninterruptibles;
46 import com.typesafe.config.ConfigFactory;
48 import java.util.AbstractMap;
49 import java.util.ArrayList;
50 import java.util.Arrays;
51 import java.util.Collection;
52 import java.util.Collections;
53 import java.util.HashMap;
54 import java.util.List;
56 import java.util.Map.Entry;
58 import java.util.concurrent.CountDownLatch;
59 import java.util.concurrent.TimeUnit;
60 import java.util.concurrent.TimeoutException;
61 import java.util.stream.Collectors;
62 import org.apache.commons.lang3.SerializationUtils;
63 import org.junit.After;
64 import org.junit.Before;
65 import org.junit.Test;
66 import org.mockito.Mock;
67 import org.mockito.Mockito;
68 import org.mockito.MockitoAnnotations;
69 import org.opendaylight.controller.cluster.access.concepts.MemberName;
70 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
71 import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
72 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
73 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
74 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
75 import org.opendaylight.controller.cluster.datastore.Shard;
76 import org.opendaylight.controller.cluster.datastore.ShardManager.SchemaContextModules;
77 import org.opendaylight.controller.cluster.datastore.config.Configuration;
78 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
79 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
80 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
81 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
82 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
83 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
84 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
85 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
86 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
87 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
88 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
89 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
90 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
91 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
92 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
93 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
94 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
95 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
96 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
97 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
98 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
99 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
100 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
101 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
102 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
103 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
104 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
105 import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor;
106 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
107 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
108 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
109 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
110 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
111 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
112 import org.opendaylight.controller.cluster.raft.RaftState;
113 import org.opendaylight.controller.cluster.raft.TestActorFactory;
114 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
115 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
116 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
117 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
118 import org.opendaylight.controller.cluster.raft.messages.AddServer;
119 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
120 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
121 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
122 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
123 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
124 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
125 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
126 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
127 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
128 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
129 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
130 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
131 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
132 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
133 import org.slf4j.Logger;
134 import org.slf4j.LoggerFactory;
135 import scala.concurrent.Await;
136 import scala.concurrent.Future;
137 import scala.concurrent.duration.FiniteDuration;
139 public class ShardManagerTest extends AbstractActorTest {
140 private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
141 private static final MemberName MEMBER_1 = MemberName.forName("member-1");
142 private static final MemberName MEMBER_2 = MemberName.forName("member-2");
143 private static final MemberName MEMBER_3 = MemberName.forName("member-3");
145 private static int ID_COUNTER = 1;
147 private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
148 private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
151 private static CountDownLatch ready;
153 private static TestActorRef<MessageCollectorActor> mockShardActor;
155 private static ShardIdentifier mockShardName;
157 private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
158 dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
159 .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
161 private final Collection<ActorSystem> actorSystems = new ArrayList<>();
163 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
166 public void setUp() {
167 MockitoAnnotations.initMocks(this);
169 InMemoryJournal.clear();
170 InMemorySnapshotStore.clear();
172 if(mockShardActor == null) {
173 mockShardName = ShardIdentifier.create(Shard.DEFAULT_NAME, MEMBER_1, "config");
174 mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class),
175 mockShardName.toString());
178 mockShardActor.underlyingActor().clear();
182 public void tearDown() {
183 InMemoryJournal.clear();
184 InMemorySnapshotStore.clear();
186 for(ActorSystem system: actorSystems) {
187 JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
190 actorFactory.close();
193 private ActorSystem newActorSystem(String config) {
194 ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config));
195 actorSystems.add(system);
199 private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
200 String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
201 if(system == getSystem()) {
202 return actorFactory.createTestActor(Props.create(MessageCollectorActor.class), name);
205 return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
208 private Props newShardMgrProps() {
209 return newShardMgrProps(new MockConfiguration());
212 private static DatastoreContextFactory newDatastoreContextFactory(DatastoreContext datastoreContext) {
213 DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
214 Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
215 Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString());
219 private TestShardManager.Builder newTestShardMgrBuilder() {
220 return TestShardManager.builder(datastoreContextBuilder);
223 private TestShardManager.Builder newTestShardMgrBuilder(Configuration config) {
224 return TestShardManager.builder(datastoreContextBuilder).configuration(config);
227 private Props newShardMgrProps(Configuration config) {
228 return newTestShardMgrBuilder(config).props();
231 private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() {
232 return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
235 private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(ActorRef shardActor) {
236 return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor);
240 private Props newPropsShardMgrWithMockShardActor() {
241 return newTestShardMgrBuilderWithMockShardActor().props();
244 private Props newPropsShardMgrWithMockShardActor(ActorRef shardActor) {
245 return newTestShardMgrBuilderWithMockShardActor(shardActor).props();
249 private TestShardManager newTestShardManager() {
250 return newTestShardManager(newShardMgrProps());
253 private TestShardManager newTestShardManager(Props props) {
254 TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
255 TestShardManager shardManager = shardManagerActor.underlyingActor();
256 shardManager.waitForRecoveryComplete();
260 private static void waitForShardInitialized(ActorRef shardManager, String shardName, JavaTestKit kit) {
261 AssertionError last = null;
262 Stopwatch sw = Stopwatch.createStarted();
263 while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
265 shardManager.tell(new FindLocalShard(shardName, true), kit.getRef());
266 kit.expectMsgClass(LocalShardFound.class);
268 } catch(AssertionError e) {
272 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
278 private static <T> T expectMsgClassOrFailure(Class<T> msgClass, JavaTestKit kit, String msg) {
279 Object reply = kit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), msgClass, Failure.class);
280 if(reply instanceof Failure) {
281 throw new AssertionError(msg + " failed", ((Failure)reply).cause());
288 public void testPerShardDatastoreContext() throws Exception {
289 LOG.info("testPerShardDatastoreContext starting");
290 final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
291 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
293 Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
294 shardElectionTimeoutFactor(6).build()).when(mockFactory).getShardDatastoreContext("default");
296 Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
297 shardElectionTimeoutFactor(7).build()).when(mockFactory).getShardDatastoreContext("topology");
299 final MockConfiguration mockConfig = new MockConfiguration() {
301 public Collection<String> getMemberShardNames(MemberName memberName) {
302 return Arrays.asList("default", "topology");
306 public Collection<MemberName> getMembersFromShardName(String shardName) {
307 return members("member-1");
311 final TestActorRef<MessageCollectorActor> defaultShardActor = actorFactory.createTestActor(
312 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("default"));
313 final TestActorRef<MessageCollectorActor> topologyShardActor = actorFactory.createTestActor(
314 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("topology"));
316 final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
317 new HashMap<String, Entry<ActorRef, DatastoreContext>>());
318 shardInfoMap.put("default", new AbstractMap.SimpleEntry<ActorRef, DatastoreContext>(defaultShardActor, null));
319 shardInfoMap.put("topology", new AbstractMap.SimpleEntry<ActorRef, DatastoreContext>(topologyShardActor, null));
321 final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
322 final CountDownLatch newShardActorLatch = new CountDownLatch(2);
323 class LocalShardManager extends ShardManager {
324 public LocalShardManager(AbstractShardManagerCreator<?> creator) {
329 protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
330 Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
333 ref = entry.getKey();
334 entry.setValue(info.getDatastoreContext());
337 newShardActorLatch.countDown();
342 final Creator<ShardManager> creator = new Creator<ShardManager>() {
343 private static final long serialVersionUID = 1L;
345 public ShardManager create() throws Exception {
346 return new LocalShardManager(new GenericCreator<LocalShardManager>(LocalShardManager.class).
347 datastoreContextFactory(mockFactory).primaryShardInfoCache(primaryShardInfoCache).
348 configuration(mockConfig));
352 JavaTestKit kit = new JavaTestKit(getSystem());
354 final ActorRef shardManager = actorFactory.createActor(Props.create(
355 new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()));
357 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), kit.getRef());
359 assertEquals("Shard actors created", true, newShardActorLatch.await(5, TimeUnit.SECONDS));
360 assertEquals("getShardElectionTimeoutFactor", 6, shardInfoMap.get("default").getValue().
361 getShardElectionTimeoutFactor());
362 assertEquals("getShardElectionTimeoutFactor", 7, shardInfoMap.get("topology").getValue().
363 getShardElectionTimeoutFactor());
365 DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
366 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
367 Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
368 shardElectionTimeoutFactor(66).build()).when(newMockFactory).getShardDatastoreContext("default");
370 Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
371 shardElectionTimeoutFactor(77).build()).when(newMockFactory).getShardDatastoreContext("topology");
373 shardManager.tell(newMockFactory, kit.getRef());
375 DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor, DatastoreContext.class);
376 assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
378 newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
379 assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
381 LOG.info("testPerShardDatastoreContext ending");
385 public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
386 new JavaTestKit(getSystem()) {{
387 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
389 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
391 shardManager.tell(new FindPrimary("non-existent", false), getRef());
393 expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
398 public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
399 LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting");
400 new JavaTestKit(getSystem()) {{
401 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
403 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
405 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
406 shardManager.tell(new ActorInitialized(), mockShardActor);
408 DataTree mockDataTree = mock(DataTree.class);
409 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
410 DataStoreVersions.CURRENT_VERSION), getRef());
412 MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
413 shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
414 RaftState.Leader.name())), mockShardActor);
416 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
418 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
419 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
420 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
421 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
424 LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending");
428 public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
429 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp starting");
430 new JavaTestKit(getSystem()) {{
431 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
433 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
434 shardManager.tell(new ActorInitialized(), mockShardActor);
436 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
437 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
438 shardManager.tell(new RoleChangeNotification(memberId1,
439 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
440 shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION), mockShardActor);
442 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
444 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
447 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending");
451 public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
452 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard starting");
453 new JavaTestKit(getSystem()) {{
454 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
456 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
457 shardManager.tell(new ActorInitialized(), mockShardActor);
459 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
460 MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
462 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
463 shardManager.tell(new RoleChangeNotification(memberId1,
464 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
465 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
466 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, leaderVersion), mockShardActor);
468 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
470 RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
471 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
472 primaryFound.getPrimaryPath().contains("member-2-shard-default"));
473 assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
476 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard ending");
480 public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
481 new JavaTestKit(getSystem()) {{
482 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
484 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
486 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
491 public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
492 new JavaTestKit(getSystem()) {{
493 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
495 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
496 shardManager.tell(new ActorInitialized(), mockShardActor);
498 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
500 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
505 public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
506 LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
507 new JavaTestKit(getSystem()) {{
508 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
510 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
511 shardManager.tell(new ActorInitialized(), mockShardActor);
513 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
514 shardManager.tell(new RoleChangeNotification(memberId,
515 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
517 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
519 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
521 DataTree mockDataTree = mock(DataTree.class);
522 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
523 DataStoreVersions.CURRENT_VERSION), mockShardActor);
525 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
527 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
528 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
529 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
530 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
533 LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
537 public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
538 LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting");
539 datastoreContextBuilder.shardInitializationTimeout(10, TimeUnit.SECONDS);
540 new JavaTestKit(getSystem()) {{
541 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
543 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
545 // We're passing waitUntilInitialized = true to FindPrimary so the response should be
546 // delayed until we send ActorInitialized and RoleChangeNotification.
547 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
549 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
551 shardManager.tell(new ActorInitialized(), mockShardActor);
553 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
555 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
556 shardManager.tell(new RoleChangeNotification(memberId,
557 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
559 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
561 DataTree mockDataTree = mock(DataTree.class);
562 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
563 DataStoreVersions.CURRENT_VERSION), mockShardActor);
565 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
566 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
567 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
568 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
570 expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
573 LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending");
577 public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
578 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard starting");
579 new JavaTestKit(getSystem()) {{
580 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
582 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
584 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
586 expectMsgClass(duration("2 seconds"), NotInitializedException.class);
588 shardManager.tell(new ActorInitialized(), mockShardActor);
590 expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
593 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending");
597 public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
598 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard starting");
599 new JavaTestKit(getSystem()) {{
600 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
602 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
603 shardManager.tell(new ActorInitialized(), mockShardActor);
604 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
605 null, RaftState.Candidate.name()), mockShardActor);
607 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
609 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
612 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending");
616 public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception {
617 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard starting");
618 new JavaTestKit(getSystem()) {{
619 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
621 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
622 shardManager.tell(new ActorInitialized(), mockShardActor);
623 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
624 null, RaftState.IsolatedLeader.name()), mockShardActor);
626 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
628 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
631 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending");
635 public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
636 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard starting");
637 new JavaTestKit(getSystem()) {{
638 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
640 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
641 shardManager.tell(new ActorInitialized(), mockShardActor);
643 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
645 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
648 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending");
652 public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
653 LOG.info("testOnReceiveFindPrimaryForRemoteShard starting");
654 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
656 // Create an ActorSystem ShardManager actor for member-1.
658 final ActorSystem system1 = newActorSystem("Member1");
659 Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
661 final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
662 newTestShardMgrBuilderWithMockShardActor().cluster(
663 new ClusterWrapperImpl(system1)).props().withDispatcher(
664 Dispatchers.DefaultDispatcherId()), shardManagerID);
666 // Create an ActorSystem ShardManager actor for member-2.
668 final ActorSystem system2 = newActorSystem("Member2");
670 Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
672 final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
674 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
675 put("default", Arrays.asList("member-1", "member-2")).
676 put("astronauts", Arrays.asList("member-2")).build());
678 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
679 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
680 new ClusterWrapperImpl(system2)).props().withDispatcher(
681 Dispatchers.DefaultDispatcherId()), shardManagerID);
683 new JavaTestKit(system1) {{
685 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
686 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
688 shardManager2.tell(new ActorInitialized(), mockShardActor2);
690 String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
691 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
692 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
693 mock(DataTree.class), leaderVersion), mockShardActor2);
694 shardManager2.tell(new RoleChangeNotification(memberId2,
695 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
697 shardManager1.underlyingActor().waitForMemberUp();
698 shardManager1.tell(new FindPrimary("astronauts", false), getRef());
700 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
701 String path = found.getPrimaryPath();
702 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
703 assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
705 shardManager2.underlyingActor().verifyFindPrimary();
707 Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
709 shardManager1.underlyingActor().waitForMemberRemoved();
711 shardManager1.tell(new FindPrimary("astronauts", false), getRef());
713 expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
716 LOG.info("testOnReceiveFindPrimaryForRemoteShard ending");
720 public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
721 LOG.info("testShardAvailabilityOnChangeOfMemberReachability starting");
722 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
724 // Create an ActorSystem ShardManager actor for member-1.
726 final ActorSystem system1 = newActorSystem("Member1");
727 Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
729 final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
731 final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
732 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
733 new ClusterWrapperImpl(system1)).props().withDispatcher(
734 Dispatchers.DefaultDispatcherId()), shardManagerID);
736 // Create an ActorSystem ShardManager actor for member-2.
738 final ActorSystem system2 = newActorSystem("Member2");
740 Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
742 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
744 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
745 put("default", Arrays.asList("member-1", "member-2")).build());
747 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
748 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
749 new ClusterWrapperImpl(system2)).props().withDispatcher(
750 Dispatchers.DefaultDispatcherId()), shardManagerID);
752 new JavaTestKit(system1) {{
754 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
755 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
756 shardManager1.tell(new ActorInitialized(), mockShardActor1);
757 shardManager2.tell(new ActorInitialized(), mockShardActor2);
759 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
760 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
761 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
762 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
763 shardManager1.tell(new RoleChangeNotification(memberId1,
764 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
765 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
766 DataStoreVersions.CURRENT_VERSION),
768 shardManager2.tell(new RoleChangeNotification(memberId2,
769 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
770 shardManager1.underlyingActor().waitForMemberUp();
772 shardManager1.tell(new FindPrimary("default", true), getRef());
774 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
775 String path = found.getPrimaryPath();
776 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
778 shardManager1.tell(MockClusterWrapper.
779 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
781 shardManager1.underlyingActor().waitForUnreachableMember();
783 PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
784 assertEquals("getMemberName", MEMBER_2, peerDown.getMemberName());
785 MessageCollectorActor.clearMessages(mockShardActor1);
787 shardManager1.tell(MockClusterWrapper.
788 createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
790 MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
792 shardManager1.tell(new FindPrimary("default", true), getRef());
794 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
796 shardManager1.tell(MockClusterWrapper.
797 createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
799 shardManager1.underlyingActor().waitForReachableMember();
801 PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
802 assertEquals("getMemberName", MEMBER_2, peerUp.getMemberName());
803 MessageCollectorActor.clearMessages(mockShardActor1);
805 shardManager1.tell(new FindPrimary("default", true), getRef());
807 RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
808 String path1 = found1.getPrimaryPath();
809 assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
811 shardManager1.tell(MockClusterWrapper.
812 createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
814 MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
816 // Test FindPrimary wait succeeds after reachable member event.
818 shardManager1.tell(MockClusterWrapper.
819 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
820 shardManager1.underlyingActor().waitForUnreachableMember();
822 shardManager1.tell(new FindPrimary("default", true), getRef());
824 shardManager1.tell(MockClusterWrapper.
825 createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
827 RemotePrimaryShardFound found2 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
828 String path2 = found2.getPrimaryPath();
829 assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config"));
832 LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending");
836 public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
837 LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange starting");
838 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
840 // Create an ActorSystem ShardManager actor for member-1.
842 final ActorSystem system1 = newActorSystem("Member1");
843 Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
845 final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
847 final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
848 final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
849 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
850 new ClusterWrapperImpl(system1)).primaryShardInfoCache(primaryShardInfoCache).props().
851 withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID);
853 // Create an ActorSystem ShardManager actor for member-2.
855 final ActorSystem system2 = newActorSystem("Member2");
857 Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
859 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
861 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
862 put("default", Arrays.asList("member-1", "member-2")).build());
864 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
865 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
866 new ClusterWrapperImpl(system2)).props().withDispatcher(
867 Dispatchers.DefaultDispatcherId()), shardManagerID);
869 new JavaTestKit(system1) {{
870 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
871 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
872 shardManager1.tell(new ActorInitialized(), mockShardActor1);
873 shardManager2.tell(new ActorInitialized(), mockShardActor2);
875 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
876 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
877 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
878 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
879 shardManager1.tell(new RoleChangeNotification(memberId1,
880 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
881 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
882 DataStoreVersions.CURRENT_VERSION),
884 shardManager2.tell(new RoleChangeNotification(memberId2,
885 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
886 shardManager1.underlyingActor().waitForMemberUp();
888 shardManager1.tell(new FindPrimary("default", true), getRef());
890 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
891 String path = found.getPrimaryPath();
892 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
894 primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
895 mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION));
897 shardManager1.tell(MockClusterWrapper.
898 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
900 shardManager1.underlyingActor().waitForUnreachableMember();
902 shardManager1.tell(new FindPrimary("default", true), getRef());
904 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
906 assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default"));
908 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, mock(DataTree.class),
909 DataStoreVersions.CURRENT_VERSION), mockShardActor1);
910 shardManager1.tell(new RoleChangeNotification(memberId1,
911 RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
913 shardManager1.tell(new FindPrimary("default", true), getRef());
915 LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
916 String path1 = found1.getPrimaryPath();
917 assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
921 LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange ending");
926 public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
927 new JavaTestKit(getSystem()) {{
928 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
930 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
932 shardManager.tell(new FindLocalShard("non-existent", false), getRef());
934 LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
936 assertEquals("getShardName", "non-existent", notFound.getShardName());
941 public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
942 new JavaTestKit(getSystem()) {{
943 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
945 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
946 shardManager.tell(new ActorInitialized(), mockShardActor);
948 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
950 LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
952 assertTrue("Found path contains " + found.getPath().path().toString(),
953 found.getPath().path().toString().contains("member-1-shard-default-config"));
958 public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
959 new JavaTestKit(getSystem()) {{
960 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
962 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
964 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
969 public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
970 LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
971 new JavaTestKit(getSystem()) {{
972 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
974 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
976 // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
977 // delayed until we send ActorInitialized.
978 Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
979 new Timeout(5, TimeUnit.SECONDS));
981 shardManager.tell(new ActorInitialized(), mockShardActor);
983 Object resp = Await.result(future, duration("5 seconds"));
984 assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
987 LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
991 public void testOnRecoveryJournalIsCleaned() {
992 String persistenceID = "shard-manager-" + shardMrgIDSuffix;
993 InMemoryJournal.addEntry(persistenceID, 1L, new SchemaContextModules(ImmutableSet.of("foo")));
994 InMemoryJournal.addEntry(persistenceID, 2L, new SchemaContextModules(ImmutableSet.of("bar")));
995 InMemoryJournal.addDeleteMessagesCompleteLatch(persistenceID);
997 newTestShardManager();
999 InMemoryJournal.waitForDeleteMessagesComplete(persistenceID);
1001 // Journal entries up to the last one should've been deleted
1002 Map<Long, Object> journal = InMemoryJournal.get(persistenceID);
1003 synchronized (journal) {
1004 assertEquals("Journal size", 0, journal.size());
1009 public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
1010 TestShardManager shardManager = newTestShardManager();
1012 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1013 shardManager.onReceiveCommand(new RoleChangeNotification(
1014 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
1016 verify(ready, never()).countDown();
1018 shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
1019 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1021 verify(ready, times(1)).countDown();
1025 public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
1026 new JavaTestKit(getSystem()) {{
1027 TestShardManager shardManager = newTestShardManager();
1029 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1030 shardManager.onReceiveCommand(new RoleChangeNotification(
1031 memberId, null, RaftState.Follower.name()));
1033 verify(ready, never()).countDown();
1035 shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
1037 shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
1038 "member-2-shard-default-" + shardMrgIDSuffix, mock(DataTree.class),
1039 DataStoreVersions.CURRENT_VERSION));
1041 verify(ready, times(1)).countDown();
1046 public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
1047 new JavaTestKit(getSystem()) {{
1048 TestShardManager shardManager = newTestShardManager();
1050 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1051 shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1053 verify(ready, never()).countDown();
1055 shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
1056 "member-2-shard-default-" + shardMrgIDSuffix, mock(DataTree.class),
1057 DataStoreVersions.CURRENT_VERSION));
1059 shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
1061 verify(ready, times(1)).countDown();
1066 public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
1067 TestShardManager shardManager = newTestShardManager();
1069 shardManager.onReceiveCommand(new RoleChangeNotification(
1070 "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
1072 verify(ready, never()).countDown();
1076 public void testByDefaultSyncStatusIsFalse() throws Exception{
1077 TestShardManager shardManager = newTestShardManager();
1079 assertEquals(false, shardManager.getMBean().getSyncStatus());
1083 public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
1084 TestShardManager shardManager = newTestShardManager();
1086 shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
1087 RaftState.Follower.name(), RaftState.Leader.name()));
1089 assertEquals(true, shardManager.getMBean().getSyncStatus());
1093 public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
1094 TestShardManager shardManager = newTestShardManager();
1096 String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1097 shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1098 RaftState.Follower.name(), RaftState.Candidate.name()));
1100 assertEquals(false, shardManager.getMBean().getSyncStatus());
1102 // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
1103 shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(
1106 assertEquals(false, shardManager.getMBean().getSyncStatus());
1110 public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
1111 TestShardManager shardManager = newTestShardManager();
1113 String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1114 shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1115 RaftState.Candidate.name(), RaftState.Follower.name()));
1117 // Initially will be false
1118 assertEquals(false, shardManager.getMBean().getSyncStatus());
1120 // Send status true will make sync status true
1121 shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
1123 assertEquals(true, shardManager.getMBean().getSyncStatus());
1125 // Send status false will make sync status false
1126 shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
1128 assertEquals(false, shardManager.getMBean().getSyncStatus());
1133 public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
1134 LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
1135 TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
1137 public List<String> getMemberShardNames(MemberName memberName) {
1138 return Arrays.asList("default", "astronauts");
1142 // Initially will be false
1143 assertEquals(false, shardManager.getMBean().getSyncStatus());
1145 // Make default shard leader
1146 String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
1147 shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId,
1148 RaftState.Follower.name(), RaftState.Leader.name()));
1150 // default = Leader, astronauts is unknown so sync status remains false
1151 assertEquals(false, shardManager.getMBean().getSyncStatus());
1153 // Make astronauts shard leader as well
1154 String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1155 shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1156 RaftState.Follower.name(), RaftState.Leader.name()));
1158 // Now sync status should be true
1159 assertEquals(true, shardManager.getMBean().getSyncStatus());
1161 // Make astronauts a Follower
1162 shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1163 RaftState.Leader.name(), RaftState.Follower.name()));
1165 // Sync status is not true
1166 assertEquals(false, shardManager.getMBean().getSyncStatus());
1168 // Make the astronauts follower sync status true
1169 shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
1171 // Sync status is now true
1172 assertEquals(true, shardManager.getMBean().getSyncStatus());
1174 LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending");
1178 public void testOnReceiveSwitchShardBehavior() throws Exception {
1179 new JavaTestKit(getSystem()) {{
1180 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1182 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1183 shardManager.tell(new ActorInitialized(), mockShardActor);
1185 shardManager.tell(new SwitchShardBehavior(mockShardName, RaftState.Leader, 1000), getRef());
1187 SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor, SwitchBehavior.class);
1189 assertEquals(RaftState.Leader, switchBehavior.getNewState());
1190 assertEquals(1000, switchBehavior.getNewTerm());
1194 private static List<MemberName> members(String... names) {
1195 return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList());
1199 public void testOnCreateShard() {
1200 LOG.info("testOnCreateShard starting");
1201 new JavaTestKit(getSystem()) {{
1202 datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1204 ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1205 new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1207 SchemaContext schemaContext = TestModel.createTestContext();
1208 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1210 DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100).
1211 persistent(false).build();
1212 Shard.Builder shardBuilder = Shard.builder();
1214 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1215 "foo", null, members("member-1", "member-5", "member-6"));
1216 shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef());
1218 expectMsgClass(duration("5 seconds"), Success.class);
1220 shardManager.tell(new FindLocalShard("foo", true), getRef());
1222 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1224 assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent());
1225 assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig().
1226 getPeerAddressResolver() instanceof ShardPeerAddressResolver);
1227 assertEquals("peerMembers", Sets.newHashSet(
1228 ShardIdentifier.create("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(),
1229 ShardIdentifier.create("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()),
1230 shardBuilder.getPeerAddresses().keySet());
1231 assertEquals("ShardIdentifier", ShardIdentifier.create("foo", MEMBER_1, shardMrgIDSuffix),
1232 shardBuilder.getId());
1233 assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1235 // Send CreateShard with same name - should return Success with a message.
1237 shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1239 Success success = expectMsgClass(duration("5 seconds"), Success.class);
1240 assertNotNull("Success status is null", success.status());
1243 LOG.info("testOnCreateShard ending");
1247 public void testOnCreateShardWithLocalMemberNotInShardConfig() {
1248 LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting");
1249 new JavaTestKit(getSystem()) {{
1250 datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1252 ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1253 new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1255 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1257 Shard.Builder shardBuilder = Shard.builder();
1258 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1259 "foo", null, members("member-5", "member-6"));
1261 shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1262 expectMsgClass(duration("5 seconds"), Success.class);
1264 shardManager.tell(new FindLocalShard("foo", true), getRef());
1265 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1267 assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
1268 assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(),
1269 shardBuilder.getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
1272 LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending");
1276 public void testOnCreateShardWithNoInitialSchemaContext() {
1277 LOG.info("testOnCreateShardWithNoInitialSchemaContext starting");
1278 new JavaTestKit(getSystem()) {{
1279 ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1280 new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1282 Shard.Builder shardBuilder = Shard.builder();
1284 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1285 "foo", null, members("member-1"));
1286 shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1288 expectMsgClass(duration("5 seconds"), Success.class);
1290 SchemaContext schemaContext = TestModel.createTestContext();
1291 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1293 shardManager.tell(new FindLocalShard("foo", true), getRef());
1295 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1297 assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1298 assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
1301 LOG.info("testOnCreateShardWithNoInitialSchemaContext ending");
1305 public void testGetSnapshot() throws Throwable {
1306 LOG.info("testGetSnapshot starting");
1307 JavaTestKit kit = new JavaTestKit(getSystem());
1309 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1310 put("shard1", Arrays.asList("member-1")).
1311 put("shard2", Arrays.asList("member-1")).
1312 put("astronauts", Collections.<String>emptyList()).build());
1314 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig).
1315 withDispatcher(Dispatchers.DefaultDispatcherId()));
1317 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1318 Failure failure = kit.expectMsgClass(Failure.class);
1319 assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
1321 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1323 waitForShardInitialized(shardManager, "shard1", kit);
1324 waitForShardInitialized(shardManager, "shard2", kit);
1326 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1328 DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1330 assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1331 assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1333 Function<ShardSnapshot, String> shardNameTransformer = s -> s.getName();
1335 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
1336 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
1338 // Add a new replica
1340 JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1342 TestShardManager shardManagerInstance = shardManager.underlyingActor();
1343 shardManagerInstance.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1345 shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1346 mockShardLeaderKit.expectMsgClass(AddServer.class);
1347 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.OK, ""));
1348 kit.expectMsgClass(Status.Success.class);
1349 waitForShardInitialized(shardManager, "astronauts", kit);
1351 // Send another GetSnapshot and verify
1353 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1354 datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1356 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
1357 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
1359 byte[] snapshotBytes = datastoreSnapshot.getShardManagerSnapshot();
1360 assertNotNull("Expected ShardManagerSnapshot", snapshotBytes);
1361 ShardManagerSnapshot snapshot = SerializationUtils.deserialize(snapshotBytes);
1362 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1363 Sets.newHashSet(snapshot.getShardList()));
1365 LOG.info("testGetSnapshot ending");
1369 public void testRestoreFromSnapshot() throws Throwable {
1370 LOG.info("testRestoreFromSnapshot starting");
1372 datastoreContextBuilder.shardInitializationTimeout(3, TimeUnit.SECONDS);
1374 JavaTestKit kit = new JavaTestKit(getSystem());
1376 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1377 put("shard1", Collections.<String>emptyList()).
1378 put("shard2", Collections.<String>emptyList()).
1379 put("astronauts", Collections.<String>emptyList()).build());
1382 ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"));
1383 DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix,
1384 SerializationUtils.serialize(snapshot), Collections.<ShardSnapshot>emptyList());
1385 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig).
1386 restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
1388 shardManager.underlyingActor().waitForRecoveryComplete();
1390 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1392 waitForShardInitialized(shardManager, "shard1", kit);
1393 waitForShardInitialized(shardManager, "shard2", kit);
1394 waitForShardInitialized(shardManager, "astronauts", kit);
1396 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1398 DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1400 assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1402 byte[] snapshotBytes = datastoreSnapshot.getShardManagerSnapshot();
1403 assertNotNull("Expected ShardManagerSnapshot", snapshotBytes);
1404 snapshot = SerializationUtils.deserialize(snapshotBytes);
1405 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1406 Sets.newHashSet(snapshot.getShardList()));
1408 LOG.info("testRestoreFromSnapshot ending");
1412 public void testAddShardReplicaForNonExistentShardConfig() throws Exception {
1413 new JavaTestKit(getSystem()) {{
1414 ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1415 new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1417 shardManager.tell(new AddShardReplica("model-inventory"), getRef());
1418 Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
1420 assertEquals("Failure obtained", true,
1421 (resp.cause() instanceof IllegalArgumentException));
1426 public void testAddShardReplica() throws Exception {
1427 LOG.info("testAddShardReplica starting");
1428 MockConfiguration mockConfig =
1429 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1430 put("default", Arrays.asList("member-1", "member-2")).
1431 put("astronauts", Arrays.asList("member-2")).build());
1433 final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1434 datastoreContextBuilder.shardManagerPersistenceId(shardManagerID);
1436 // Create an ActorSystem ShardManager actor for member-1.
1437 final ActorSystem system1 = newActorSystem("Member1");
1438 Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1439 ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1440 final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1441 newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor).cluster(
1442 new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID);
1444 // Create an ActorSystem ShardManager actor for member-2.
1445 final ActorSystem system2 = newActorSystem("Member2");
1446 Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1448 String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
1449 String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString();
1450 final TestActorRef<MockRespondActor> mockShardLeaderActor =
1451 TestActorRef.create(system2, Props.create(MockRespondActor.class, AddServer.class,
1452 new AddServerReply(ServerChangeStatus.OK, memberId2)).
1453 withDispatcher(Dispatchers.DefaultDispatcherId()), name);
1454 final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1455 newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor).cluster(
1456 new ClusterWrapperImpl(system2)).props().
1457 withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID);
1459 new JavaTestKit(system1) {{
1461 newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1462 leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1464 leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1466 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1467 leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
1468 mock(DataTree.class), leaderVersion), mockShardLeaderActor);
1469 leaderShardManager.tell(new RoleChangeNotification(memberId2,
1470 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor);
1472 newReplicaShardManager.underlyingActor().waitForMemberUp();
1473 leaderShardManager.underlyingActor().waitForMemberUp();
1475 //Have a dummy snapshot to be overwritten by the new data persisted.
1476 String[] restoredShards = {"default", "people"};
1477 ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
1478 InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
1479 Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
1481 InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID);
1482 InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
1484 //construct a mock response message
1485 newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1486 AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1488 String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1489 assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1490 expectMsgClass(duration("5 seconds"), Status.Success.class);
1492 InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
1493 InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
1494 List<ShardManagerSnapshot> persistedSnapshots =
1495 InMemorySnapshotStore.getSnapshots(shardManagerID, ShardManagerSnapshot.class);
1496 assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
1497 ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
1498 assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
1499 Sets.newHashSet(shardManagerSnapshot.getShardList()));
1501 LOG.info("testAddShardReplica ending");
1505 public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() throws Exception {
1506 LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting");
1507 new JavaTestKit(getSystem()) {{
1508 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1509 newPropsShardMgrWithMockShardActor(), shardMgrID);
1511 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1512 shardManager.tell(new ActorInitialized(), mockShardActor);
1514 String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
1515 AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
1516 ActorRef leaderShardActor = shardManager.underlyingActor().getContext().actorOf(
1517 Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId);
1519 MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
1521 String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
1522 shardManager.tell(new RoleChangeNotification(newReplicaId,
1523 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
1524 shardManager.tell(new ShardLeaderStateChanged(newReplicaId, leaderId,
1525 DataStoreVersions.CURRENT_VERSION), mockShardActor);
1527 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1529 MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
1531 Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1532 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1534 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1535 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1537 // Send message again to verify previous in progress state is cleared
1539 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1540 resp = expectMsgClass(duration("5 seconds"), Failure.class);
1541 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1543 // Send message again with an AddServer timeout to verify the pre-existing shard actor isn't terminated.
1545 shardManager.tell(newDatastoreContextFactory(datastoreContextBuilder.
1546 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), getRef());
1547 leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
1548 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1549 expectMsgClass(duration("5 seconds"), Failure.class);
1551 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1552 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1555 LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
1559 public void testAddShardReplicaWithPreExistingLocalReplicaLeader() throws Exception {
1560 LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting");
1561 new JavaTestKit(getSystem()) {{
1562 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1563 ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1565 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1566 shardManager.tell(new ActorInitialized(), mockShardActor);
1567 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1568 DataStoreVersions.CURRENT_VERSION), getRef());
1569 shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
1570 RaftState.Leader.name())), mockShardActor);
1572 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1573 Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1574 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1576 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1577 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1580 LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
1584 public void testAddShardReplicaWithAddServerReplyFailure() throws Exception {
1585 LOG.info("testAddShardReplicaWithAddServerReplyFailure starting");
1586 new JavaTestKit(getSystem()) {{
1587 JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1589 MockConfiguration mockConfig =
1590 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1591 put("astronauts", Arrays.asList("member-2")).build());
1593 ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
1594 final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1595 newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props(), shardMgrID);
1596 shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1598 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1600 JavaTestKit terminateWatcher = new JavaTestKit(getSystem());
1601 terminateWatcher.watch(mockNewReplicaShardActor);
1603 shardManager.tell(new AddShardReplica("astronauts"), getRef());
1605 AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
1606 assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
1607 addServerMsg.getNewServerId());
1608 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
1610 Failure failure = expectMsgClass(duration("5 seconds"), Failure.class);
1611 assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
1613 shardManager.tell(new FindLocalShard("astronauts", false), getRef());
1614 expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1616 terminateWatcher.expectTerminated(mockNewReplicaShardActor);
1618 shardManager.tell(new AddShardReplica("astronauts"), getRef());
1619 mockShardLeaderKit.expectMsgClass(AddServer.class);
1620 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
1621 failure = expectMsgClass(duration("5 seconds"), Failure.class);
1622 assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
1625 LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
1629 public void testAddShardReplicaWithAlreadyInProgress() throws Exception {
1630 testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1631 AddServer.class, new AddShardReplica("astronauts"));
1635 public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception {
1636 LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting");
1637 datastoreContextBuilder.shardInitializationTimeout(100, TimeUnit.MILLISECONDS);
1638 new JavaTestKit(getSystem()) {{
1639 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1640 put("astronauts", Arrays.asList("member-2")).build());
1642 final ActorRef newReplicaShardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig).
1643 shardActor(mockShardActor).props(), shardMgrID);
1645 newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1646 MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2",
1647 AddressFromURIString.parse("akka.tcp://non-existent@127.0.0.1:5").toString());
1649 newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1650 Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
1651 assertEquals("Failure obtained", true,
1652 (resp.cause() instanceof RuntimeException));
1655 LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending");
1659 public void testRemoveShardReplicaForNonExistentShard() throws Exception {
1660 new JavaTestKit(getSystem()) {{
1661 ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1662 new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1664 shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), getRef());
1665 Status.Failure resp = expectMsgClass(duration("10 seconds"), Status.Failure.class);
1666 assertEquals("Failure obtained", true,
1667 (resp.cause() instanceof PrimaryNotFoundException));
1675 public void testRemoveShardReplicaLocal() throws Exception {
1676 new JavaTestKit(getSystem()) {{
1677 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1679 final TestActorRef<MockRespondActor> respondActor =
1680 actorFactory.createTestActor(Props.create(MockRespondActor.class, RemoveServer.class,
1681 new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
1683 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
1685 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1686 shardManager.tell(new ActorInitialized(), respondActor);
1687 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1688 DataStoreVersions.CURRENT_VERSION), getRef());
1689 shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
1690 RaftState.Leader.name())), respondActor);
1692 shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), getRef());
1693 final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor, RemoveServer.class);
1694 assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(),
1695 removeServer.getServerId());
1696 expectMsgClass(duration("5 seconds"), Success.class);
1701 public void testRemoveShardReplicaRemote() throws Exception {
1702 MockConfiguration mockConfig =
1703 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1704 put("default", Arrays.asList("member-1", "member-2")).
1705 put("astronauts", Arrays.asList("member-1")).build());
1707 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1709 // Create an ActorSystem ShardManager actor for member-1.
1710 final ActorSystem system1 = newActorSystem("Member1");
1711 Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1712 ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1714 final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1715 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster(
1716 new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1719 // Create an ActorSystem ShardManager actor for member-2.
1720 final ActorSystem system2 = newActorSystem("Member2");
1721 Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1723 String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString();
1724 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
1725 final TestActorRef<MockRespondActor> mockShardLeaderActor =
1726 TestActorRef.create(system2, Props.create(MockRespondActor.class, RemoveServer.class,
1727 new RemoveServerReply(ServerChangeStatus.OK, memberId2)), name);
1729 LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor);
1731 final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1732 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster(
1733 new ClusterWrapperImpl(system2)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1736 // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so,
1737 // akka.tcp://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1
1738 // However when a shard manager has a local shard which is a follower and a leader that is remote it will
1739 // try to compute an address for the remote shard leader using the ShardPeerAddressResolver. This address will
1741 // akka.tcp://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1
1742 // In this specific case if we did a FindPrimary for shard default from member-1 we would come up
1743 // with the address of an actor which does not exist, therefore any message sent to that actor would go to
1745 // To work around this problem we create a ForwardingActor with the right address and pass to it the
1746 // mockShardLeaderActor. The ForwardingActor simply forwards all messages to the mockShardLeaderActor and every
1747 // thing works as expected
1748 final ActorRef actorRef = leaderShardManager.underlyingActor().context()
1749 .actorOf(Props.create(ForwardingActor.class, mockShardLeaderActor), "member-2-shard-default-" + shardMrgIDSuffix);
1751 LOG.error("Forwarding actor : {}", actorRef);
1753 new JavaTestKit(system1) {{
1755 newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1756 leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1758 leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1759 newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1761 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1762 leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
1763 mock(DataTree.class), leaderVersion), mockShardLeaderActor);
1764 leaderShardManager.tell(new RoleChangeNotification(memberId2,
1765 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor);
1767 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
1768 newReplicaShardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2,
1769 mock(DataTree.class), leaderVersion), mockShardActor);
1770 newReplicaShardManager.tell(new RoleChangeNotification(memberId1,
1771 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
1773 newReplicaShardManager.underlyingActor().waitForMemberUp();
1774 leaderShardManager.underlyingActor().waitForMemberUp();
1776 //construct a mock response message
1777 newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), getRef());
1778 RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1779 RemoveServer.class);
1780 String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1781 assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
1782 expectMsgClass(duration("5 seconds"), Status.Success.class);
1788 public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() throws Exception {
1789 testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", MEMBER_2),
1790 RemoveServer.class, new RemoveShardReplica("astronauts", MEMBER_3));
1794 public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() throws Exception {
1795 testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1796 AddServer.class, new RemoveShardReplica("astronauts", MEMBER_2));
1800 public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange,
1801 final Class<?> firstForwardedServerChangeClass,
1802 final Object secondServerChange) throws Exception {
1803 new JavaTestKit(getSystem()) {{
1804 JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1805 JavaTestKit secondRequestKit = new JavaTestKit(getSystem());
1807 MockConfiguration mockConfig =
1808 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1809 put(shardName, Arrays.asList("member-2")).build());
1811 final TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
1812 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor).cluster(
1813 new MockClusterWrapper()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1816 shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1818 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1820 shardManager.tell(firstServerChange, getRef());
1822 mockShardLeaderKit.expectMsgClass(firstForwardedServerChangeClass);
1824 shardManager.tell(secondServerChange, secondRequestKit.getRef());
1826 secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class);
1831 public void testServerRemovedShardActorNotRunning() throws Exception {
1832 LOG.info("testServerRemovedShardActorNotRunning starting");
1833 new JavaTestKit(getSystem()) {{
1834 MockConfiguration mockConfig =
1835 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1836 put("default", Arrays.asList("member-1", "member-2")).
1837 put("astronauts", Arrays.asList("member-2")).
1838 put("people", Arrays.asList("member-1", "member-2")).build());
1840 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig));
1842 shardManager.underlyingActor().waitForRecoveryComplete();
1843 shardManager.tell(new FindLocalShard("people", false), getRef());
1844 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1846 shardManager.tell(new FindLocalShard("default", false), getRef());
1847 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1849 // Removed the default shard replica from member-1
1850 ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
1851 ShardIdentifier shardId = builder.shardName("default").memberName(MEMBER_1).type(shardMrgIDSuffix).build();
1852 shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
1854 shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1857 LOG.info("testServerRemovedShardActorNotRunning ending");
1861 public void testServerRemovedShardActorRunning() throws Exception {
1862 LOG.info("testServerRemovedShardActorRunning starting");
1863 new JavaTestKit(getSystem()) {{
1864 MockConfiguration mockConfig =
1865 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1866 put("default", Arrays.asList("member-1", "member-2")).
1867 put("astronauts", Arrays.asList("member-2")).
1868 put("people", Arrays.asList("member-1", "member-2")).build());
1870 String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1871 TestActorRef<MessageCollectorActor> shard = actorFactory.createTestActor(
1872 MessageCollectorActor.props(), shardId);
1874 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1875 newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props());
1877 shardManager.underlyingActor().waitForRecoveryComplete();
1879 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1880 shardManager.tell(new ActorInitialized(), shard);
1882 waitForShardInitialized(shardManager, "people", this);
1883 waitForShardInitialized(shardManager, "default", this);
1885 // Removed the default shard replica from member-1
1886 shardManager.tell(new ServerRemoved(shardId), getRef());
1888 shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1890 MessageCollectorActor.expectFirstMatching(shard, Shutdown.class);
1893 LOG.info("testServerRemovedShardActorRunning ending");
1898 public void testShardPersistenceWithRestoredData() throws Exception {
1899 LOG.info("testShardPersistenceWithRestoredData starting");
1900 new JavaTestKit(getSystem()) {{
1901 MockConfiguration mockConfig =
1902 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1903 put("default", Arrays.asList("member-1", "member-2")).
1904 put("astronauts", Arrays.asList("member-2")).
1905 put("people", Arrays.asList("member-1", "member-2")).build());
1906 String[] restoredShards = {"default", "astronauts"};
1907 ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
1908 InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
1910 //create shardManager to come up with restored data
1911 TestActorRef<TestShardManager> newRestoredShardManager = actorFactory.createTestActor(
1912 newShardMgrProps(mockConfig));
1914 newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
1916 newRestoredShardManager.tell(new FindLocalShard("people", false), getRef());
1917 LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1918 assertEquals("for uninitialized shard", "people", notFound.getShardName());
1920 //Verify a local shard is created for the restored shards,
1921 //although we expect a NotInitializedException for the shards as the actor initialization
1922 //message is not sent for them
1923 newRestoredShardManager.tell(new FindLocalShard("default", false), getRef());
1924 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1926 newRestoredShardManager.tell(new FindLocalShard("astronauts", false), getRef());
1927 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1930 LOG.info("testShardPersistenceWithRestoredData ending");
1934 public void testShutDown() throws Exception {
1935 LOG.info("testShutDown starting");
1936 new JavaTestKit(getSystem()) {{
1937 MockConfiguration mockConfig =
1938 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1939 put("shard1", Arrays.asList("member-1")).
1940 put("shard2", Arrays.asList("member-1")).build());
1942 String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString();
1943 TestActorRef<MessageCollectorActor> shard1 = actorFactory.createTestActor(
1944 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId1);
1946 String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString();
1947 TestActorRef<MessageCollectorActor> shard2 = actorFactory.createTestActor(
1948 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId2);
1950 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(
1951 mockConfig).addShardActor("shard1", shard1).addShardActor("shard2", shard2).props().
1952 withDispatcher(Dispatchers.DefaultDispatcherId()));
1954 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1955 shardManager.tell(new ActorInitialized(), shard1);
1956 shardManager.tell(new ActorInitialized(), shard2);
1958 FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
1959 Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE);
1961 MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class);
1962 MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class);
1965 Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS));
1966 fail("ShardManager actor stopped without waiting for the Shards to be stopped");
1967 } catch(TimeoutException e) {
1971 actorFactory.killActor(shard1, this);
1972 actorFactory.killActor(shard2, this);
1974 Boolean stopped = Await.result(stopFuture, duration);
1975 assertEquals("Stopped", Boolean.TRUE, stopped);
1978 LOG.info("testShutDown ending");
1982 public void testChangeServersVotingStatus() throws Exception {
1983 new JavaTestKit(getSystem()) {{
1984 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1986 TestActorRef<MockRespondActor> respondActor =
1987 actorFactory.createTestActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
1988 new ServerChangeReply(ServerChangeStatus.OK, null)), memberId);
1990 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
1992 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1993 shardManager.tell(new ActorInitialized(), respondActor);
1994 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1995 DataStoreVersions.CURRENT_VERSION), getRef());
1996 shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
1997 RaftState.Leader.name())), respondActor);
1999 shardManager.tell(new ChangeShardMembersVotingStatus("default",
2000 ImmutableMap.of("member-2", Boolean.TRUE)), getRef());
2002 ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor.expectFirstMatching(
2003 respondActor, ChangeServersVotingStatus.class);
2004 assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(),
2005 ImmutableMap.of(ShardIdentifier.create("default", MemberName.forName("member-2"),
2006 shardMrgIDSuffix).toString(), Boolean.TRUE));
2008 expectMsgClass(duration("5 seconds"), Success.class);
2013 public void testChangeServersVotingStatusWithNoLeader() throws Exception {
2014 new JavaTestKit(getSystem()) {{
2015 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2017 TestActorRef<MockRespondActor> respondActor =
2018 actorFactory.createTestActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2019 new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);
2021 ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2023 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
2024 shardManager.tell(new ActorInitialized(), respondActor);
2025 shardManager.tell((new RoleChangeNotification(memberId, null, RaftState.Follower.name())), respondActor);
2027 shardManager.tell(new ChangeShardMembersVotingStatus("default",
2028 ImmutableMap.of("member-2", Boolean.TRUE)), getRef());
2030 MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2032 Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
2033 assertEquals("Failure resposnse", true, (resp.cause() instanceof NoShardLeaderException));
2037 private static class TestShardManager extends ShardManager {
2038 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
2039 private final CountDownLatch snapshotPersist = new CountDownLatch(1);
2040 private ShardManagerSnapshot snapshot;
2041 private final Map<String, ActorRef> shardActors;
2042 private final ActorRef shardActor;
2043 private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
2044 private CountDownLatch memberUpReceived = new CountDownLatch(1);
2045 private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
2046 private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
2047 private CountDownLatch memberReachableReceived = new CountDownLatch(1);
2048 private volatile MessageInterceptor messageInterceptor;
2050 private TestShardManager(Builder builder) {
2052 shardActor = builder.shardActor;
2053 shardActors = builder.shardActors;
2057 protected void handleRecover(Object message) throws Exception {
2059 super.handleRecover(message);
2061 if(message instanceof RecoveryCompleted) {
2062 recoveryComplete.countDown();
2067 private void countDownIfOther(final Member member, CountDownLatch latch) {
2068 if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
2074 public void handleCommand(Object message) throws Exception {
2076 if(messageInterceptor != null && messageInterceptor.canIntercept(message)) {
2077 getSender().tell(messageInterceptor.apply(message), getSelf());
2079 super.handleCommand(message);
2082 if(message instanceof FindPrimary) {
2083 findPrimaryMessageReceived.countDown();
2084 } else if(message instanceof ClusterEvent.MemberUp) {
2085 countDownIfOther(((ClusterEvent.MemberUp)message).member(), memberUpReceived);
2086 } else if(message instanceof ClusterEvent.MemberRemoved) {
2087 countDownIfOther(((ClusterEvent.MemberRemoved)message).member(), memberRemovedReceived);
2088 } else if(message instanceof ClusterEvent.UnreachableMember) {
2089 countDownIfOther(((ClusterEvent.UnreachableMember)message).member(), memberUnreachableReceived);
2090 } else if(message instanceof ClusterEvent.ReachableMember) {
2091 countDownIfOther(((ClusterEvent.ReachableMember)message).member(), memberReachableReceived);
2096 void setMessageInterceptor(MessageInterceptor messageInterceptor) {
2097 this.messageInterceptor = messageInterceptor;
2100 void waitForRecoveryComplete() {
2101 assertEquals("Recovery complete", true,
2102 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
2105 void waitForMemberUp() {
2106 assertEquals("MemberUp received", true,
2107 Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
2108 memberUpReceived = new CountDownLatch(1);
2111 void waitForMemberRemoved() {
2112 assertEquals("MemberRemoved received", true,
2113 Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
2114 memberRemovedReceived = new CountDownLatch(1);
2117 void waitForUnreachableMember() {
2118 assertEquals("UnreachableMember received", true,
2119 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
2121 memberUnreachableReceived = new CountDownLatch(1);
2124 void waitForReachableMember() {
2125 assertEquals("ReachableMember received", true,
2126 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
2127 memberReachableReceived = new CountDownLatch(1);
2130 void verifyFindPrimary() {
2131 assertEquals("FindPrimary received", true,
2132 Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
2133 findPrimaryMessageReceived = new CountDownLatch(1);
2136 public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) {
2137 return new Builder(datastoreContextBuilder);
2140 private static class Builder extends AbstractGenericCreator<Builder, TestShardManager> {
2141 private ActorRef shardActor;
2142 private final Map<String, ActorRef> shardActors = new HashMap<>();
2144 Builder(DatastoreContext.Builder datastoreContextBuilder) {
2145 super(TestShardManager.class);
2146 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
2149 Builder shardActor(ActorRef shardActor) {
2150 this.shardActor = shardActor;
2154 Builder addShardActor(String shardName, ActorRef actorRef){
2155 shardActors.put(shardName, actorRef);
2161 public void saveSnapshot(Object obj) {
2162 snapshot = (ShardManagerSnapshot) obj;
2163 snapshotPersist.countDown();
2164 super.saveSnapshot(obj);
2167 void verifySnapshotPersisted(Set<String> shardList) {
2168 assertEquals("saveSnapshot invoked", true,
2169 Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
2170 assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
2174 protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
2175 if(shardActors.get(info.getShardName()) != null){
2176 return shardActors.get(info.getShardName());
2179 if(shardActor != null) {
2183 return super.newShardActor(schemaContext, info);
2187 private static abstract class AbstractGenericCreator<T extends AbstractGenericCreator<T, ?>, C extends ShardManager>
2188 extends AbstractShardManagerCreator<T> {
2189 private final Class<C> shardManagerClass;
2191 AbstractGenericCreator(Class<C> shardManagerClass) {
2192 this.shardManagerClass = shardManagerClass;
2193 cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).
2194 waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache());
2198 public Props props() {
2200 return Props.create(shardManagerClass, this);
2204 private static class GenericCreator<C extends ShardManager> extends AbstractGenericCreator<GenericCreator<C>, C> {
2205 GenericCreator(Class<C> shardManagerClass) {
2206 super(shardManagerClass);
2210 private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
2211 private static final long serialVersionUID = 1L;
2212 private final Creator<ShardManager> delegate;
2214 public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
2215 this.delegate = delegate;
2219 public ShardManager create() throws Exception {
2220 return delegate.create();
2224 interface MessageInterceptor extends Function<Object, Object> {
2225 boolean canIntercept(Object message);
2228 private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
2229 return new MessageInterceptor(){
2231 public Object apply(Object message) {
2232 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
2236 public boolean canIntercept(Object message) {
2237 return message instanceof FindPrimary;
2242 private static class MockRespondActor extends MessageCollectorActor {
2243 static final String CLEAR_RESPONSE = "clear-response";
2245 private Object responseMsg;
2246 private final Class<?> requestClass;
2248 @SuppressWarnings("unused")
2249 public MockRespondActor(Class<?> requestClass, Object responseMsg) {
2250 this.requestClass = requestClass;
2251 this.responseMsg = responseMsg;
2255 public void onReceive(Object message) throws Exception {
2256 if(message.equals(CLEAR_RESPONSE)) {
2259 super.onReceive(message);
2260 if (message.getClass().equals(requestClass) && responseMsg != null) {
2261 getSender().tell(responseMsg, getSelf());