1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertNull;
6 import static org.junit.Assert.assertSame;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.mock;
9 import static org.mockito.Mockito.never;
10 import static org.mockito.Mockito.times;
11 import static org.mockito.Mockito.verify;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.AddressFromURIString;
15 import akka.actor.Props;
16 import akka.cluster.Cluster;
17 import akka.cluster.ClusterEvent;
18 import akka.dispatch.Dispatchers;
19 import akka.japi.Creator;
20 import akka.pattern.Patterns;
21 import akka.persistence.RecoveryCompleted;
22 import akka.testkit.JavaTestKit;
23 import akka.testkit.TestActorRef;
24 import akka.util.Timeout;
25 import com.google.common.base.Optional;
26 import com.google.common.collect.ImmutableMap;
27 import com.google.common.collect.ImmutableSet;
28 import com.google.common.collect.Sets;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import com.typesafe.config.ConfigFactory;
31 import java.util.Arrays;
32 import java.util.Collections;
33 import java.util.List;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.TimeUnit;
37 import org.junit.After;
38 import org.junit.Before;
39 import org.junit.Test;
40 import org.mockito.Mock;
41 import org.mockito.MockitoAnnotations;
42 import org.opendaylight.controller.cluster.datastore.config.Configuration;
43 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
44 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
45 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
46 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
47 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
48 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
49 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
50 import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
51 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
52 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
53 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
54 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
55 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
56 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
57 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
58 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
59 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
60 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
61 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
62 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
63 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
64 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
65 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
66 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
67 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
68 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
69 import org.opendaylight.controller.cluster.raft.RaftState;
70 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
71 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
72 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
73 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
74 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
75 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
76 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
77 import scala.concurrent.Await;
78 import scala.concurrent.Future;
79 import scala.concurrent.duration.FiniteDuration;
81 public class ShardManagerTest extends AbstractActorTest {
82 private static int ID_COUNTER = 1;
84 private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
85 private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
88 private static CountDownLatch ready;
90 private static TestActorRef<MessageCollectorActor> mockShardActor;
92 private static String mockShardName;
94 private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
95 dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
96 .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
98 private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
99 String name = new ShardIdentifier(shardName, memberName,"config").toString();
100 return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
/** Primary shard info cache passed to every shard manager built by the props helpers of this class. */
private final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
106 public void setUp() {
107 MockitoAnnotations.initMocks(this);
109 InMemoryJournal.clear();
111 if(mockShardActor == null) {
112 mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
113 mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), mockShardName);
116 mockShardActor.underlyingActor().clear();
120 public void tearDown() {
121 InMemoryJournal.clear();
124 private Props newShardMgrProps(boolean persistent) {
125 return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
126 datastoreContextBuilder.persistent(persistent).build(), ready, primaryShardInfoCache);
129 private Props newPropsShardMgrWithMockShardActor() {
130 return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(),
131 new MockConfiguration());
134 private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor,
135 final ClusterWrapper clusterWrapper, final Configuration config) {
136 Creator<ShardManager> creator = new Creator<ShardManager>() {
137 private static final long serialVersionUID = 1L;
139 public ShardManager create() throws Exception {
140 return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
141 ready, name, shardActor, primaryShardInfoCache);
145 return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
149 public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
150 new JavaTestKit(getSystem()) {{
151 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
153 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
155 shardManager.tell(new FindPrimary("non-existent", false), getRef());
157 expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
162 public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
163 new JavaTestKit(getSystem()) {{
164 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
166 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
168 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
169 shardManager.tell(new ActorInitialized(), mockShardActor);
171 DataTree mockDataTree = mock(DataTree.class);
172 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
173 DataStoreVersions.CURRENT_VERSION), getRef());
175 MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
176 shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
177 RaftState.Leader.name())), mockShardActor);
179 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
181 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
182 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
183 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
184 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
189 public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
190 new JavaTestKit(getSystem()) {{
191 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
193 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
194 shardManager.tell(new ActorInitialized(), mockShardActor);
196 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
197 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
198 shardManager.tell(new RoleChangeNotification(memberId1,
199 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
200 shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION), mockShardActor);
202 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
204 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
209 public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
210 new JavaTestKit(getSystem()) {{
211 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
213 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
214 shardManager.tell(new ActorInitialized(), mockShardActor);
216 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
217 MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
219 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
220 shardManager.tell(new RoleChangeNotification(memberId1,
221 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
222 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
223 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent(),
224 leaderVersion), mockShardActor);
226 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
228 RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
229 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
230 primaryFound.getPrimaryPath().contains("member-2-shard-default"));
231 assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
236 public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
237 new JavaTestKit(getSystem()) {{
238 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
240 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
242 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
247 public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
248 new JavaTestKit(getSystem()) {{
249 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
251 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
252 shardManager.tell(new ActorInitialized(), mockShardActor);
254 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
256 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
261 public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
262 new JavaTestKit(getSystem()) {{
263 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
265 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
266 shardManager.tell(new ActorInitialized(), mockShardActor);
268 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
269 shardManager.tell(new RoleChangeNotification(memberId,
270 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
272 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
274 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
276 DataTree mockDataTree = mock(DataTree.class);
277 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
278 DataStoreVersions.CURRENT_VERSION), mockShardActor);
280 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
282 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
283 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
284 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
285 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
290 public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
291 new JavaTestKit(getSystem()) {{
292 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
294 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
296 // We're passing waitUntilInitialized = true to FindPrimary so the response should be
297 // delayed until we send ActorInitialized and RoleChangeNotification.
298 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
300 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
302 shardManager.tell(new ActorInitialized(), mockShardActor);
304 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
306 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
307 shardManager.tell(new RoleChangeNotification(memberId,
308 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
310 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
312 DataTree mockDataTree = mock(DataTree.class);
313 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
314 DataStoreVersions.CURRENT_VERSION), mockShardActor);
316 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
317 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
318 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
319 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
321 expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
326 public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
327 new JavaTestKit(getSystem()) {{
328 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
330 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
332 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
334 expectMsgClass(duration("2 seconds"), NotInitializedException.class);
336 shardManager.tell(new ActorInitialized(), mockShardActor);
338 expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
343 public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
344 new JavaTestKit(getSystem()) {{
345 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
347 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
348 shardManager.tell(new ActorInitialized(), mockShardActor);
349 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
350 null, RaftState.Candidate.name()), mockShardActor);
352 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
354 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
359 public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception {
360 new JavaTestKit(getSystem()) {{
361 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
363 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
364 shardManager.tell(new ActorInitialized(), mockShardActor);
365 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
366 null, RaftState.IsolatedLeader.name()), mockShardActor);
368 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
370 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
375 public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
376 new JavaTestKit(getSystem()) {{
377 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
379 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
380 shardManager.tell(new ActorInitialized(), mockShardActor);
382 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
384 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
389 public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
390 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
392 // Create an ActorSystem ShardManager actor for member-1.
394 final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
395 Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
397 ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
399 final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
400 newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
401 new MockConfiguration()), shardManagerID);
403 // Create an ActorSystem ShardManager actor for member-2.
405 final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
407 Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
409 final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
411 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
412 put("default", Arrays.asList("member-1", "member-2")).
413 put("astronauts", Arrays.asList("member-2")).build());
415 final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
416 newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
417 mockConfig2), shardManagerID);
419 new JavaTestKit(system1) {{
421 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
422 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
424 shardManager2.tell(new ActorInitialized(), mockShardActor2);
426 String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
427 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
428 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
429 Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor2);
430 shardManager2.tell(new RoleChangeNotification(memberId2,
431 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
433 shardManager1.underlyingActor().waitForMemberUp();
435 shardManager1.tell(new FindPrimary("astronauts", false), getRef());
437 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
438 String path = found.getPrimaryPath();
439 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
440 assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
442 shardManager2.underlyingActor().verifyFindPrimary();
444 Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
446 shardManager1.underlyingActor().waitForMemberRemoved();
448 shardManager1.tell(new FindPrimary("astronauts", false), getRef());
450 expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
453 JavaTestKit.shutdownActorSystem(system1);
454 JavaTestKit.shutdownActorSystem(system2);
458 public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
459 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
461 // Create an ActorSystem ShardManager actor for member-1.
463 final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
464 Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
466 final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
468 final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
469 newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
470 new MockConfiguration()), shardManagerID);
472 // Create an ActorSystem ShardManager actor for member-2.
474 final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
476 Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
478 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
480 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
481 put("default", Arrays.asList("member-1", "member-2")).build());
483 final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
484 newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
485 mockConfig2), shardManagerID);
487 new JavaTestKit(system1) {{
489 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
490 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
491 shardManager1.tell(new ActorInitialized(), mockShardActor1);
492 shardManager2.tell(new ActorInitialized(), mockShardActor2);
494 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
495 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
496 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
497 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
498 shardManager1.tell(new RoleChangeNotification(memberId1,
499 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
500 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
501 DataStoreVersions.CURRENT_VERSION),
503 shardManager2.tell(new RoleChangeNotification(memberId2,
504 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
505 shardManager1.underlyingActor().waitForMemberUp();
507 shardManager1.tell(new FindPrimary("default", true), getRef());
509 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
510 String path = found.getPrimaryPath();
511 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
513 shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
514 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
516 shardManager1.underlyingActor().waitForUnreachableMember();
518 PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
519 assertEquals("getMemberName", "member-2", peerDown.getMemberName());
520 MessageCollectorActor.clearMessages(mockShardActor1);
522 shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
523 createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
525 MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
527 shardManager1.tell(new FindPrimary("default", true), getRef());
529 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
531 shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
532 createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
534 shardManager1.underlyingActor().waitForReachableMember();
536 PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
537 assertEquals("getMemberName", "member-2", peerUp.getMemberName());
538 MessageCollectorActor.clearMessages(mockShardActor1);
540 shardManager1.tell(new FindPrimary("default", true), getRef());
542 RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
543 String path1 = found1.getPrimaryPath();
544 assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
546 shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
547 createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
549 MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
553 JavaTestKit.shutdownActorSystem(system1);
554 JavaTestKit.shutdownActorSystem(system2);
558 public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
559 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
561 // Create an ActorSystem ShardManager actor for member-1.
563 final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
564 Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
566 final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
568 final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
569 newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
570 new MockConfiguration()), shardManagerID);
572 // Create an ActorSystem ShardManager actor for member-2.
574 final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
576 Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
578 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
580 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
581 put("default", Arrays.asList("member-1", "member-2")).build());
583 final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
584 newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
585 mockConfig2), shardManagerID);
587 new JavaTestKit(system1) {{
589 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
590 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
591 shardManager1.tell(new ActorInitialized(), mockShardActor1);
592 shardManager2.tell(new ActorInitialized(), mockShardActor2);
594 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
595 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
596 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
597 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
598 shardManager1.tell(new RoleChangeNotification(memberId1,
599 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
600 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
601 DataStoreVersions.CURRENT_VERSION),
603 shardManager2.tell(new RoleChangeNotification(memberId2,
604 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
605 shardManager1.underlyingActor().waitForMemberUp();
607 shardManager1.tell(new FindPrimary("default", true), getRef());
609 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
610 String path = found.getPrimaryPath();
611 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
613 primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
614 mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.<DataTree>absent()));
616 shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
617 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
619 shardManager1.underlyingActor().waitForUnreachableMember();
621 shardManager1.tell(new FindPrimary("default", true), getRef());
623 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
625 assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default"));
627 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class)),
628 DataStoreVersions.CURRENT_VERSION), mockShardActor1);
629 shardManager1.tell(new RoleChangeNotification(memberId1,
630 RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
632 shardManager1.tell(new FindPrimary("default", true), getRef());
634 LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
635 String path1 = found1.getPrimaryPath();
636 assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
640 JavaTestKit.shutdownActorSystem(system1);
641 JavaTestKit.shutdownActorSystem(system2);
646 public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
647 new JavaTestKit(getSystem()) {{
648 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
650 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
652 shardManager.tell(new FindLocalShard("non-existent", false), getRef());
654 LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
656 assertEquals("getShardName", "non-existent", notFound.getShardName());
661 public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
662 new JavaTestKit(getSystem()) {{
663 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
665 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
666 shardManager.tell(new ActorInitialized(), mockShardActor);
668 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
670 LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
672 assertTrue("Found path contains " + found.getPath().path().toString(),
673 found.getPath().path().toString().contains("member-1-shard-default-config"));
678 public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
679 new JavaTestKit(getSystem()) {{
680 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
682 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
684 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
689 public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
690 new JavaTestKit(getSystem()) {{
691 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
693 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
695 // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
696 // delayed until we send ActorInitialized.
697 Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
698 new Timeout(5, TimeUnit.SECONDS));
700 shardManager.tell(new ActorInitialized(), mockShardActor);
702 Object resp = Await.result(future, duration("5 seconds"));
703 assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
708 public void testOnRecoveryJournalIsCleaned() {
709 InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
710 ImmutableSet.of("foo")));
711 InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
712 ImmutableSet.of("bar")));
713 InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
715 new JavaTestKit(getSystem()) {{
716 TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
717 Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
719 shardManager.underlyingActor().waitForRecoveryComplete();
720 InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
722 // Journal entries up to the last one should've been deleted
723 Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
724 synchronized (journal) {
725 assertEquals("Journal size", 0, journal.size());
731 public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
732 new JavaTestKit(getSystem()) {
734 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
736 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
737 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
738 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
740 verify(ready, never()).countDown();
742 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
743 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION));
745 verify(ready, times(1)).countDown();
751 public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
752 new JavaTestKit(getSystem()) {
754 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
756 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
757 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
758 memberId, null, RaftState.Follower.name()));
760 verify(ready, never()).countDown();
762 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
764 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
765 "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
766 DataStoreVersions.CURRENT_VERSION));
768 verify(ready, times(1)).countDown();
774 public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
775 new JavaTestKit(getSystem()) {
777 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
779 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
780 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
781 memberId, null, RaftState.Follower.name()));
783 verify(ready, never()).countDown();
785 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
786 "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
787 DataStoreVersions.CURRENT_VERSION));
789 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
791 verify(ready, times(1)).countDown();
797 public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
798 new JavaTestKit(getSystem()) {
800 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
802 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
803 "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
805 verify(ready, never()).countDown();
812 public void testByDefaultSyncStatusIsFalse() throws Exception{
813 final Props persistentProps = newShardMgrProps(true);
814 final TestActorRef<ShardManager> shardManager =
815 TestActorRef.create(getSystem(), persistentProps);
817 ShardManager shardManagerActor = shardManager.underlyingActor();
819 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
823 public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
824 final Props persistentProps = ShardManager.props(
825 new MockClusterWrapper(),
826 new MockConfiguration(),
827 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
828 final TestActorRef<ShardManager> shardManager =
829 TestActorRef.create(getSystem(), persistentProps);
831 ShardManager shardManagerActor = shardManager.underlyingActor();
832 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
833 RaftState.Follower.name(), RaftState.Leader.name()));
835 assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
839 public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
840 final Props persistentProps = newShardMgrProps(true);
841 final TestActorRef<ShardManager> shardManager =
842 TestActorRef.create(getSystem(), persistentProps);
844 ShardManager shardManagerActor = shardManager.underlyingActor();
845 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
846 RaftState.Follower.name(), RaftState.Candidate.name()));
848 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
850 // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
851 shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
853 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
857 public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
858 final Props persistentProps = ShardManager.props(
859 new MockClusterWrapper(),
860 new MockConfiguration(),
861 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
862 final TestActorRef<ShardManager> shardManager =
863 TestActorRef.create(getSystem(), persistentProps);
865 ShardManager shardManagerActor = shardManager.underlyingActor();
866 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
867 RaftState.Candidate.name(), RaftState.Follower.name()));
869 // Initially will be false
870 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
872 // Send status true will make sync status true
873 shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
875 assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
877 // Send status false will make sync status false
878 shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown"));
880 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
885 public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
886 final Props persistentProps = ShardManager.props(
887 new MockClusterWrapper(),
888 new MockConfiguration() {
890 public List<String> getMemberShardNames(String memberName) {
891 return Arrays.asList("default", "astronauts");
894 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
895 final TestActorRef<ShardManager> shardManager =
896 TestActorRef.create(getSystem(), persistentProps);
898 ShardManager shardManagerActor = shardManager.underlyingActor();
900 // Initially will be false
901 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
903 // Make default shard leader
904 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
905 RaftState.Follower.name(), RaftState.Leader.name()));
907 // default = Leader, astronauts is unknown so sync status remains false
908 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
910 // Make astronauts shard leader as well
911 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
912 RaftState.Follower.name(), RaftState.Leader.name()));
914 // Now sync status should be true
915 assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
917 // Make astronauts a Follower
918 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
919 RaftState.Leader.name(), RaftState.Follower.name()));
921 // Sync status is not true
922 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
924 // Make the astronauts follower sync status true
925 shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown"));
927 // Sync status is now true
928 assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
933 public void testOnReceiveSwitchShardBehavior() throws Exception {
934 new JavaTestKit(getSystem()) {{
935 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
937 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
938 shardManager.tell(new ActorInitialized(), mockShardActor);
940 shardManager.tell(new SwitchShardBehavior(mockShardName, "Leader", 1000), getRef());
942 SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor, SwitchBehavior.class);
944 assertEquals(RaftState.Leader, switchBehavior.getNewState());
945 assertEquals(1000, switchBehavior.getNewTerm());
949 public void testOnReceiveCreateShard() {
950 new JavaTestKit(getSystem()) {{
951 datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
953 ActorRef shardManager = getSystem().actorOf(newShardMgrProps(false));
955 SchemaContext schemaContext = TestModel.createTestContext();
956 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
958 DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100).
959 persistent(false).build();
960 TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
962 shardManager.tell(new CreateShard("foo", Arrays.asList("member-1", "member-5", "member-6"), shardPropsCreator,
963 datastoreContext), getRef());
965 expectMsgClass(duration("5 seconds"), CreateShardReply.class);
967 shardManager.tell(new FindLocalShard("foo", true), getRef());
969 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
971 assertEquals("isRecoveryApplicable", false, shardPropsCreator.datastoreContext.isPersistent());
972 assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(),
973 new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()),
974 shardPropsCreator.peerAddresses.keySet());
975 assertEquals("ShardIdentifier", new ShardIdentifier("foo", "member-1", shardMrgIDSuffix),
976 shardPropsCreator.shardId);
977 assertSame("schemaContext", schemaContext, shardPropsCreator.schemaContext);
979 // Send CreateShard with same name - should fail.
981 shardManager.tell(new CreateShard("foo", Collections.<String>emptyList(), shardPropsCreator, null), getRef());
983 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
988 public void testOnReceiveCreateShardWithNoInitialSchemaContext() {
989 new JavaTestKit(getSystem()) {{
990 ActorRef shardManager = getSystem().actorOf(newShardMgrProps(false));
992 TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
994 shardManager.tell(new CreateShard("foo", Arrays.asList("member-1"), shardPropsCreator, null), getRef());
996 expectMsgClass(duration("5 seconds"), CreateShardReply.class);
998 SchemaContext schemaContext = TestModel.createTestContext();
999 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1001 shardManager.tell(new FindLocalShard("foo", true), getRef());
1003 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1005 assertSame("schemaContext", schemaContext, shardPropsCreator.schemaContext);
1006 assertNotNull("schemaContext is null", shardPropsCreator.datastoreContext);
1010 private static class TestShardPropsCreator implements ShardPropsCreator {
1011 ShardIdentifier shardId;
1012 Map<String, String> peerAddresses;
1013 SchemaContext schemaContext;
1014 DatastoreContext datastoreContext;
1017 public Props newProps(ShardIdentifier shardId, Map<String, String> peerAddresses,
1018 DatastoreContext datastoreContext, SchemaContext schemaContext) {
1019 this.shardId = shardId;
1020 this.peerAddresses = peerAddresses;
1021 this.schemaContext = schemaContext;
1022 this.datastoreContext = datastoreContext;
1023 return Shard.props(shardId, peerAddresses, datastoreContext, schemaContext);
1028 private static class TestShardManager extends ShardManager {
1029 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
1031 TestShardManager(String shardMrgIDSuffix) {
1032 super(new MockClusterWrapper(), new MockConfiguration(),
1033 DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready,
1034 new PrimaryShardInfoFutureCache());
1038 public void handleRecover(Object message) throws Exception {
1040 super.handleRecover(message);
1042 if(message instanceof RecoveryCompleted) {
1043 recoveryComplete.countDown();
1048 void waitForRecoveryComplete() {
1049 assertEquals("Recovery complete", true,
1050 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
1054 @SuppressWarnings("serial")
1055 static class TestShardManagerCreator implements Creator<TestShardManager> {
1056 String shardMrgIDSuffix;
1058 TestShardManagerCreator(String shardMrgIDSuffix) {
1059 this.shardMrgIDSuffix = shardMrgIDSuffix;
1063 public TestShardManager create() throws Exception {
1064 return new TestShardManager(shardMrgIDSuffix);
1069 private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
1070 private static final long serialVersionUID = 1L;
1071 private final Creator<ShardManager> delegate;
1073 public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
1074 this.delegate = delegate;
1078 public ShardManager create() throws Exception {
1079 return delegate.create();
1083 private static class ForwardingShardManager extends ShardManager {
1084 private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
1085 private CountDownLatch memberUpReceived = new CountDownLatch(1);
1086 private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
1087 private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
1088 private CountDownLatch memberReachableReceived = new CountDownLatch(1);
1089 private final ActorRef shardActor;
1090 private final String name;
/**
 * Passes the cluster, configuration, datastore context, ready latch and primary-shard cache to
 * the {@code ShardManager} constructor and stores the actor reference and name supplied by the
 * test.
 */
protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
        DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
        ActorRef shardActor, PrimaryShardInfoFutureCache primaryShardInfoCache) {
    super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch, primaryShardInfoCache);
    this.shardActor = shardActor;
    // The final field 'name' must be assigned here for the class to compile.
    this.name = name;
}
1101 public void handleCommand(Object message) throws Exception {
1103 super.handleCommand(message);
1105 if(message instanceof FindPrimary) {
1106 findPrimaryMessageReceived.countDown();
1107 } else if(message instanceof ClusterEvent.MemberUp) {
1108 String role = ((ClusterEvent.MemberUp)message).member().roles().head();
1109 if(!getCluster().getCurrentMemberName().equals(role)) {
1110 memberUpReceived.countDown();
1112 } else if(message instanceof ClusterEvent.MemberRemoved) {
1113 String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
1114 if(!getCluster().getCurrentMemberName().equals(role)) {
1115 memberRemovedReceived.countDown();
1117 } else if(message instanceof ClusterEvent.UnreachableMember) {
1118 String role = ((ClusterEvent.UnreachableMember)message).member().roles().head();
1119 if(!getCluster().getCurrentMemberName().equals(role)) {
1120 memberUnreachableReceived.countDown();
1122 } else if(message instanceof ClusterEvent.ReachableMember) {
1123 String role = ((ClusterEvent.ReachableMember)message).member().roles().head();
1124 if(!getCluster().getCurrentMemberName().equals(role)) {
1125 memberReachableReceived.countDown();
1132 public String persistenceId() {
1137 protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
1141 void waitForMemberUp() {
1142 assertEquals("MemberUp received", true,
1143 Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
1144 memberUpReceived = new CountDownLatch(1);
1147 void waitForMemberRemoved() {
1148 assertEquals("MemberRemoved received", true,
1149 Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
1150 memberRemovedReceived = new CountDownLatch(1);
1153 void waitForUnreachableMember() {
1154 assertEquals("UnreachableMember received", true,
1155 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
1157 memberUnreachableReceived = new CountDownLatch(1);
1160 void waitForReachableMember() {
1161 assertEquals("ReachableMember received", true,
1162 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
1163 memberReachableReceived = new CountDownLatch(1);
1166 void verifyFindPrimary() {
1167 assertEquals("FindPrimary received", true,
1168 Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
1169 findPrimaryMessageReceived = new CountDownLatch(1);