/*
 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.times;
19 import static org.mockito.Mockito.verify;
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSystem;
22 import akka.actor.AddressFromURIString;
23 import akka.actor.Props;
24 import akka.actor.Status;
25 import akka.cluster.Cluster;
26 import akka.cluster.ClusterEvent;
27 import akka.dispatch.Dispatchers;
28 import akka.japi.Creator;
29 import akka.pattern.Patterns;
30 import akka.persistence.RecoveryCompleted;
31 import akka.testkit.JavaTestKit;
32 import akka.testkit.TestActorRef;
33 import akka.util.Timeout;
34 import com.google.common.base.Optional;
35 import com.google.common.collect.ImmutableMap;
36 import com.google.common.collect.ImmutableSet;
37 import com.google.common.collect.Sets;
38 import com.google.common.util.concurrent.Uninterruptibles;
39 import com.typesafe.config.ConfigFactory;
41 import java.util.Arrays;
42 import java.util.List;
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.TimeUnit;
46 import org.junit.After;
47 import org.junit.Before;
48 import org.junit.Test;
49 import org.mockito.Mock;
50 import org.mockito.MockitoAnnotations;
51 import org.opendaylight.controller.cluster.datastore.config.Configuration;
52 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
53 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
54 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
55 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
56 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
57 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
58 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
59 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
60 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
61 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
62 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
63 import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
64 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
65 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
66 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
67 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
68 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
69 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
70 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
71 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
72 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
73 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
74 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
75 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
76 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
77 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
78 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
79 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
80 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
81 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
82 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
83 import org.opendaylight.controller.cluster.raft.RaftState;
84 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
85 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
86 import org.opendaylight.controller.cluster.raft.messages.AddServer;
87 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
88 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
89 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
90 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
91 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
92 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
93 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
94 import scala.concurrent.Await;
95 import scala.concurrent.Future;
96 import scala.concurrent.duration.FiniteDuration;
98 public class ShardManagerTest extends AbstractActorTest {
99 private static int ID_COUNTER = 1;
101 private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
102 private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
105 private static CountDownLatch ready;
107 private static TestActorRef<MessageCollectorActor> mockShardActor;
109 private static String mockShardName;
111 private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
112 dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
113 .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
115 private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
116 String name = new ShardIdentifier(shardName, memberName,"config").toString();
117 return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
120 private final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
123 public void setUp() {
124 MockitoAnnotations.initMocks(this);
126 InMemoryJournal.clear();
128 if(mockShardActor == null) {
129 mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
130 mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), mockShardName);
133 mockShardActor.underlyingActor().clear();
137 public void tearDown() {
138 InMemoryJournal.clear();
141 private Props newShardMgrProps() {
142 return newShardMgrProps(new MockConfiguration());
145 private Props newShardMgrProps(Configuration config) {
146 return ShardManager.props(new MockClusterWrapper(), config, datastoreContextBuilder.build(), ready,
147 primaryShardInfoCache);
150 private Props newPropsShardMgrWithMockShardActor() {
151 return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(),
152 new MockConfiguration());
155 private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor,
156 final ClusterWrapper clusterWrapper, final Configuration config) {
157 Creator<ShardManager> creator = new Creator<ShardManager>() {
158 private static final long serialVersionUID = 1L;
160 public ShardManager create() throws Exception {
161 return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
162 ready, name, shardActor, primaryShardInfoCache);
166 return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
170 public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
171 new JavaTestKit(getSystem()) {{
172 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
174 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
176 shardManager.tell(new FindPrimary("non-existent", false), getRef());
178 expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
183 public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
184 new JavaTestKit(getSystem()) {{
185 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
187 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
189 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
190 shardManager.tell(new ActorInitialized(), mockShardActor);
192 DataTree mockDataTree = mock(DataTree.class);
193 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
194 DataStoreVersions.CURRENT_VERSION), getRef());
196 MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
197 shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
198 RaftState.Leader.name())), mockShardActor);
200 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
202 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
203 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
204 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
205 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
210 public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
211 new JavaTestKit(getSystem()) {{
212 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
214 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
215 shardManager.tell(new ActorInitialized(), mockShardActor);
217 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
218 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
219 shardManager.tell(new RoleChangeNotification(memberId1,
220 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
221 shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION), mockShardActor);
223 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
225 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
230 public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
231 new JavaTestKit(getSystem()) {{
232 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
234 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
235 shardManager.tell(new ActorInitialized(), mockShardActor);
237 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
238 MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
240 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
241 shardManager.tell(new RoleChangeNotification(memberId1,
242 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
243 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
244 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent(),
245 leaderVersion), mockShardActor);
247 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
249 RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
250 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
251 primaryFound.getPrimaryPath().contains("member-2-shard-default"));
252 assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
257 public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
258 new JavaTestKit(getSystem()) {{
259 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
261 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
263 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
268 public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
269 new JavaTestKit(getSystem()) {{
270 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
272 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
273 shardManager.tell(new ActorInitialized(), mockShardActor);
275 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
277 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
282 public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
283 new JavaTestKit(getSystem()) {{
284 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
286 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
287 shardManager.tell(new ActorInitialized(), mockShardActor);
289 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
290 shardManager.tell(new RoleChangeNotification(memberId,
291 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
293 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
295 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
297 DataTree mockDataTree = mock(DataTree.class);
298 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
299 DataStoreVersions.CURRENT_VERSION), mockShardActor);
301 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
303 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
304 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
305 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
306 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
311 public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
312 new JavaTestKit(getSystem()) {{
313 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
315 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
317 // We're passing waitUntilInitialized = true to FindPrimary so the response should be
318 // delayed until we send ActorInitialized and RoleChangeNotification.
319 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
321 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
323 shardManager.tell(new ActorInitialized(), mockShardActor);
325 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
327 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
328 shardManager.tell(new RoleChangeNotification(memberId,
329 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
331 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
333 DataTree mockDataTree = mock(DataTree.class);
334 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
335 DataStoreVersions.CURRENT_VERSION), mockShardActor);
337 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
338 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
339 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
340 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
342 expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
347 public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
348 new JavaTestKit(getSystem()) {{
349 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
351 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
353 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
355 expectMsgClass(duration("2 seconds"), NotInitializedException.class);
357 shardManager.tell(new ActorInitialized(), mockShardActor);
359 expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
364 public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
365 new JavaTestKit(getSystem()) {{
366 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
368 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
369 shardManager.tell(new ActorInitialized(), mockShardActor);
370 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
371 null, RaftState.Candidate.name()), mockShardActor);
373 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
375 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
380 public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception {
381 new JavaTestKit(getSystem()) {{
382 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
384 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
385 shardManager.tell(new ActorInitialized(), mockShardActor);
386 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
387 null, RaftState.IsolatedLeader.name()), mockShardActor);
389 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
391 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
396 public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
397 new JavaTestKit(getSystem()) {{
398 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
400 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
401 shardManager.tell(new ActorInitialized(), mockShardActor);
403 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
405 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
410 public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
411 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
413 // Create an ActorSystem ShardManager actor for member-1.
415 final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
416 Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
418 ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
420 final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
421 newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
422 new MockConfiguration()), shardManagerID);
424 // Create an ActorSystem ShardManager actor for member-2.
426 final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
428 Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
430 final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
432 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
433 put("default", Arrays.asList("member-1", "member-2")).
434 put("astronauts", Arrays.asList("member-2")).build());
436 final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
437 newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
438 mockConfig2), shardManagerID);
440 new JavaTestKit(system1) {{
442 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
443 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
445 shardManager2.tell(new ActorInitialized(), mockShardActor2);
447 String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
448 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
449 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
450 Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor2);
451 shardManager2.tell(new RoleChangeNotification(memberId2,
452 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
454 shardManager1.underlyingActor().waitForMemberUp();
456 shardManager1.tell(new FindPrimary("astronauts", false), getRef());
458 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
459 String path = found.getPrimaryPath();
460 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
461 assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
463 shardManager2.underlyingActor().verifyFindPrimary();
465 Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
467 shardManager1.underlyingActor().waitForMemberRemoved();
469 shardManager1.tell(new FindPrimary("astronauts", false), getRef());
471 expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
474 JavaTestKit.shutdownActorSystem(system1);
475 JavaTestKit.shutdownActorSystem(system2);
479 public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
480 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
482 // Create an ActorSystem ShardManager actor for member-1.
484 final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
485 Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
487 final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
489 final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
490 newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
491 new MockConfiguration()), shardManagerID);
493 // Create an ActorSystem ShardManager actor for member-2.
495 final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
497 Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
499 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
501 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
502 put("default", Arrays.asList("member-1", "member-2")).build());
504 final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
505 newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
506 mockConfig2), shardManagerID);
508 new JavaTestKit(system1) {{
510 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
511 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
512 shardManager1.tell(new ActorInitialized(), mockShardActor1);
513 shardManager2.tell(new ActorInitialized(), mockShardActor2);
515 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
516 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
517 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
518 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
519 shardManager1.tell(new RoleChangeNotification(memberId1,
520 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
521 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
522 DataStoreVersions.CURRENT_VERSION),
524 shardManager2.tell(new RoleChangeNotification(memberId2,
525 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
526 shardManager1.underlyingActor().waitForMemberUp();
528 shardManager1.tell(new FindPrimary("default", true), getRef());
530 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
531 String path = found.getPrimaryPath();
532 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
534 shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
535 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
537 shardManager1.underlyingActor().waitForUnreachableMember();
539 PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
540 assertEquals("getMemberName", "member-2", peerDown.getMemberName());
541 MessageCollectorActor.clearMessages(mockShardActor1);
543 shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
544 createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
546 MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
548 shardManager1.tell(new FindPrimary("default", true), getRef());
550 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
552 shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
553 createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
555 shardManager1.underlyingActor().waitForReachableMember();
557 PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
558 assertEquals("getMemberName", "member-2", peerUp.getMemberName());
559 MessageCollectorActor.clearMessages(mockShardActor1);
561 shardManager1.tell(new FindPrimary("default", true), getRef());
563 RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
564 String path1 = found1.getPrimaryPath();
565 assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
567 shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
568 createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
570 MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
574 JavaTestKit.shutdownActorSystem(system1);
575 JavaTestKit.shutdownActorSystem(system2);
579 public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
580 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
582 // Create an ActorSystem ShardManager actor for member-1.
584 final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
585 Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
587 final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
589 final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
590 newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
591 new MockConfiguration()), shardManagerID);
593 // Create an ActorSystem ShardManager actor for member-2.
595 final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
597 Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
599 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
601 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
602 put("default", Arrays.asList("member-1", "member-2")).build());
604 final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
605 newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
606 mockConfig2), shardManagerID);
608 new JavaTestKit(system1) {{
610 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
611 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
612 shardManager1.tell(new ActorInitialized(), mockShardActor1);
613 shardManager2.tell(new ActorInitialized(), mockShardActor2);
615 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
616 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
617 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
618 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
619 shardManager1.tell(new RoleChangeNotification(memberId1,
620 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
621 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
622 DataStoreVersions.CURRENT_VERSION),
624 shardManager2.tell(new RoleChangeNotification(memberId2,
625 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
626 shardManager1.underlyingActor().waitForMemberUp();
628 shardManager1.tell(new FindPrimary("default", true), getRef());
630 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
631 String path = found.getPrimaryPath();
632 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
634 primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
635 mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.<DataTree>absent()));
637 shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
638 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
640 shardManager1.underlyingActor().waitForUnreachableMember();
642 shardManager1.tell(new FindPrimary("default", true), getRef());
644 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
646 assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default"));
648 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class)),
649 DataStoreVersions.CURRENT_VERSION), mockShardActor1);
650 shardManager1.tell(new RoleChangeNotification(memberId1,
651 RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
653 shardManager1.tell(new FindPrimary("default", true), getRef());
655 LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
656 String path1 = found1.getPrimaryPath();
657 assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
661 JavaTestKit.shutdownActorSystem(system1);
662 JavaTestKit.shutdownActorSystem(system2);
667 public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
668 new JavaTestKit(getSystem()) {{
669 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
671 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
673 shardManager.tell(new FindLocalShard("non-existent", false), getRef());
675 LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
677 assertEquals("getShardName", "non-existent", notFound.getShardName());
682 public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
683 new JavaTestKit(getSystem()) {{
684 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
686 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
687 shardManager.tell(new ActorInitialized(), mockShardActor);
689 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
691 LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
693 assertTrue("Found path contains " + found.getPath().path().toString(),
694 found.getPath().path().toString().contains("member-1-shard-default-config"));
699 public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
700 new JavaTestKit(getSystem()) {{
701 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
703 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
705 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
710 public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
711 new JavaTestKit(getSystem()) {{
712 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
714 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
716 // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
717 // delayed until we send ActorInitialized.
718 Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
719 new Timeout(5, TimeUnit.SECONDS));
721 shardManager.tell(new ActorInitialized(), mockShardActor);
723 Object resp = Await.result(future, duration("5 seconds"));
724 assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
729 public void testOnRecoveryJournalIsCleaned() {
730 InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
731 ImmutableSet.of("foo")));
732 InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
733 ImmutableSet.of("bar")));
734 InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
736 new JavaTestKit(getSystem()) {{
737 TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
738 Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
740 shardManager.underlyingActor().waitForRecoveryComplete();
741 InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
743 // Journal entries up to the last one should've been deleted
744 Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
745 synchronized (journal) {
746 assertEquals("Journal size", 0, journal.size());
752 public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
753 new JavaTestKit(getSystem()) {
755 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
757 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
758 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
759 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
761 verify(ready, never()).countDown();
763 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
764 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION));
766 verify(ready, times(1)).countDown();
772 public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
773 new JavaTestKit(getSystem()) {
775 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
777 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
778 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
779 memberId, null, RaftState.Follower.name()));
781 verify(ready, never()).countDown();
783 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
785 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
786 "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
787 DataStoreVersions.CURRENT_VERSION));
789 verify(ready, times(1)).countDown();
795 public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
796 new JavaTestKit(getSystem()) {
798 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
800 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
801 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
802 memberId, null, RaftState.Follower.name()));
804 verify(ready, never()).countDown();
806 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
807 "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
808 DataStoreVersions.CURRENT_VERSION));
810 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
812 verify(ready, times(1)).countDown();
818 public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
819 new JavaTestKit(getSystem()) {
821 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
823 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
824 "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
826 verify(ready, never()).countDown();
833 public void testByDefaultSyncStatusIsFalse() throws Exception{
834 final Props persistentProps = newShardMgrProps();
835 final TestActorRef<ShardManager> shardManager =
836 TestActorRef.create(getSystem(), persistentProps);
838 ShardManager shardManagerActor = shardManager.underlyingActor();
840 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
844 public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
845 final Props persistentProps = ShardManager.props(
846 new MockClusterWrapper(),
847 new MockConfiguration(),
848 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
849 final TestActorRef<ShardManager> shardManager =
850 TestActorRef.create(getSystem(), persistentProps);
852 ShardManager shardManagerActor = shardManager.underlyingActor();
853 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
854 RaftState.Follower.name(), RaftState.Leader.name()));
856 assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
860 public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
861 final Props persistentProps = newShardMgrProps();
862 final TestActorRef<ShardManager> shardManager =
863 TestActorRef.create(getSystem(), persistentProps);
865 ShardManager shardManagerActor = shardManager.underlyingActor();
866 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
867 RaftState.Follower.name(), RaftState.Candidate.name()));
869 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
871 // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
872 shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
874 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
878 public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
879 final Props persistentProps = ShardManager.props(
880 new MockClusterWrapper(),
881 new MockConfiguration(),
882 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
883 final TestActorRef<ShardManager> shardManager =
884 TestActorRef.create(getSystem(), persistentProps);
886 ShardManager shardManagerActor = shardManager.underlyingActor();
887 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
888 RaftState.Candidate.name(), RaftState.Follower.name()));
890 // Initially will be false
891 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
893 // Send status true will make sync status true
894 shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
896 assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
898 // Send status false will make sync status false
899 shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown"));
901 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
906 public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
907 final Props persistentProps = ShardManager.props(
908 new MockClusterWrapper(),
909 new MockConfiguration() {
911 public List<String> getMemberShardNames(String memberName) {
912 return Arrays.asList("default", "astronauts");
915 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
916 final TestActorRef<ShardManager> shardManager =
917 TestActorRef.create(getSystem(), persistentProps);
919 ShardManager shardManagerActor = shardManager.underlyingActor();
921 // Initially will be false
922 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
924 // Make default shard leader
925 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
926 RaftState.Follower.name(), RaftState.Leader.name()));
928 // default = Leader, astronauts is unknown so sync status remains false
929 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
931 // Make astronauts shard leader as well
932 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
933 RaftState.Follower.name(), RaftState.Leader.name()));
935 // Now sync status should be true
936 assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
938 // Make astronauts a Follower
939 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
940 RaftState.Leader.name(), RaftState.Follower.name()));
942 // Sync status is not true
943 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
945 // Make the astronauts follower sync status true
946 shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown"));
948 // Sync status is now true
949 assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
954 public void testOnReceiveSwitchShardBehavior() throws Exception {
955 new JavaTestKit(getSystem()) {{
956 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
958 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
959 shardManager.tell(new ActorInitialized(), mockShardActor);
961 shardManager.tell(new SwitchShardBehavior(mockShardName, "Leader", 1000), getRef());
963 SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor, SwitchBehavior.class);
965 assertEquals(RaftState.Leader, switchBehavior.getNewState());
966 assertEquals(1000, switchBehavior.getNewTerm());
971 public void testOnReceiveCreateShard() {
972 new JavaTestKit(getSystem()) {{
973 datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
975 ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
976 new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
978 SchemaContext schemaContext = TestModel.createTestContext();
979 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
981 DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100).
982 persistent(false).build();
983 TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
985 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
986 "foo", null, Arrays.asList("member-1", "member-5", "member-6"));
987 shardManager.tell(new CreateShard(config, shardPropsCreator, datastoreContext), getRef());
989 expectMsgClass(duration("5 seconds"), CreateShardReply.class);
991 shardManager.tell(new FindLocalShard("foo", true), getRef());
993 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
995 assertEquals("isRecoveryApplicable", false, shardPropsCreator.datastoreContext.isPersistent());
996 assertTrue("Epxected ShardPeerAddressResolver", shardPropsCreator.datastoreContext.getShardRaftConfig().
997 getPeerAddressResolver() instanceof ShardPeerAddressResolver);
998 assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(),
999 new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()),
1000 shardPropsCreator.peerAddresses.keySet());
1001 assertEquals("ShardIdentifier", new ShardIdentifier("foo", "member-1", shardMrgIDSuffix),
1002 shardPropsCreator.shardId);
1003 assertSame("schemaContext", schemaContext, shardPropsCreator.schemaContext);
1005 // Send CreateShard with same name - should fail.
1007 shardManager.tell(new CreateShard(config, shardPropsCreator, null), getRef());
1009 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1014 public void testOnReceiveCreateShardWithNoInitialSchemaContext() {
1015 new JavaTestKit(getSystem()) {{
1016 ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
1017 new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1019 TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
1021 ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1022 "foo", null, Arrays.asList("member-1"));
1023 shardManager.tell(new CreateShard(config, shardPropsCreator, null), getRef());
1025 expectMsgClass(duration("5 seconds"), CreateShardReply.class);
1027 SchemaContext schemaContext = TestModel.createTestContext();
1028 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1030 shardManager.tell(new FindLocalShard("foo", true), getRef());
1032 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1034 assertSame("schemaContext", schemaContext, shardPropsCreator.schemaContext);
1035 assertNotNull("schemaContext is null", shardPropsCreator.datastoreContext);
1040 public void testAddShardReplicaForNonExistentShard() throws Exception {
1041 new JavaTestKit(getSystem()) {{
1042 ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
1043 new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1045 shardManager.tell(new AddShardReplica("model-inventory"), getRef());
1046 Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
1048 assertEquals("Failure obtained", true,
1049 (resp.cause() instanceof IllegalArgumentException));
1054 public void testAddShardReplicaForAlreadyCreatedShard() throws Exception {
1055 new JavaTestKit(getSystem()) {{
1056 ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
1057 shardManager.tell(new AddShardReplica("default"), getRef());
1058 Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
1059 assertEquals("Failure obtained", true,
1060 (resp.cause() instanceof IllegalArgumentException));
1065 public void testAddShardReplica() throws Exception {
1066 MockConfiguration mockConfig =
1067 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1068 put("default", Arrays.asList("member-1", "member-2")).
1069 put("astronauts", Arrays.asList("member-2")).build());
1071 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1073 // Create an ActorSystem ShardManager actor for member-1.
1074 final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
1075 Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1076 ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1077 final TestActorRef<ForwardingShardManager> newReplicaShardManager = TestActorRef.create(system1,
1078 newPropsShardMgrWithMockShardActor("shardManager1", mockDefaultShardActor,
1079 new ClusterWrapperImpl(system1), mockConfig), shardManagerID);
1081 // Create an ActorSystem ShardManager actor for member-2.
1082 final ActorSystem system2 = ActorSystem.create("cluster-test",
1083 ConfigFactory.load().getConfig("Member2"));
1084 Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1086 String name = new ShardIdentifier("astronauts", "member-2", "config").toString();
1087 final TestActorRef<MockRespondActor> mockShardLeaderActor =
1088 TestActorRef.create(system2, Props.create(MockRespondActor.class), name);
1089 final TestActorRef<ForwardingShardManager> leaderShardManager = TestActorRef.create(system2,
1090 newPropsShardMgrWithMockShardActor("shardManager2", mockShardLeaderActor,
1091 new ClusterWrapperImpl(system2), mockConfig), shardManagerID);
1093 new JavaTestKit(system1) {{
1095 newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1096 leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1098 leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1100 String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
1101 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1102 leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
1103 Optional.of(mock(DataTree.class)), leaderVersion), mockShardLeaderActor);
1104 leaderShardManager.tell(new RoleChangeNotification(memberId2,
1105 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor);
1107 newReplicaShardManager.underlyingActor().waitForMemberUp();
1108 leaderShardManager.underlyingActor().waitForMemberUp();
1110 //construct a mock response message
1111 AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2);
1112 mockShardLeaderActor.underlyingActor().updateResponse(response);
1113 newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1114 AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1116 String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1117 assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1119 expectMsgClass(duration("5 seconds"), Status.Success.class);
1122 JavaTestKit.shutdownActorSystem(system1);
1123 JavaTestKit.shutdownActorSystem(system2);
1127 public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception {
1128 MockConfiguration mockConfig =
1129 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1130 put("default", Arrays.asList("member-1", "member-2")).
1131 put("astronauts", Arrays.asList("member-2")).build());
1133 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1135 // Create an ActorSystem ShardManager actor for member-1.
1136 final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
1137 Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1138 ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1139 final TestActorRef<ForwardingShardManager> newReplicaShardManager = TestActorRef.create(system1,
1140 newPropsShardMgrWithMockShardActor("shardManager1", mockDefaultShardActor,
1141 new ClusterWrapperImpl(system1), mockConfig), shardManagerID);
1143 new JavaTestKit(system1) {{
1145 newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1146 MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", getRef().path().toString());
1147 newReplicaShardManager.underlyingActor().waitForMemberUp();
1149 newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1150 Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
1151 assertEquals("Failure obtained", true,
1152 (resp.cause() instanceof RuntimeException));
1155 JavaTestKit.shutdownActorSystem(system1);
1159 public void testRemoveShardReplicaForNonExistentShard() throws Exception {
1160 new JavaTestKit(getSystem()) {{
1161 ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
1162 new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1164 shardManager.tell(new RemoveShardReplica("model-inventory"), getRef());
1165 Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
1166 assertEquals("Failure obtained", true,
1167 (resp.cause() instanceof IllegalArgumentException));
1172 private static class TestShardPropsCreator implements ShardPropsCreator {
1173 ShardIdentifier shardId;
1174 Map<String, String> peerAddresses;
1175 SchemaContext schemaContext;
1176 DatastoreContext datastoreContext;
1179 public Props newProps(ShardIdentifier shardId, Map<String, String> peerAddresses,
1180 DatastoreContext datastoreContext, SchemaContext schemaContext) {
1181 this.shardId = shardId;
1182 this.peerAddresses = peerAddresses;
1183 this.schemaContext = schemaContext;
1184 this.datastoreContext = datastoreContext;
1185 return Shard.props(shardId, peerAddresses, datastoreContext, schemaContext);
1190 private static class TestShardManager extends ShardManager {
1191 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
1193 TestShardManager(String shardMrgIDSuffix) {
1194 super(new MockClusterWrapper(), new MockConfiguration(),
1195 DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready,
1196 new PrimaryShardInfoFutureCache());
1200 public void handleRecover(Object message) throws Exception {
1202 super.handleRecover(message);
1204 if(message instanceof RecoveryCompleted) {
1205 recoveryComplete.countDown();
1210 void waitForRecoveryComplete() {
1211 assertEquals("Recovery complete", true,
1212 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
1216 @SuppressWarnings("serial")
1217 static class TestShardManagerCreator implements Creator<TestShardManager> {
1218 String shardMrgIDSuffix;
1220 TestShardManagerCreator(String shardMrgIDSuffix) {
1221 this.shardMrgIDSuffix = shardMrgIDSuffix;
1225 public TestShardManager create() throws Exception {
1226 return new TestShardManager(shardMrgIDSuffix);
1231 private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
1232 private static final long serialVersionUID = 1L;
1233 private final Creator<ShardManager> delegate;
1235 public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
1236 this.delegate = delegate;
1240 public ShardManager create() throws Exception {
1241 return delegate.create();
1245 private static class ForwardingShardManager extends ShardManager {
1246 private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
1247 private CountDownLatch memberUpReceived = new CountDownLatch(1);
1248 private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
1249 private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
1250 private CountDownLatch memberReachableReceived = new CountDownLatch(1);
1251 private final ActorRef shardActor;
1252 private final String name;
1254 protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
1255 DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
1256 ActorRef shardActor, PrimaryShardInfoFutureCache primaryShardInfoCache) {
1257 super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch, primaryShardInfoCache);
1258 this.shardActor = shardActor;
1263 public void handleCommand(Object message) throws Exception {
1265 super.handleCommand(message);
1267 if(message instanceof FindPrimary) {
1268 findPrimaryMessageReceived.countDown();
1269 } else if(message instanceof ClusterEvent.MemberUp) {
1270 String role = ((ClusterEvent.MemberUp)message).member().roles().head();
1271 if(!getCluster().getCurrentMemberName().equals(role)) {
1272 memberUpReceived.countDown();
1274 } else if(message instanceof ClusterEvent.MemberRemoved) {
1275 String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
1276 if(!getCluster().getCurrentMemberName().equals(role)) {
1277 memberRemovedReceived.countDown();
1279 } else if(message instanceof ClusterEvent.UnreachableMember) {
1280 String role = ((ClusterEvent.UnreachableMember)message).member().roles().head();
1281 if(!getCluster().getCurrentMemberName().equals(role)) {
1282 memberUnreachableReceived.countDown();
1284 } else if(message instanceof ClusterEvent.ReachableMember) {
1285 String role = ((ClusterEvent.ReachableMember)message).member().roles().head();
1286 if(!getCluster().getCurrentMemberName().equals(role)) {
1287 memberReachableReceived.countDown();
1294 public String persistenceId() {
1299 protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
1303 void waitForMemberUp() {
1304 assertEquals("MemberUp received", true,
1305 Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
1306 memberUpReceived = new CountDownLatch(1);
1309 void waitForMemberRemoved() {
1310 assertEquals("MemberRemoved received", true,
1311 Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
1312 memberRemovedReceived = new CountDownLatch(1);
1315 void waitForUnreachableMember() {
1316 assertEquals("UnreachableMember received", true,
1317 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
1319 memberUnreachableReceived = new CountDownLatch(1);
1322 void waitForReachableMember() {
1323 assertEquals("ReachableMember received", true,
1324 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
1325 memberReachableReceived = new CountDownLatch(1);
1328 void verifyFindPrimary() {
1329 assertEquals("FindPrimary received", true,
1330 Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
1331 findPrimaryMessageReceived = new CountDownLatch(1);
1335 private static class MockRespondActor extends MessageCollectorActor {
1337 private Object responseMsg;
1339 public void updateResponse(Object response) {
1340 responseMsg = response;
1344 public void onReceive(Object message) throws Exception {
1345 super.onReceive(message);
1346 if (message instanceof AddServer) {
1347 if (responseMsg != null) {
1348 getSender().tell(responseMsg, getSelf());