2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.times;
19 import static org.mockito.Mockito.verify;
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSystem;
22 import akka.actor.AddressFromURIString;
23 import akka.actor.Props;
24 import akka.actor.Status;
25 import akka.actor.Status.Failure;
26 import akka.actor.Status.Success;
27 import akka.actor.Terminated;
28 import akka.cluster.Cluster;
29 import akka.cluster.ClusterEvent;
30 import akka.dispatch.Dispatchers;
31 import akka.japi.Creator;
32 import akka.pattern.Patterns;
33 import akka.persistence.RecoveryCompleted;
34 import akka.serialization.Serialization;
35 import akka.testkit.JavaTestKit;
36 import akka.testkit.TestActorRef;
37 import akka.util.Timeout;
38 import com.google.common.base.Function;
39 import com.google.common.base.Optional;
40 import com.google.common.base.Stopwatch;
41 import com.google.common.collect.ImmutableMap;
42 import com.google.common.collect.ImmutableSet;
43 import com.google.common.collect.Lists;
44 import com.google.common.collect.Sets;
45 import com.google.common.util.concurrent.Uninterruptibles;
46 import com.typesafe.config.ConfigFactory;
48 import java.util.AbstractMap;
49 import java.util.ArrayList;
50 import java.util.Arrays;
51 import java.util.Collection;
52 import java.util.Collections;
53 import java.util.HashMap;
54 import java.util.List;
56 import java.util.Map.Entry;
58 import java.util.concurrent.CountDownLatch;
59 import java.util.concurrent.TimeUnit;
60 import java.util.concurrent.TimeoutException;
61 import org.apache.commons.lang3.SerializationUtils;
62 import org.junit.After;
63 import org.junit.Before;
64 import org.junit.Test;
65 import org.mockito.Mock;
66 import org.mockito.Mockito;
67 import org.mockito.MockitoAnnotations;
68 import org.opendaylight.controller.cluster.datastore.config.Configuration;
69 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
70 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
71 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
72 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
73 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
74 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
75 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
76 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
77 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
78 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
79 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
80 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
81 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
82 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
83 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
84 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
85 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
86 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
87 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
88 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
89 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
90 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
91 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
92 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
93 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
94 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
95 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
96 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
97 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
98 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
99 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
100 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
101 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
102 import org.opendaylight.controller.cluster.raft.RaftState;
103 import org.opendaylight.controller.cluster.raft.TestActorFactory;
104 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
105 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
106 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
107 import org.opendaylight.controller.cluster.raft.messages.AddServer;
108 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
109 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
110 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
111 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
112 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
113 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
114 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
115 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
116 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
117 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
118 import org.slf4j.Logger;
119 import org.slf4j.LoggerFactory;
120 import scala.concurrent.Await;
121 import scala.concurrent.Future;
122 import scala.concurrent.duration.FiniteDuration;
124 public class ShardManagerTest extends AbstractActorTest {
125 private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
127 private static int ID_COUNTER = 1;
129 private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
130 private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
133 private static CountDownLatch ready;
135 private static TestActorRef<MessageCollectorActor> mockShardActor;
137 private static String mockShardName;
139 private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
140 dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
141 .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
143 private final Collection<ActorSystem> actorSystems = new ArrayList<>();
145 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
148 public void setUp() {
149 MockitoAnnotations.initMocks(this);
151 InMemoryJournal.clear();
152 InMemorySnapshotStore.clear();
154 if(mockShardActor == null) {
155 mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
156 mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), mockShardName);
159 mockShardActor.underlyingActor().clear();
163 public void tearDown() {
164 InMemoryJournal.clear();
165 InMemorySnapshotStore.clear();
167 for(ActorSystem system: actorSystems) {
168 JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
171 actorFactory.close();
174 private ActorSystem newActorSystem(String config) {
175 ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config));
176 actorSystems.add(system);
180 private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
181 String name = new ShardIdentifier(shardName, memberName,"config").toString();
182 if(system == getSystem()) {
183 return actorFactory.createTestActor(Props.create(MessageCollectorActor.class), name);
186 return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
189 private Props newShardMgrProps() {
190 return newShardMgrProps(new MockConfiguration());
193 private static DatastoreContextFactory newDatastoreContextFactory(DatastoreContext datastoreContext) {
194 DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
195 Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
196 Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString());
200 private TestShardManager.Builder newTestShardMgrBuilder() {
201 return TestShardManager.builder(datastoreContextBuilder);
204 private TestShardManager.Builder newTestShardMgrBuilder(Configuration config) {
205 return TestShardManager.builder(datastoreContextBuilder).configuration(config);
208 private Props newShardMgrProps(Configuration config) {
209 return newTestShardMgrBuilder(config).props();
212 private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() {
213 return TestShardManager.builder(datastoreContextBuilder).shardActor(mockShardActor);
216 private Props newPropsShardMgrWithMockShardActor() {
217 return newTestShardMgrBuilderWithMockShardActor().props();
220 private TestShardManager newTestShardManager() {
221 return newTestShardManager(newShardMgrProps());
224 private TestShardManager newTestShardManager(Props props) {
225 TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
226 TestShardManager shardManager = shardManagerActor.underlyingActor();
227 shardManager.waitForRecoveryComplete();
231 private void waitForShardInitialized(ActorRef shardManager, String shardName, JavaTestKit kit) {
232 AssertionError last = null;
233 Stopwatch sw = Stopwatch.createStarted();
234 while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
236 shardManager.tell(new FindLocalShard(shardName, true), kit.getRef());
237 kit.expectMsgClass(LocalShardFound.class);
239 } catch(AssertionError e) {
243 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
249 private <T> T expectMsgClassOrFailure(Class<T> msgClass, JavaTestKit kit, String msg) {
250 Object reply = kit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), msgClass, Failure.class);
251 if(reply instanceof Failure) {
252 throw new AssertionError(msg + " failed", ((Failure)reply).cause());
259 public void testPerShardDatastoreContext() throws Exception {
260 LOG.info("testPerShardDatastoreContext starting");
261 final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
262 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
264 Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
265 shardElectionTimeoutFactor(6).build()).when(mockFactory).getShardDatastoreContext("default");
267 Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
268 shardElectionTimeoutFactor(7).build()).when(mockFactory).getShardDatastoreContext("topology");
270 final MockConfiguration mockConfig = new MockConfiguration() {
272 public Collection<String> getMemberShardNames(String memberName) {
273 return Arrays.asList("default", "topology");
277 public Collection<String> getMembersFromShardName(String shardName) {
278 return Arrays.asList("member-1");
282 final TestActorRef<MessageCollectorActor> defaultShardActor = actorFactory.createTestActor(
283 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("default"));
284 final TestActorRef<MessageCollectorActor> topologyShardActor = actorFactory.createTestActor(
285 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("topology"));
287 final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
288 new HashMap<String, Entry<ActorRef, DatastoreContext>>());
289 shardInfoMap.put("default", new AbstractMap.SimpleEntry<ActorRef, DatastoreContext>(defaultShardActor, null));
290 shardInfoMap.put("topology", new AbstractMap.SimpleEntry<ActorRef, DatastoreContext>(topologyShardActor, null));
292 final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
293 final CountDownLatch newShardActorLatch = new CountDownLatch(2);
294 class LocalShardManager extends ShardManager {
295 public LocalShardManager(AbstractBuilder<?> builder) {
300 protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
301 Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
304 ref = entry.getKey();
305 entry.setValue(info.getDatastoreContext());
308 newShardActorLatch.countDown();
313 final Creator<ShardManager> creator = new Creator<ShardManager>() {
314 private static final long serialVersionUID = 1L;
316 public ShardManager create() throws Exception {
317 return new LocalShardManager(new GenericBuilder<LocalShardManager>(LocalShardManager.class).
318 datastoreContextFactory(mockFactory).primaryShardInfoCache(primaryShardInfoCache).
319 configuration(mockConfig));
323 JavaTestKit kit = new JavaTestKit(getSystem());
325 final ActorRef shardManager = actorFactory.createActor(Props.create(
326 new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()));
328 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), kit.getRef());
330 assertEquals("Shard actors created", true, newShardActorLatch.await(5, TimeUnit.SECONDS));
331 assertEquals("getShardElectionTimeoutFactor", 6, shardInfoMap.get("default").getValue().
332 getShardElectionTimeoutFactor());
333 assertEquals("getShardElectionTimeoutFactor", 7, shardInfoMap.get("topology").getValue().
334 getShardElectionTimeoutFactor());
336 DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
337 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
338 Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
339 shardElectionTimeoutFactor(66).build()).when(newMockFactory).getShardDatastoreContext("default");
341 Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
342 shardElectionTimeoutFactor(77).build()).when(newMockFactory).getShardDatastoreContext("topology");
344 shardManager.tell(newMockFactory, kit.getRef());
346 DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor, DatastoreContext.class);
347 assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
349 newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
350 assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
352 LOG.info("testPerShardDatastoreContext ending");
356 public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
357 new JavaTestKit(getSystem()) {{
358 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
360 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
362 shardManager.tell(new FindPrimary("non-existent", false), getRef());
364 expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
369 public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
370 LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting");
371 new JavaTestKit(getSystem()) {{
372 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
374 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
376 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
377 shardManager.tell(new ActorInitialized(), mockShardActor);
379 DataTree mockDataTree = mock(DataTree.class);
380 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
381 DataStoreVersions.CURRENT_VERSION), getRef());
383 MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
384 shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
385 RaftState.Leader.name())), mockShardActor);
387 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
389 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
390 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
391 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
392 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
395 LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending");
399 public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
400 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp starting");
401 new JavaTestKit(getSystem()) {{
402 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
404 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
405 shardManager.tell(new ActorInitialized(), mockShardActor);
407 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
408 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
409 shardManager.tell(new RoleChangeNotification(memberId1,
410 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
411 shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION), mockShardActor);
413 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
415 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
418 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending");
422 public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
423 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard starting");
424 new JavaTestKit(getSystem()) {{
425 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
427 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
428 shardManager.tell(new ActorInitialized(), mockShardActor);
430 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
431 MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
433 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
434 shardManager.tell(new RoleChangeNotification(memberId1,
435 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
436 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
437 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent(),
438 leaderVersion), mockShardActor);
440 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
442 RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
443 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
444 primaryFound.getPrimaryPath().contains("member-2-shard-default"));
445 assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
448 LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard ending");
452 public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
453 new JavaTestKit(getSystem()) {{
454 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
456 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
458 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
463 public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
464 new JavaTestKit(getSystem()) {{
465 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
467 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
468 shardManager.tell(new ActorInitialized(), mockShardActor);
470 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
472 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
477 public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
478 LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
479 new JavaTestKit(getSystem()) {{
480 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
482 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
483 shardManager.tell(new ActorInitialized(), mockShardActor);
485 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
486 shardManager.tell(new RoleChangeNotification(memberId,
487 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
489 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
491 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
493 DataTree mockDataTree = mock(DataTree.class);
494 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
495 DataStoreVersions.CURRENT_VERSION), mockShardActor);
497 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
499 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
500 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
501 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
502 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
505 LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
509 public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
510 LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting");
511 new JavaTestKit(getSystem()) {{
512 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
514 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
516 // We're passing waitUntilInitialized = true to FindPrimary so the response should be
517 // delayed until we send ActorInitialized and RoleChangeNotification.
518 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
520 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
522 shardManager.tell(new ActorInitialized(), mockShardActor);
524 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
526 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
527 shardManager.tell(new RoleChangeNotification(memberId,
528 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
530 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
532 DataTree mockDataTree = mock(DataTree.class);
533 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
534 DataStoreVersions.CURRENT_VERSION), mockShardActor);
536 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
537 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
538 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
539 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
541 expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
544 LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending");
548 public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
549 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard starting");
550 new JavaTestKit(getSystem()) {{
551 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
553 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
555 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
557 expectMsgClass(duration("2 seconds"), NotInitializedException.class);
559 shardManager.tell(new ActorInitialized(), mockShardActor);
561 expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
564 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending");
568 public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
569 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard starting");
570 new JavaTestKit(getSystem()) {{
571 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
573 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
574 shardManager.tell(new ActorInitialized(), mockShardActor);
575 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
576 null, RaftState.Candidate.name()), mockShardActor);
578 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
580 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
583 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending");
587 public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception {
588 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard starting");
589 new JavaTestKit(getSystem()) {{
590 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
592 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
593 shardManager.tell(new ActorInitialized(), mockShardActor);
594 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
595 null, RaftState.IsolatedLeader.name()), mockShardActor);
597 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
599 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
602 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending");
606 public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
607 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard starting");
608 new JavaTestKit(getSystem()) {{
609 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
611 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
612 shardManager.tell(new ActorInitialized(), mockShardActor);
614 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
616 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
619 LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending");
623 public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
624 LOG.info("testOnReceiveFindPrimaryForRemoteShard starting");
625 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
627 // Create an ActorSystem ShardManager actor for member-1.
629 final ActorSystem system1 = newActorSystem("Member1");
630 Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
632 final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
633 newTestShardMgrBuilderWithMockShardActor().cluster(
634 new ClusterWrapperImpl(system1)).props(), shardManagerID);
636 // Create an ActorSystem ShardManager actor for member-2.
638 final ActorSystem system2 = newActorSystem("Member2");
640 Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
642 final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
644 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
645 put("default", Arrays.asList("member-1", "member-2")).
646 put("astronauts", Arrays.asList("member-2")).build());
648 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
649 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
650 new ClusterWrapperImpl(system2)).props(), shardManagerID);
652 new JavaTestKit(system1) {{
654 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
655 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
657 shardManager2.tell(new ActorInitialized(), mockShardActor2);
659 String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
660 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
661 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
662 Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor2);
663 shardManager2.tell(new RoleChangeNotification(memberId2,
664 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
666 shardManager1.underlyingActor().waitForMemberUp();
668 shardManager1.tell(new FindPrimary("astronauts", false), getRef());
670 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
671 String path = found.getPrimaryPath();
672 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
673 assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
675 shardManager2.underlyingActor().verifyFindPrimary();
677 Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
679 shardManager1.underlyingActor().waitForMemberRemoved();
681 shardManager1.tell(new FindPrimary("astronauts", false), getRef());
683 expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
686 LOG.info("testOnReceiveFindPrimaryForRemoteShard ending");
// Drives shardManager1 through unreachable / removed / reachable / up events for
// member-2 and checks, after each step, the FindPrimary("default") reply and the
// PeerDown / PeerUp messages collected by mockShardActor1.
690 public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
691 LOG.info("testShardAvailabilityOnChangeOfMemberReachability starting");
692 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
694 // Create an ActorSystem ShardManager actor for member-1.
696 final ActorSystem system1 = newActorSystem("Member1");
697 Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
699 final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
701 final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
702 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
703 new ClusterWrapperImpl(system1)).props(), shardManagerID);
705 // Create an ActorSystem ShardManager actor for member-2.
707 final ActorSystem system2 = newActorSystem("Member2");
709 Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
711 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
// member-2's manager gets an explicit configuration listing both members for "default".
713 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
714 put("default", Arrays.asList("member-1", "member-2")).build());
716 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
717 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
718 new ClusterWrapperImpl(system2)).props(), shardManagerID);
720 new JavaTestKit(system1) {{
// Both managers receive the schema context and an ActorInitialized from their mock shard.
722 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
723 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
724 shardManager1.tell(new ActorInitialized(), mockShardActor1);
725 shardManager2.tell(new ActorInitialized(), mockShardActor2);
727 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
728 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
// shardManager1 is told its local shard is a Follower whose leader is the member-2 shard.
729 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
730 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
731 shardManager1.tell(new RoleChangeNotification(memberId1,
732 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
// shardManager2 is told its local shard is the Leader.
// NOTE(review): the sender argument that closes this tell() is not present in this
// listing (a line is missing after the next one) - confirm against the real source.
733 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
734 DataStoreVersions.CURRENT_VERSION),
736 shardManager2.tell(new RoleChangeNotification(memberId2,
737 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
// presumably blocks until shardManager1 has processed a member-up event - helper not visible here.
738 shardManager1.underlyingActor().waitForMemberUp();
// Step 1: with the leader on member-2, the reply must be a remote primary on member-2.
740 shardManager1.tell(new FindPrimary("default", true), getRef());
742 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
743 String path = found.getPrimaryPath();
744 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
// Step 2: hand an unreachable-member event for member-2 straight to the actor instance
// and expect the local shard to be sent PeerDown for member-2.
746 shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
747 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
749 shardManager1.underlyingActor().waitForUnreachableMember();
751 PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
752 assertEquals("getMemberName", "member-2", peerDown.getMemberName());
// Clear collected messages so the next expectFirstMatching only sees new ones.
753 MessageCollectorActor.clearMessages(mockShardActor1);
// Step 3: a member-removed event must produce another PeerDown, and FindPrimary must
// now be answered with NoShardLeaderException.
755 shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
756 createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
758 MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
760 shardManager1.tell(new FindPrimary("default", true), getRef());
762 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
// Step 4: a reachable-member event must produce PeerUp for member-2 and make the
// remote primary resolvable again.
764 shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
765 createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
767 shardManager1.underlyingActor().waitForReachableMember();
769 PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
770 assertEquals("getMemberName", "member-2", peerUp.getMemberName());
771 MessageCollectorActor.clearMessages(mockShardActor1);
773 shardManager1.tell(new FindPrimary("default", true), getRef());
775 RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
776 String path1 = found1.getPrimaryPath();
777 assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
// Step 5: a member-up event must also result in a PeerUp being sent to the local shard.
779 shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
780 createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
782 MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
786 LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending");
// Checks that after member-2 (the leader's member) is reported unreachable,
// FindPrimary("default") fails with NoShardLeaderException and the cached primary info
// is gone, and that once the local shard is reported as Leader the reply becomes
// LocalPrimaryShardFound.
790 public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
791 LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange starting");
792 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
794 // Create an ActorSystem ShardManager actor for member-1.
796 final ActorSystem system1 = newActorSystem("Member1");
797 Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
799 final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
// The cache instance is kept by the test so its contents can be seeded and inspected below.
801 final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
// NOTE(review): the final argument that closes this create() call is not present in
// this listing (a line is missing after 804) - confirm against the real source.
802 final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
803 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
804 new ClusterWrapperImpl(system1)).primaryShardInfoCache(primaryShardInfoCache).props(),
807 // Create an ActorSystem ShardManager actor for member-2.
809 final ActorSystem system2 = newActorSystem("Member2");
811 Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
813 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
815 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
816 put("default", Arrays.asList("member-1", "member-2")).build());
818 final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
819 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
820 new ClusterWrapperImpl(system2)).props(), shardManagerID);
822 new JavaTestKit(system1) {{
// Both managers receive the schema context and an ActorInitialized from their mock shard.
823 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
824 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
825 shardManager1.tell(new ActorInitialized(), mockShardActor1);
826 shardManager2.tell(new ActorInitialized(), mockShardActor2);
828 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
829 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
// Initial roles: member-1's shard is a Follower of the member-2 shard, which is Leader.
830 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
831 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
832 shardManager1.tell(new RoleChangeNotification(memberId1,
833 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
// NOTE(review): the sender argument that closes this tell() is not present in this
// listing (a line is missing after 835) - confirm against the real source.
834 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
835 DataStoreVersions.CURRENT_VERSION),
837 shardManager2.tell(new RoleChangeNotification(memberId2,
838 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
// presumably blocks until shardManager1 has processed a member-up event - helper not visible here.
839 shardManager1.underlyingActor().waitForMemberUp();
// Baseline: the primary for "default" resolves to the remote shard on member-2.
841 shardManager1.tell(new FindPrimary("default", true), getRef());
843 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
844 String path = found.getPrimaryPath();
845 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
// Seed the cache with an entry for "default" so its removal can be asserted below.
847 primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
848 mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.<DataTree>absent()));
// Report member-2 unreachable by handing the event straight to the actor instance.
850 shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
851 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
853 shardManager1.underlyingActor().waitForUnreachableMember();
855 shardManager1.tell(new FindPrimary("default", true), getRef());
857 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
859 assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default"));
// Leadership change: the local member-1 shard now reports itself as its own leader.
861 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class)),
862 DataStoreVersions.CURRENT_VERSION), mockShardActor1);
863 shardManager1.tell(new RoleChangeNotification(memberId1,
864 RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
// The primary must now be found locally on member-1.
866 shardManager1.tell(new FindPrimary("default", true), getRef());
868 LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
869 String path1 = found1.getPrimaryPath();
870 assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
874 LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange ending");
// Sends FindLocalShard("non-existent") and expects a LocalShardNotFound reply that
// echoes the requested shard name.
879 public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
880 new JavaTestKit(getSystem()) {{
881 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
883 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
// Second constructor argument is false here; the "WaitForShardInitialized" test
// describes it as waitUntilInitialized.
885 shardManager.tell(new FindLocalShard("non-existent", false), getRef());
887 LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
889 assertEquals("getShardName", "non-existent", notFound.getShardName());
// After the schema context update and ActorInitialized, FindLocalShard for the default
// shard must be answered with LocalShardFound whose actor path names the member-1 shard.
894 public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
895 new JavaTestKit(getSystem()) {{
896 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
898 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
// ActorInitialized is sent with the mock shard actor as the sender.
899 shardManager.tell(new ActorInitialized(), mockShardActor);
901 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
903 LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
// The failure message includes the actual path to ease diagnosis.
905 assertTrue("Found path contains " + found.getPath().path().toString(),
906 found.getPath().path().toString().contains("member-1-shard-default-config"));
// Sends FindLocalShard for the default shard without any prior UpdateSchemaContext or
// ActorInitialized message and expects a NotInitializedException reply.
911 public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
912 new JavaTestKit(getSystem()) {{
913 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
915 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
917 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
// Asks for the default shard with waitUntilInitialized = true before the shard has
// reported ActorInitialized, then sends ActorInitialized and expects the pending ask to
// complete with LocalShardFound.
922 public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
923 LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
924 new JavaTestKit(getSystem()) {{
925 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
927 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
929 // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
930 // delayed until we send ActorInitialized.
931 Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
932 new Timeout(5, TimeUnit.SECONDS));
934 shardManager.tell(new ActorInitialized(), mockShardActor);
// The failure message carries the actual reply so an unexpected type is visible.
936 Object resp = Await.result(future, duration("5 seconds"));
937 assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
// Closing log marker: reads "ending" so it can be told apart from the opening one.
940 LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized ending");
// Pre-populates the in-memory journal with two SchemaContextModules entries for the
// shard manager's persistence id, creates a TestShardManager, waits for the journal's
// delete-messages latch, and asserts that the journal is then empty.
944 public void testOnRecoveryJournalIsCleaned() {
945 String persistenceID = "shard-manager-" + shardMrgIDSuffix;
946 InMemoryJournal.addEntry(persistenceID, 1L, new ShardManager.SchemaContextModules(
947 ImmutableSet.of("foo")));
948 InMemoryJournal.addEntry(persistenceID, 2L, new ShardManager.SchemaContextModules(
949 ImmutableSet.of("bar")));
// Register the latch before the manager is created so the completion cannot be missed.
950 InMemoryJournal.addDeleteMessagesCompleteLatch(persistenceID);
// The local is not used afterwards; the instance is created for its side effects only.
952 TestShardManager shardManager = newTestShardManager();
954 InMemoryJournal.waitForDeleteMessagesComplete(persistenceID);
956 // Journal entries up to the last one should've been deleted
957 Map<Long, Object> journal = InMemoryJournal.get(persistenceID);
// NOTE(review): the assertion requires size 0, i.e. every entry deleted, which is
// stricter than the comment above suggests.
958 synchronized (journal) {
959 assertEquals("Journal size", 0, journal.size());
// Verifies that ready.countDown() is not invoked after only a RoleChangeNotification to
// Leader, and is invoked exactly once after the matching ShardLeaderStateChanged.
964 public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
965 TestShardManager shardManager = newTestShardManager();
967 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
968 shardManager.onReceiveCommand(new RoleChangeNotification(
969 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
// "ready" is a field defined outside this block; verify() implies it is a Mockito mock.
971 verify(ready, never()).countDown();
// Leader id equals the local member id, i.e. the local shard is its own leader.
973 shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
974 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION));
976 verify(ready, times(1)).countDown();
// Follower case: ready.countDown() must not happen after the role change alone, and
// must happen once after member-2 is reported up and a ShardLeaderStateChanged names
// the member-2 shard as leader.
980 public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
981 new JavaTestKit(getSystem()) {{
982 TestShardManager shardManager = newTestShardManager();
984 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
// The previous role is passed as null here.
985 shardManager.onReceiveCommand(new RoleChangeNotification(
986 memberId, null, RaftState.Follower.name()));
988 verify(ready, never()).countDown();
// MemberUp is delivered first, using the test kit's own actor path as the member address.
990 shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
992 shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
993 "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
994 DataStoreVersions.CURRENT_VERSION));
996 verify(ready, times(1)).countDown();
// Same scenario as the follower test above but with the event order swapped:
// ShardLeaderStateChanged arrives before the MemberUp for member-2, and
// ready.countDown() is still expected exactly once at the end.
1001 public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
1002 new JavaTestKit(getSystem()) {{
1003 TestShardManager shardManager = newTestShardManager();
1005 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1006 shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1008 verify(ready, never()).countDown();
// Leader (on member-2) becomes known before member-2 itself is reported up.
1010 shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
1011 "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
1012 DataStoreVersions.CURRENT_VERSION));
1014 shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
1016 verify(ready, times(1)).countDown();
// A RoleChangeNotification whose member id is "unknown" must not trigger
// ready.countDown().
1021 public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
1022 TestShardManager shardManager = newTestShardManager();
1024 shardManager.onReceiveCommand(new RoleChangeNotification(
1025 "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
1027 verify(ready, never()).countDown();
// A freshly created TestShardManager must report getSyncStatus() == false on its MBean.
1031 public void testByDefaultSyncStatusIsFalse() throws Exception{
1032 TestShardManager shardManager = newTestShardManager();
1034 assertEquals(false, shardManager.getMBean().getSyncStatus());
// After a RoleChangeNotification moving the default shard from Follower to Leader, the
// MBean must report getSyncStatus() == true.
1038 public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
1039 TestShardManager shardManager = newTestShardManager();
1041 shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
1042 RaftState.Follower.name(), RaftState.Leader.name()));
1044 assertEquals(true, shardManager.getMBean().getSyncStatus());
// With the default shard in the Candidate role the MBean sync status must be false, and
// it must remain false after a FollowerInitialSyncUpStatus is delivered for that shard.
1048 public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
1049 TestShardManager shardManager = newTestShardManager();
1051 String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1052 shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1053 RaftState.Follower.name(), RaftState.Candidate.name()));
1055 assertEquals(false, shardManager.getMBean().getSyncStatus());
1057 // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
// NOTE(review): the constructor arguments of this message are not present in this
// listing (a line is missing after the next one) - confirm against the real source.
1058 shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(
1061 assertEquals(false, shardManager.getMBean().getSyncStatus());
// With the default shard in the Follower role, the MBean sync status must follow the
// boolean carried by the most recent FollowerInitialSyncUpStatus: false initially, true
// after a "true" status, false again after a "false" status.
1065 public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
1066 TestShardManager shardManager = newTestShardManager();
1068 String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1069 shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1070 RaftState.Candidate.name(), RaftState.Follower.name()));
1072 // Initially will be false
1073 assertEquals(false, shardManager.getMBean().getSyncStatus());
1075 // Send status true will make sync status true
1076 shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
1078 assertEquals(true, shardManager.getMBean().getSyncStatus());
1080 // Send status false will make sync status false
1081 shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
1083 assertEquals(false, shardManager.getMBean().getSyncStatus());
// Uses a configuration that reports two local shards ("default" and "astronauts") and
// checks that the MBean sync status is true only when both are in sync: both Leader, or
// one Leader and the other a Follower with FollowerInitialSyncUpStatus(true).
1088 public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
1089 LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
// Anonymous MockConfiguration subclass: every member is reported as hosting both shards.
1090 TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
1092 public List<String> getMemberShardNames(String memberName) {
1093 return Arrays.asList("default", "astronauts");
1097 // Initially will be false
1098 assertEquals(false, shardManager.getMBean().getSyncStatus());
1100 // Make default shard leader
1101 String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
1102 shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId,
1103 RaftState.Follower.name(), RaftState.Leader.name()));
1105 // default = Leader, astronauts is unknown so sync status remains false
1106 assertEquals(false, shardManager.getMBean().getSyncStatus());
1108 // Make astronauts shard leader as well
1109 String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1110 shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1111 RaftState.Follower.name(), RaftState.Leader.name()));
1113 // Now sync status should be true
1114 assertEquals(true, shardManager.getMBean().getSyncStatus());
1116 // Make astronauts a Follower
1117 shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1118 RaftState.Leader.name(), RaftState.Follower.name()));
1120 // Sync status is not true
1121 assertEquals(false, shardManager.getMBean().getSyncStatus());
1123 // Make the astronauts follower sync status true
1124 shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
1126 // Sync status is now true
1127 assertEquals(true, shardManager.getMBean().getSyncStatus());
1129 LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending");
// Sends SwitchShardBehavior(mockShardName, "Leader", 1000) to the manager and checks
// that the mock shard actor receives a SwitchBehavior with state Leader and term 1000.
1133 public void testOnReceiveSwitchShardBehavior() throws Exception {
1134 new JavaTestKit(getSystem()) {{
1135 final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1137 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1138 shardManager.tell(new ActorInitialized(), mockShardActor);
// mockShardName is defined outside this block.
1140 shardManager.tell(new SwitchShardBehavior(mockShardName, "Leader", 1000), getRef());
// The forwarded message is read from what the mock shard actor collected.
1142 SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor, SwitchBehavior.class);
1144 assertEquals(RaftState.Leader, switchBehavior.getNewState());
1145 assertEquals(1000, switchBehavior.getNewTerm());
// Creates shard "foo" through a CreateShard message on a manager with an empty module
// configuration, then checks what was set on the supplied Shard.Builder (datastore
// context, peer address resolver, peer ids, shard id, schema context). Finally sends
// CreateShard again for the same name and expects a Success with a non-null status.
1150 public void testOnCreateShard() {
1151 LOG.info("testOnCreateShard starting");
1152 new JavaTestKit(getSystem()) {{
// Manager-level context is persistent; the per-shard context below is not.
1153 datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1155 ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1156 new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1158 SchemaContext schemaContext = TestModel.createTestContext();
1159 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1161 DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100).
1162 persistent(false).build();
1163 Shard.Builder shardBuilder = Shard.builder();
// The local member (member-1) is one of the three configured replicas.
1165 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1166 "foo", null, Arrays.asList("member-1", "member-5", "member-6"));
1167 shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef());
1169 expectMsgClass(duration("5 seconds"), Success.class);
1171 shardManager.tell(new FindLocalShard("foo", true), getRef());
1173 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
// The builder must carry the non-persistent setting passed in with CreateShard.
1175 assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent());
1176 assertTrue("Expected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig().
1177 getPeerAddressResolver() instanceof ShardPeerAddressResolver);
// Peers are the configured replicas other than the local member.
1178 assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(),
1179 new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()),
1180 shardBuilder.getPeerAddresses().keySet());
1181 assertEquals("ShardIdentifier", new ShardIdentifier("foo", "member-1", shardMrgIDSuffix),
1182 shardBuilder.getId());
1183 assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1185 // Send CreateShard with same name - should return Success with a message.
1187 shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1189 Success success = expectMsgClass(duration("5 seconds"), Success.class);
1190 assertNotNull("Success status is null", success.status());
1193 LOG.info("testOnCreateShard ending");
// Creates shard "foo" with a replica list that does not contain the local member and
// checks that the Shard.Builder ends up with no peer addresses and with
// DisableElectionsRaftPolicy as its custom raft policy class.
1197 public void testOnCreateShardWithLocalMemberNotInShardConfig() {
1198 LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting");
1199 new JavaTestKit(getSystem()) {{
1200 datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1202 ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1203 new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1205 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1207 Shard.Builder shardBuilder = Shard.builder();
// Only member-5 and member-6 are listed; sibling tests use member-1 as the local member.
1208 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1209 "foo", null, Arrays.asList("member-5", "member-6"));
// No per-shard DatastoreContext is supplied (null).
1211 shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1212 expectMsgClass(duration("5 seconds"), Success.class);
1214 shardManager.tell(new FindLocalShard("foo", true), getRef());
1215 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1217 assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
// The label names the property actually being compared.
1218 assertEquals("getCustomRaftPolicyImplementationClass", DisableElectionsRaftPolicy.class.getName(),
1219 shardBuilder.getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
1222 LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending");
// Sends CreateShard before any UpdateSchemaContext, then supplies the schema context and
// checks that the shard becomes findable and that the Shard.Builder carries that schema
// context and a non-null datastore context.
1226 public void testOnCreateShardWithNoInitialSchemaContext() {
1227 LOG.info("testOnCreateShardWithNoInitialSchemaContext starting");
1228 new JavaTestKit(getSystem()) {{
1229 ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1230 new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1232 Shard.Builder shardBuilder = Shard.builder();
1234 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1235 "foo", null, Arrays.asList("member-1"));
// CreateShard is acknowledged with Success even though no schema context was sent yet.
1236 shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1238 expectMsgClass(duration("5 seconds"), Success.class);
1240 SchemaContext schemaContext = TestModel.createTestContext();
1241 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1243 shardManager.tell(new FindLocalShard("foo", true), getRef());
1245 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1247 assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
// The message names the value actually being checked (the datastore context).
1248 assertNotNull("datastoreContext is null", shardBuilder.getDatastoreContext());
1251 LOG.info("testOnCreateShardWithNoInitialSchemaContext ending");
// Exercises GetSnapshot: it must fail with IllegalStateException before the schema
// context is set, then return shard snapshots for shard1/shard2 with no
// ShardManagerSnapshot, and after an "astronauts" replica is added return all three
// shards plus a ShardManagerSnapshot listing them.
1255 public void testGetSnapshot() throws Throwable {
1256 LOG.info("testGetSnapshot starting");
1257 JavaTestKit kit = new JavaTestKit(getSystem());
// "astronauts" is configured with no members, so it is not expected in the first snapshot.
1259 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1260 put("shard1", Arrays.asList("member-1")).
1261 put("shard2", Arrays.asList("member-1")).
1262 put("astronauts", Collections.<String>emptyList()).build());
1264 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig).
1265 withDispatcher(Dispatchers.DefaultDispatcherId()));
// First attempt, before UpdateSchemaContext: a Failure with IllegalStateException is expected.
1267 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1268 Failure failure = kit.expectMsgClass(Failure.class);
1269 assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
1271 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1273 waitForShardInitialized(shardManager, "shard1", kit);
1274 waitForShardInitialized(shardManager, "shard2", kit);
1276 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1278 DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1280 assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1281 assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
// NOTE(review): the body of apply() is not present in this listing; from its use below
// it presumably maps a ShardSnapshot to its shard name - confirm against the real source.
1283 Function<ShardSnapshot, String> shardNameTransformer = new Function<ShardSnapshot, String>() {
1285 public String apply(ShardSnapshot s) {
1290 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
1291 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
1293 // Add a new replica
1295 JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
// An interceptor built around mockShardLeaderKit's ref is installed on the manager;
// that kit then receives the AddServer request and replies with AddServerReply OK.
1297 TestShardManager shardManagerInstance = shardManager.underlyingActor();
1298 shardManagerInstance.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1300 shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1301 mockShardLeaderKit.expectMsgClass(AddServer.class);
1302 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.OK, ""));
1303 kit.expectMsgClass(Status.Success.class);
1304 waitForShardInitialized(shardManager, "astronauts", kit);
1306 // Send another GetSnapshot and verify
1308 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1309 datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1311 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
1312 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
// This time a serialized ShardManagerSnapshot must be present and list all three shards.
1314 byte[] snapshotBytes = datastoreSnapshot.getShardManagerSnapshot();
1315 assertNotNull("Expected ShardManagerSnapshot", snapshotBytes);
1316 ShardManagerSnapshot snapshot = SerializationUtils.deserialize(snapshotBytes);
1317 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1318 Sets.newHashSet(snapshot.getShardList()));
1320 LOG.info("testGetSnapshot ending");
// Starts a shard manager with a DatastoreSnapshot to restore from (listing shard1,
// shard2 and astronauts) while the configuration assigns no members to any shard, then
// checks that all three shards initialize and that GetSnapshot reports the same list.
1324 public void testRestoreFromSnapshot() throws Throwable {
1325 LOG.info("testRestoreFromSnapshot starting");
1327 JavaTestKit kit = new JavaTestKit(getSystem());
// Every shard is configured with an empty member list.
1329 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1330 put("shard1", Collections.<String>emptyList()).
1331 put("shard2", Collections.<String>emptyList()).
1332 put("astronauts", Collections.<String>emptyList()).build());
// The snapshot carries the shard list in serialized form and no per-shard snapshots.
1335 ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"));
1336 DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix,
1337 SerializationUtils.serialize(snapshot), Collections.<ShardSnapshot>emptyList());
1338 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig).
1339 restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
1341 shardManager.underlyingActor().waitForRecoveryComplete();
1343 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1345 waitForShardInitialized(shardManager, "shard1", kit);
1346 waitForShardInitialized(shardManager, "shard2", kit);
1347 waitForShardInitialized(shardManager, "astronauts", kit);
1349 shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1351 DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1353 assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
// The local "snapshot" variable is reused for the deserialized result.
1355 byte[] snapshotBytes = datastoreSnapshot.getShardManagerSnapshot();
1356 assertNotNull("Expected ShardManagerSnapshot", snapshotBytes);
1357 snapshot = SerializationUtils.deserialize(snapshotBytes);
1358 assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1359 Sets.newHashSet(snapshot.getShardList()));
1361 LOG.info("testRestoreFromSnapshot ending");
// Sends AddShardReplica("model-inventory") to a manager built from an
// EmptyModuleShardConfigProvider configuration and expects a Status.Failure whose cause
// is an IllegalArgumentException.
1365 public void testAddShardReplicaForNonExistentShardConfig() throws Exception {
1366 new JavaTestKit(getSystem()) {{
1367 ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1368 new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1370 shardManager.tell(new AddShardReplica("model-inventory"), getRef());
1371 Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
// Report the actual cause on failure rather than a bare "expected true but was false".
1373 assertTrue("Expected: IllegalArgumentException, Actual: " + resp.cause(),
1374 resp.cause() instanceof IllegalArgumentException);
// Two-node scenario: member-1's shard manager is asked to add a replica of the
// "astronauts" shard, whose leader is a MockRespondActor on member-2. The test checks
// the AddServer message received by the leader, the persisted snapshot on member-1, and
// the Status.Success reply.
1379 public void testAddShardReplica() throws Exception {
1380 LOG.info("testAddShardReplica starting");
// "astronauts" is configured on member-2 only; "default" is on both members.
1382 MockConfiguration mockConfig =
1383 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1384 put("default", Arrays.asList("member-1", "member-2")).
1385 put("astronauts", Arrays.asList("member-2")).build());
1387 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1389 // Create an ActorSystem ShardManager actor for member-1.
1390 final ActorSystem system1 = newActorSystem("Member1");
1391 Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1392 ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1393 final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1394 newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor).cluster(
1395 new ClusterWrapperImpl(system1)).props(), shardManagerID);
1397 // Create an ActorSystem ShardManager actor for member-2.
1398 final ActorSystem system2 = newActorSystem("Member2");
1399 Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
// NOTE(review): this actor name uses the literal type "config" while memberId2 below
// uses shardMrgIDSuffix - confirm the two are meant to differ.
1401 String name = new ShardIdentifier("astronauts", "member-2", "config").toString();
1402 final TestActorRef<MockRespondActor> mockShardLeaderActor =
1403 TestActorRef.create(system2, Props.create(MockRespondActor.class), name);
1404 final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1405 newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor).cluster(
1406 new ClusterWrapperImpl(system2)).props(), shardManagerID);
1408 new JavaTestKit(system1) {{
1410 newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1411 leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1413 leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
// The member-2 manager is told that its astronauts shard is the Leader, with a
// version one below CURRENT_VERSION.
1415 String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
1416 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1417 leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
1418 Optional.of(mock(DataTree.class)), leaderVersion), mockShardLeaderActor);
1419 leaderShardManager.tell(new RoleChangeNotification(memberId2,
1420 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor);
// presumably blocks until each manager has processed a member-up event - helper not visible here.
1422 newReplicaShardManager.underlyingActor().waitForMemberUp();
1423 leaderShardManager.underlyingActor().waitForMemberUp();
1425 //construct a mock response message
1426 AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2);
1427 mockShardLeaderActor.underlyingActor().updateResponse(response);
1428 newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
// NOTE(review): the remaining argument(s) of this call are not present in this listing
// (a line is missing after the next one) - confirm against the real source.
1429 AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
// The new server id must identify member-1's replica of the astronauts shard.
1431 String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1432 assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1433 newReplicaShardManager.underlyingActor()
1434 .verifySnapshotPersisted(Sets.newHashSet("default", "astronauts"));
1435 expectMsgClass(duration("5 seconds"), Status.Success.class);
1438 LOG.info("testAddShardReplica ending");
1442 public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() throws Exception {
1443 LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting");
1444 new JavaTestKit(getSystem()) {{
1445 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1446 newPropsShardMgrWithMockShardActor(), shardMgrID);
1448 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1449 shardManager.tell(new ActorInitialized(), mockShardActor);
1451 String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
1452 AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
1453 ActorRef leaderShardActor = shardManager.underlyingActor().getContext().actorOf(
1454 Props.create(MockRespondActor.class, addServerReply), leaderId);
1456 MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
1458 String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
1459 shardManager.tell(new RoleChangeNotification(newReplicaId,
1460 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
1461 shardManager.tell(new ShardLeaderStateChanged(newReplicaId, leaderId, Optional.<DataTree>absent(),
1462 DataStoreVersions.CURRENT_VERSION), mockShardActor);
1464 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1466 MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
1468 Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1469 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1471 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1472 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1474 // Send message again to verify previous in progress state is cleared
1476 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1477 resp = expectMsgClass(duration("5 seconds"), Failure.class);
1478 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1480 // Send message again with an AddServer timeout to verify the pre-existing shard actor isn't terminated.
1482 shardManager.tell(newDatastoreContextFactory(datastoreContextBuilder.
1483 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), getRef());
1484 leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
1485 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1486 expectMsgClass(duration("5 seconds"), Failure.class);
1488 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1489 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1492 LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
1496 public void testAddShardReplicaWithPreExistingLocalReplicaLeader() throws Exception {
1497 LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting");
1498 new JavaTestKit(getSystem()) {{
1499 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1500 ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1502 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1503 shardManager.tell(new ActorInitialized(), mockShardActor);
1504 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mock(DataTree.class)),
1505 DataStoreVersions.CURRENT_VERSION), getRef());
1506 shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
1507 RaftState.Leader.name())), mockShardActor);
1509 shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1510 Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1511 assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1513 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1514 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1517 LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
1521 public void testAddShardReplicaWithAddServerReplyFailure() throws Exception {
1522 LOG.info("testAddShardReplicaWithAddServerReplyFailure starting");
1523 new JavaTestKit(getSystem()) {{
1524 JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1526 MockConfiguration mockConfig =
1527 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1528 put("astronauts", Arrays.asList("member-2")).build());
1530 ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
1531 final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1532 newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props(), shardMgrID);
1533 shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1535 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1537 JavaTestKit terminateWatcher = new JavaTestKit(getSystem());
1538 terminateWatcher.watch(mockNewReplicaShardActor);
1540 shardManager.tell(new AddShardReplica("astronauts"), getRef());
1542 AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
1543 assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
1544 addServerMsg.getNewServerId());
1545 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
1547 Failure failure = expectMsgClass(duration("5 seconds"), Failure.class);
1548 assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
1550 shardManager.tell(new FindLocalShard("astronauts", false), getRef());
1551 expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1553 terminateWatcher.expectTerminated(mockNewReplicaShardActor);
1555 shardManager.tell(new AddShardReplica("astronauts"), getRef());
1556 mockShardLeaderKit.expectMsgClass(AddServer.class);
1557 mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
1558 failure = expectMsgClass(duration("5 seconds"), Failure.class);
1559 assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
1562 LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
1566 public void testAddShardReplicaWithAlreadyInProgress() throws Exception {
1567 LOG.info("testAddShardReplicaWithAlreadyInProgress starting");
1568 new JavaTestKit(getSystem()) {{
1569 JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1570 JavaTestKit secondRequestKit = new JavaTestKit(getSystem());
1572 MockConfiguration mockConfig =
1573 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1574 put("astronauts", Arrays.asList("member-2")).build());
1576 final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1577 newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props(), shardMgrID);
1578 shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1580 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1582 shardManager.tell(new AddShardReplica("astronauts"), getRef());
1584 mockShardLeaderKit.expectMsgClass(AddServer.class);
1586 shardManager.tell(new AddShardReplica("astronauts"), secondRequestKit.getRef());
1588 secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class);
1591 LOG.info("testAddShardReplicaWithAlreadyInProgress ending");
1595 public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception {
1596 LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting");
1597 new JavaTestKit(getSystem()) {{
1598 MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1599 put("astronauts", Arrays.asList("member-2")).build());
1601 final ActorRef newReplicaShardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig).
1602 shardActor(mockShardActor).props(), shardMgrID);
1604 newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1605 MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", getRef().path().toString());
1607 newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1608 Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
1609 assertEquals("Failure obtained", true,
1610 (resp.cause() instanceof RuntimeException));
1613 LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending");
1617 public void testRemoveShardReplicaForNonExistentShard() throws Exception {
1618 new JavaTestKit(getSystem()) {{
1619 ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1620 new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1622 shardManager.tell(new RemoveShardReplica("model-inventory"), getRef());
1623 Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
1624 assertEquals("Failure obtained", true,
1625 (resp.cause() instanceof IllegalArgumentException));
1631 public void testServerRemovedShardActorNotRunning() throws Exception {
1632 LOG.info("testServerRemovedShardActorNotRunning starting");
1633 new JavaTestKit(getSystem()) {{
1634 MockConfiguration mockConfig =
1635 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1636 put("default", Arrays.asList("member-1", "member-2")).
1637 put("astronauts", Arrays.asList("member-2")).
1638 put("people", Arrays.asList("member-1", "member-2")).build());
1640 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig));
1642 shardManager.underlyingActor().waitForRecoveryComplete();
1643 shardManager.tell(new FindLocalShard("people", false), getRef());
1644 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1646 shardManager.tell(new FindLocalShard("default", false), getRef());
1647 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1649 // Removed the default shard replica from member-1
1650 ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
1651 ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type(shardMrgIDSuffix).build();
1652 shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
1654 shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1657 LOG.info("testServerRemovedShardActorNotRunning ending");
1661 public void testServerRemovedShardActorRunning() throws Exception {
1662 LOG.info("testServerRemovedShardActorRunning starting");
1663 new JavaTestKit(getSystem()) {{
1664 MockConfiguration mockConfig =
1665 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1666 put("default", Arrays.asList("member-1", "member-2")).
1667 put("astronauts", Arrays.asList("member-2")).
1668 put("people", Arrays.asList("member-1", "member-2")).build());
1670 String shardId = ShardIdentifier.builder().shardName("default").memberName("member-1").
1671 type(shardMrgIDSuffix).build().toString();
1672 TestActorRef<MessageCollectorActor> shard = actorFactory.createTestActor(
1673 MessageCollectorActor.props(), shardId);
1675 TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1676 newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props());
1680 shardManager.underlyingActor().waitForRecoveryComplete();
1682 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1683 shardManager.tell(new ActorInitialized(), shard);
1685 waitForShardInitialized(shardManager, "people", this);
1686 waitForShardInitialized(shardManager, "default", this);
1688 // Removed the default shard replica from member-1
1689 shardManager.tell(new ServerRemoved(shardId), getRef());
1691 shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1693 expectMsgClass(duration("5 seconds"), Terminated.class);
1696 LOG.info("testServerRemovedShardActorRunning ending");
1701 public void testShardPersistenceWithRestoredData() throws Exception {
1702 LOG.info("testShardPersistenceWithRestoredData starting");
1703 new JavaTestKit(getSystem()) {{
1704 MockConfiguration mockConfig =
1705 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1706 put("default", Arrays.asList("member-1", "member-2")).
1707 put("astronauts", Arrays.asList("member-2")).
1708 put("people", Arrays.asList("member-1", "member-2")).build());
1709 String[] restoredShards = {"default", "astronauts"};
1710 ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
1711 InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
1713 //create shardManager to come up with restored data
1714 TestActorRef<TestShardManager> newRestoredShardManager = actorFactory.createTestActor(
1715 newShardMgrProps(mockConfig));
1717 newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
1719 newRestoredShardManager.tell(new FindLocalShard("people", false), getRef());
1720 LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1721 assertEquals("for uninitialized shard", "people", notFound.getShardName());
1723 //Verify a local shard is created for the restored shards,
1724 //although we expect a NotInitializedException for the shards as the actor initialization
1725 //message is not sent for them
1726 newRestoredShardManager.tell(new FindLocalShard("default", false), getRef());
1727 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1729 newRestoredShardManager.tell(new FindLocalShard("astronauts", false), getRef());
1730 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1733 LOG.info("testShardPersistenceWithRestoredData ending");
1737 private static class TestShardManager extends ShardManager {
1738 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
1739 private final CountDownLatch snapshotPersist = new CountDownLatch(1);
1740 private ShardManagerSnapshot snapshot;
1741 private final Map<String, ActorRef> shardActors;
1742 private final ActorRef shardActor;
1743 private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
1744 private CountDownLatch memberUpReceived = new CountDownLatch(1);
1745 private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
1746 private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
1747 private CountDownLatch memberReachableReceived = new CountDownLatch(1);
1748 private volatile MessageInterceptor messageInterceptor;
1750 private TestShardManager(Builder builder) {
1752 shardActor = builder.shardActor;
1753 shardActors = builder.shardActors;
1757 public void handleRecover(Object message) throws Exception {
1759 super.handleRecover(message);
1761 if(message instanceof RecoveryCompleted) {
1762 recoveryComplete.countDown();
1768 public void handleCommand(Object message) throws Exception {
1770 if(messageInterceptor != null && messageInterceptor.canIntercept(message)) {
1771 getSender().tell(messageInterceptor.apply(message), getSelf());
1773 super.handleCommand(message);
1776 if(message instanceof FindPrimary) {
1777 findPrimaryMessageReceived.countDown();
1778 } else if(message instanceof ClusterEvent.MemberUp) {
1779 String role = ((ClusterEvent.MemberUp)message).member().roles().head();
1780 if(!getCluster().getCurrentMemberName().equals(role)) {
1781 memberUpReceived.countDown();
1783 } else if(message instanceof ClusterEvent.MemberRemoved) {
1784 String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
1785 if(!getCluster().getCurrentMemberName().equals(role)) {
1786 memberRemovedReceived.countDown();
1788 } else if(message instanceof ClusterEvent.UnreachableMember) {
1789 String role = ((ClusterEvent.UnreachableMember)message).member().roles().head();
1790 if(!getCluster().getCurrentMemberName().equals(role)) {
1791 memberUnreachableReceived.countDown();
1793 } else if(message instanceof ClusterEvent.ReachableMember) {
1794 String role = ((ClusterEvent.ReachableMember)message).member().roles().head();
1795 if(!getCluster().getCurrentMemberName().equals(role)) {
1796 memberReachableReceived.countDown();
1802 void setMessageInterceptor(MessageInterceptor messageInterceptor) {
1803 this.messageInterceptor = messageInterceptor;
1806 void waitForRecoveryComplete() {
1807 assertEquals("Recovery complete", true,
1808 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
1811 void waitForMemberUp() {
1812 assertEquals("MemberUp received", true,
1813 Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
1814 memberUpReceived = new CountDownLatch(1);
1817 void waitForMemberRemoved() {
1818 assertEquals("MemberRemoved received", true,
1819 Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
1820 memberRemovedReceived = new CountDownLatch(1);
1823 void waitForUnreachableMember() {
1824 assertEquals("UnreachableMember received", true,
1825 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
1827 memberUnreachableReceived = new CountDownLatch(1);
1830 void waitForReachableMember() {
1831 assertEquals("ReachableMember received", true,
1832 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
1833 memberReachableReceived = new CountDownLatch(1);
1836 void verifyFindPrimary() {
1837 assertEquals("FindPrimary received", true,
1838 Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
1839 findPrimaryMessageReceived = new CountDownLatch(1);
1842 public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) {
1843 return new Builder(datastoreContextBuilder);
1846 private static class Builder extends AbstractGenericBuilder<Builder, TestShardManager> {
1847 private ActorRef shardActor;
1848 private final Map<String, ActorRef> shardActors = new HashMap<>();
1850 Builder(DatastoreContext.Builder datastoreContextBuilder) {
1851 super(TestShardManager.class);
1852 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
1855 Builder shardActor(ActorRef shardActor) {
1856 this.shardActor = shardActor;
1860 Builder addShardActor(String shardName, ActorRef actorRef){
1861 shardActors.put(shardName, actorRef);
1867 public void saveSnapshot(Object obj) {
1868 snapshot = (ShardManagerSnapshot) obj;
1869 snapshotPersist.countDown();
1872 void verifySnapshotPersisted(Set<String> shardList) {
1873 assertEquals("saveSnapshot invoked", true,
1874 Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
1875 assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
1879 protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
1880 if(shardActors.get(info.getShardName()) != null){
1881 return shardActors.get(info.getShardName());
1884 if(shardActor != null) {
1888 return super.newShardActor(schemaContext, info);
1892 private static abstract class AbstractGenericBuilder<T extends AbstractGenericBuilder<T, ?>, C extends ShardManager>
1893 extends ShardManager.AbstractBuilder<T> {
1894 private final Class<C> shardManagerClass;
1896 AbstractGenericBuilder(Class<C> shardManagerClass) {
1897 this.shardManagerClass = shardManagerClass;
1898 cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).
1899 waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache());
1903 public Props props() {
1905 return Props.create(shardManagerClass, this);
1909 private static class GenericBuilder<C extends ShardManager> extends AbstractGenericBuilder<GenericBuilder<C>, C> {
1910 GenericBuilder(Class<C> shardManagerClass) {
1911 super(shardManagerClass);
1915 private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
1916 private static final long serialVersionUID = 1L;
1917 private final Creator<ShardManager> delegate;
1919 public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
1920 this.delegate = delegate;
1924 public ShardManager create() throws Exception {
1925 return delegate.create();
1929 interface MessageInterceptor extends Function<Object, Object> {
1930 boolean canIntercept(Object message);
1933 private MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
1934 return new MessageInterceptor(){
1936 public Object apply(Object message) {
1937 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
1941 public boolean canIntercept(Object message) {
1942 return message instanceof FindPrimary;
1947 private static class MockRespondActor extends MessageCollectorActor {
1948 static final String CLEAR_RESPONSE = "clear-response";
1950 private volatile Object responseMsg;
1952 @SuppressWarnings("unused")
1953 public MockRespondActor() {
1956 @SuppressWarnings("unused")
1957 public MockRespondActor(Object responseMsg) {
1958 this.responseMsg = responseMsg;
1961 public void updateResponse(Object response) {
1962 responseMsg = response;
1966 public void onReceive(Object message) throws Exception {
1967 super.onReceive(message);
1968 if (message instanceof AddServer) {
1969 if (responseMsg != null) {
1970 getSender().tell(responseMsg, getSelf());
1972 } if(message.equals(CLEAR_RESPONSE)) {