2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.times;
19 import static org.mockito.Mockito.verify;
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSystem;
22 import akka.actor.AddressFromURIString;
23 import akka.actor.Props;
24 import akka.cluster.Cluster;
25 import akka.cluster.ClusterEvent;
26 import akka.dispatch.Dispatchers;
27 import akka.japi.Creator;
28 import akka.pattern.Patterns;
29 import akka.persistence.RecoveryCompleted;
30 import akka.testkit.JavaTestKit;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.collect.ImmutableMap;
35 import com.google.common.collect.ImmutableSet;
36 import com.google.common.collect.Sets;
37 import com.google.common.util.concurrent.Uninterruptibles;
38 import com.typesafe.config.ConfigFactory;
39 import java.util.Arrays;
40 import java.util.Collections;
41 import java.util.List;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.TimeUnit;
45 import org.junit.After;
46 import org.junit.Before;
47 import org.junit.Test;
48 import org.mockito.Mock;
49 import org.mockito.MockitoAnnotations;
50 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
51 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
52 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
53 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
54 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
55 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
56 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
57 import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
58 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
59 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
60 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
61 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
62 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
63 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
64 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
65 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
66 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
67 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
68 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
69 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
70 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
71 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
72 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
73 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
74 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
75 import org.opendaylight.controller.cluster.raft.RaftState;
76 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
77 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
78 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
79 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
80 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
81 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
82 import scala.concurrent.Await;
83 import scala.concurrent.Future;
84 import scala.concurrent.duration.FiniteDuration;
86 public class ShardManagerTest extends AbstractActorTest {
87 private static int ID_COUNTER = 1;
89 private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
90 private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
93 private static CountDownLatch ready;
95 private static TestActorRef<MessageCollectorActor> mockShardActor;
97 private static String mockShardName;
99 private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
100 dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
101 .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
103 private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
104 String name = new ShardIdentifier(shardName, memberName,"config").toString();
105 return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
108 private final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
111 public void setUp() {
112 MockitoAnnotations.initMocks(this);
114 InMemoryJournal.clear();
116 if(mockShardActor == null) {
117 mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
118 mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), mockShardName);
121 mockShardActor.underlyingActor().clear();
125 public void tearDown() {
126 InMemoryJournal.clear();
129 private Props newShardMgrProps(boolean persistent) {
130 return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
131 datastoreContextBuilder.persistent(persistent).build(), ready, primaryShardInfoCache);
134 private Props newPropsShardMgrWithMockShardActor() {
135 return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(),
136 new MockConfiguration());
139 private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor,
140 final ClusterWrapper clusterWrapper, final Configuration config) {
141 Creator<ShardManager> creator = new Creator<ShardManager>() {
142 private static final long serialVersionUID = 1L;
144 public ShardManager create() throws Exception {
145 return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
146 ready, name, shardActor, primaryShardInfoCache);
150 return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
154 public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
155 new JavaTestKit(getSystem()) {{
156 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
158 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
160 shardManager.tell(new FindPrimary("non-existent", false), getRef());
162 expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
167 public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
168 new JavaTestKit(getSystem()) {{
169 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
171 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
173 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
174 shardManager.tell(new ActorInitialized(), mockShardActor);
176 DataTree mockDataTree = mock(DataTree.class);
177 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
178 DataStoreVersions.CURRENT_VERSION), getRef());
180 MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
181 shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
182 RaftState.Leader.name())), mockShardActor);
184 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
186 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
187 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
188 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
189 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
194 public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
195 new JavaTestKit(getSystem()) {{
196 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
198 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
199 shardManager.tell(new ActorInitialized(), mockShardActor);
201 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
202 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
203 shardManager.tell(new RoleChangeNotification(memberId1,
204 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
205 shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION), mockShardActor);
207 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
209 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
214 public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
215 new JavaTestKit(getSystem()) {{
216 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
218 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
219 shardManager.tell(new ActorInitialized(), mockShardActor);
221 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
222 MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
224 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
225 shardManager.tell(new RoleChangeNotification(memberId1,
226 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
227 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
228 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent(),
229 leaderVersion), mockShardActor);
231 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
233 RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
234 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
235 primaryFound.getPrimaryPath().contains("member-2-shard-default"));
236 assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
241 public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
242 new JavaTestKit(getSystem()) {{
243 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
245 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
247 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
252 public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
253 new JavaTestKit(getSystem()) {{
254 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
256 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
257 shardManager.tell(new ActorInitialized(), mockShardActor);
259 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
261 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
266 public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
267 new JavaTestKit(getSystem()) {{
268 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
270 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
271 shardManager.tell(new ActorInitialized(), mockShardActor);
273 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
274 shardManager.tell(new RoleChangeNotification(memberId,
275 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
277 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
279 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
281 DataTree mockDataTree = mock(DataTree.class);
282 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
283 DataStoreVersions.CURRENT_VERSION), mockShardActor);
285 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
287 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
288 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
289 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
290 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
295 public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
296 new JavaTestKit(getSystem()) {{
297 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
299 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
301 // We're passing waitUntilInitialized = true to FindPrimary so the response should be
302 // delayed until we send ActorInitialized and RoleChangeNotification.
303 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
305 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
307 shardManager.tell(new ActorInitialized(), mockShardActor);
309 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
311 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
312 shardManager.tell(new RoleChangeNotification(memberId,
313 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
315 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
317 DataTree mockDataTree = mock(DataTree.class);
318 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
319 DataStoreVersions.CURRENT_VERSION), mockShardActor);
321 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
322 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
323 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
324 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
326 expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
331 public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
332 new JavaTestKit(getSystem()) {{
333 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
335 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
337 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
339 expectMsgClass(duration("2 seconds"), NotInitializedException.class);
341 shardManager.tell(new ActorInitialized(), mockShardActor);
343 expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
348 public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
349 new JavaTestKit(getSystem()) {{
350 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
352 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
353 shardManager.tell(new ActorInitialized(), mockShardActor);
354 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
355 null, RaftState.Candidate.name()), mockShardActor);
357 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
359 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
364 public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception {
365 new JavaTestKit(getSystem()) {{
366 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
368 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
369 shardManager.tell(new ActorInitialized(), mockShardActor);
370 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
371 null, RaftState.IsolatedLeader.name()), mockShardActor);
373 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
375 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
380 public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
381 new JavaTestKit(getSystem()) {{
382 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
384 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
385 shardManager.tell(new ActorInitialized(), mockShardActor);
387 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
389 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
394 public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
395 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
397 // Create an ActorSystem ShardManager actor for member-1.
399 final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
400 Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
402 ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
404 final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
405 newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
406 new MockConfiguration()), shardManagerID);
408 // Create an ActorSystem ShardManager actor for member-2.
410 final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
412 Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
414 final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
416 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
417 put("default", Arrays.asList("member-1", "member-2")).
418 put("astronauts", Arrays.asList("member-2")).build());
420 final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
421 newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
422 mockConfig2), shardManagerID);
424 new JavaTestKit(system1) {{
426 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
427 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
429 shardManager2.tell(new ActorInitialized(), mockShardActor2);
431 String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
432 short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
433 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
434 Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor2);
435 shardManager2.tell(new RoleChangeNotification(memberId2,
436 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
438 shardManager1.underlyingActor().waitForMemberUp();
440 shardManager1.tell(new FindPrimary("astronauts", false), getRef());
442 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
443 String path = found.getPrimaryPath();
444 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
445 assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
447 shardManager2.underlyingActor().verifyFindPrimary();
449 Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
451 shardManager1.underlyingActor().waitForMemberRemoved();
453 shardManager1.tell(new FindPrimary("astronauts", false), getRef());
455 expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
458 JavaTestKit.shutdownActorSystem(system1);
459 JavaTestKit.shutdownActorSystem(system2);
463 public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
464 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
466 // Create an ActorSystem ShardManager actor for member-1.
468 final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
469 Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
471 final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
473 final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
474 newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
475 new MockConfiguration()), shardManagerID);
477 // Create an ActorSystem ShardManager actor for member-2.
479 final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
481 Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
483 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
485 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
486 put("default", Arrays.asList("member-1", "member-2")).build());
488 final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
489 newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
490 mockConfig2), shardManagerID);
492 new JavaTestKit(system1) {{
494 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
495 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
496 shardManager1.tell(new ActorInitialized(), mockShardActor1);
497 shardManager2.tell(new ActorInitialized(), mockShardActor2);
499 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
500 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
501 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
502 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
503 shardManager1.tell(new RoleChangeNotification(memberId1,
504 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
505 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
506 DataStoreVersions.CURRENT_VERSION),
508 shardManager2.tell(new RoleChangeNotification(memberId2,
509 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
510 shardManager1.underlyingActor().waitForMemberUp();
512 shardManager1.tell(new FindPrimary("default", true), getRef());
514 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
515 String path = found.getPrimaryPath();
516 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
518 shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
519 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
521 shardManager1.underlyingActor().waitForUnreachableMember();
523 shardManager1.tell(new FindPrimary("default", true), getRef());
525 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
527 shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
528 createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
530 shardManager1.underlyingActor().waitForReachableMember();
532 shardManager1.tell(new FindPrimary("default", true), getRef());
534 RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
535 String path1 = found1.getPrimaryPath();
536 assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
540 JavaTestKit.shutdownActorSystem(system1);
541 JavaTestKit.shutdownActorSystem(system2);
545 public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
546 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
548 // Create an ActorSystem ShardManager actor for member-1.
550 final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
551 Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
553 final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
555 final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
556 newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
557 new MockConfiguration()), shardManagerID);
559 // Create an ActorSystem ShardManager actor for member-2.
561 final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
563 Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
565 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
567 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
568 put("default", Arrays.asList("member-1", "member-2")).build());
570 final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
571 newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
572 mockConfig2), shardManagerID);
574 new JavaTestKit(system1) {{
576 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
577 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
578 shardManager1.tell(new ActorInitialized(), mockShardActor1);
579 shardManager2.tell(new ActorInitialized(), mockShardActor2);
581 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
582 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
583 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
584 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
585 shardManager1.tell(new RoleChangeNotification(memberId1,
586 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
587 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
588 DataStoreVersions.CURRENT_VERSION),
590 shardManager2.tell(new RoleChangeNotification(memberId2,
591 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
592 shardManager1.underlyingActor().waitForMemberUp();
594 shardManager1.tell(new FindPrimary("default", true), getRef());
596 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
597 String path = found.getPrimaryPath();
598 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
600 primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
601 mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.<DataTree>absent()));
603 shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
604 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
606 shardManager1.underlyingActor().waitForUnreachableMember();
608 shardManager1.tell(new FindPrimary("default", true), getRef());
610 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
612 assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default"));
614 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class)),
615 DataStoreVersions.CURRENT_VERSION), mockShardActor1);
616 shardManager1.tell(new RoleChangeNotification(memberId1,
617 RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
619 shardManager1.tell(new FindPrimary("default", true), getRef());
621 LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
622 String path1 = found1.getPrimaryPath();
623 assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
627 JavaTestKit.shutdownActorSystem(system1);
628 JavaTestKit.shutdownActorSystem(system2);
633 public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
634 new JavaTestKit(getSystem()) {{
635 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
637 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
639 shardManager.tell(new FindLocalShard("non-existent", false), getRef());
641 LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
643 assertEquals("getShardName", "non-existent", notFound.getShardName());
648 public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
649 new JavaTestKit(getSystem()) {{
650 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
652 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
653 shardManager.tell(new ActorInitialized(), mockShardActor);
655 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
657 LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
659 assertTrue("Found path contains " + found.getPath().path().toString(),
660 found.getPath().path().toString().contains("member-1-shard-default-config"));
665 public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
666 new JavaTestKit(getSystem()) {{
667 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
669 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
671 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
676 public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
677 new JavaTestKit(getSystem()) {{
678 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
680 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
682 // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
683 // delayed until we send ActorInitialized.
684 Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
685 new Timeout(5, TimeUnit.SECONDS));
687 shardManager.tell(new ActorInitialized(), mockShardActor);
689 Object resp = Await.result(future, duration("5 seconds"));
690 assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
695 public void testOnRecoveryJournalIsCleaned() {
696 InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
697 ImmutableSet.of("foo")));
698 InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
699 ImmutableSet.of("bar")));
700 InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
702 new JavaTestKit(getSystem()) {{
703 TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
704 Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
706 shardManager.underlyingActor().waitForRecoveryComplete();
707 InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
709 // Journal entries up to the last one should've been deleted
710 Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
711 synchronized (journal) {
712 assertEquals("Journal size", 0, journal.size());
718 public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
719 new JavaTestKit(getSystem()) {
721 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
723 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
724 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
725 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
727 verify(ready, never()).countDown();
729 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
730 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION));
732 verify(ready, times(1)).countDown();
738 public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
739 new JavaTestKit(getSystem()) {
741 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
743 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
744 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
745 memberId, null, RaftState.Follower.name()));
747 verify(ready, never()).countDown();
749 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
751 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
752 "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
753 DataStoreVersions.CURRENT_VERSION));
755 verify(ready, times(1)).countDown();
761 public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
762 new JavaTestKit(getSystem()) {
764 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
766 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
767 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
768 memberId, null, RaftState.Follower.name()));
770 verify(ready, never()).countDown();
772 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
773 "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
774 DataStoreVersions.CURRENT_VERSION));
776 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
778 verify(ready, times(1)).countDown();
784 public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
785 new JavaTestKit(getSystem()) {
787 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
789 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
790 "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
792 verify(ready, never()).countDown();
799 public void testByDefaultSyncStatusIsFalse() throws Exception{
800 final Props persistentProps = newShardMgrProps(true);
801 final TestActorRef<ShardManager> shardManager =
802 TestActorRef.create(getSystem(), persistentProps);
804 ShardManager shardManagerActor = shardManager.underlyingActor();
806 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
810 public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
811 final Props persistentProps = ShardManager.props(
812 new MockClusterWrapper(),
813 new MockConfiguration(),
814 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
815 final TestActorRef<ShardManager> shardManager =
816 TestActorRef.create(getSystem(), persistentProps);
818 ShardManager shardManagerActor = shardManager.underlyingActor();
819 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
820 RaftState.Follower.name(), RaftState.Leader.name()));
822 assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
826 public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
827 final Props persistentProps = newShardMgrProps(true);
828 final TestActorRef<ShardManager> shardManager =
829 TestActorRef.create(getSystem(), persistentProps);
831 ShardManager shardManagerActor = shardManager.underlyingActor();
832 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
833 RaftState.Follower.name(), RaftState.Candidate.name()));
835 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
837 // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
838 shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
840 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
844 public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
845 final Props persistentProps = ShardManager.props(
846 new MockClusterWrapper(),
847 new MockConfiguration(),
848 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
849 final TestActorRef<ShardManager> shardManager =
850 TestActorRef.create(getSystem(), persistentProps);
852 ShardManager shardManagerActor = shardManager.underlyingActor();
853 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
854 RaftState.Candidate.name(), RaftState.Follower.name()));
856 // Initially will be false
857 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
859 // Send status true will make sync status true
860 shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
862 assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
864 // Send status false will make sync status false
865 shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown"));
867 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
872 public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
873 final Props persistentProps = ShardManager.props(
874 new MockClusterWrapper(),
875 new MockConfiguration() {
877 public List<String> getMemberShardNames(String memberName) {
878 return Arrays.asList("default", "astronauts");
881 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
882 final TestActorRef<ShardManager> shardManager =
883 TestActorRef.create(getSystem(), persistentProps);
885 ShardManager shardManagerActor = shardManager.underlyingActor();
887 // Initially will be false
888 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
890 // Make default shard leader
891 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
892 RaftState.Follower.name(), RaftState.Leader.name()));
894 // default = Leader, astronauts is unknown so sync status remains false
895 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
897 // Make astronauts shard leader as well
898 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
899 RaftState.Follower.name(), RaftState.Leader.name()));
901 // Now sync status should be true
902 assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
904 // Make astronauts a Follower
905 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
906 RaftState.Leader.name(), RaftState.Follower.name()));
908 // Sync status is not true
909 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
911 // Make the astronauts follower sync status true
912 shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown"));
914 // Sync status is now true
915 assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
920 public void testOnReceiveSwitchShardBehavior() throws Exception {
921 new JavaTestKit(getSystem()) {{
922 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
924 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
925 shardManager.tell(new ActorInitialized(), mockShardActor);
927 shardManager.tell(new SwitchShardBehavior(mockShardName, "Leader", 1000), getRef());
929 SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor, SwitchBehavior.class);
931 assertEquals(RaftState.Leader, switchBehavior.getNewState());
932 assertEquals(1000, switchBehavior.getNewTerm());
936 public void testOnReceiveCreateShard() {
937 new JavaTestKit(getSystem()) {{
938 datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
940 ActorRef shardManager = getSystem().actorOf(newShardMgrProps(false));
942 SchemaContext schemaContext = TestModel.createTestContext();
943 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
945 DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100).
946 persistent(false).build();
947 TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
949 shardManager.tell(new CreateShard("foo", Arrays.asList("member-1", "member-5", "member-6"), shardPropsCreator,
950 datastoreContext), getRef());
952 expectMsgClass(duration("5 seconds"), CreateShardReply.class);
954 shardManager.tell(new FindLocalShard("foo", true), getRef());
956 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
958 assertEquals("isRecoveryApplicable", false, shardPropsCreator.datastoreContext.isPersistent());
959 assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(),
960 new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()),
961 shardPropsCreator.peerAddresses.keySet());
962 assertEquals("ShardIdentifier", new ShardIdentifier("foo", "member-1", shardMrgIDSuffix),
963 shardPropsCreator.shardId);
964 assertSame("schemaContext", schemaContext, shardPropsCreator.schemaContext);
966 // Send CreateShard with same name - should fail.
968 shardManager.tell(new CreateShard("foo", Collections.<String>emptyList(), shardPropsCreator, null), getRef());
970 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
975 public void testOnReceiveCreateShardWithNoInitialSchemaContext() {
976 new JavaTestKit(getSystem()) {{
977 ActorRef shardManager = getSystem().actorOf(newShardMgrProps(false));
979 TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
981 shardManager.tell(new CreateShard("foo", Arrays.asList("member-1"), shardPropsCreator, null), getRef());
983 expectMsgClass(duration("5 seconds"), CreateShardReply.class);
985 SchemaContext schemaContext = TestModel.createTestContext();
986 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
988 shardManager.tell(new FindLocalShard("foo", true), getRef());
990 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
992 assertSame("schemaContext", schemaContext, shardPropsCreator.schemaContext);
993 assertNotNull("schemaContext is null", shardPropsCreator.datastoreContext);
997 private static class TestShardPropsCreator implements ShardPropsCreator {
998 ShardIdentifier shardId;
999 Map<String, String> peerAddresses;
1000 SchemaContext schemaContext;
1001 DatastoreContext datastoreContext;
1004 public Props newProps(ShardIdentifier shardId, Map<String, String> peerAddresses,
1005 DatastoreContext datastoreContext, SchemaContext schemaContext) {
1006 this.shardId = shardId;
1007 this.peerAddresses = peerAddresses;
1008 this.schemaContext = schemaContext;
1009 this.datastoreContext = datastoreContext;
1010 return Shard.props(shardId, peerAddresses, datastoreContext, schemaContext);
1015 private static class TestShardManager extends ShardManager {
1016 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
1018 TestShardManager(String shardMrgIDSuffix) {
1019 super(new MockClusterWrapper(), new MockConfiguration(),
1020 DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready,
1021 new PrimaryShardInfoFutureCache());
1025 public void handleRecover(Object message) throws Exception {
1027 super.handleRecover(message);
1029 if(message instanceof RecoveryCompleted) {
1030 recoveryComplete.countDown();
1035 void waitForRecoveryComplete() {
1036 assertEquals("Recovery complete", true,
1037 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
1041 @SuppressWarnings("serial")
1042 static class TestShardManagerCreator implements Creator<TestShardManager> {
1043 String shardMrgIDSuffix;
1045 TestShardManagerCreator(String shardMrgIDSuffix) {
1046 this.shardMrgIDSuffix = shardMrgIDSuffix;
1050 public TestShardManager create() throws Exception {
1051 return new TestShardManager(shardMrgIDSuffix);
1056 private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
1057 private static final long serialVersionUID = 1L;
1058 private final Creator<ShardManager> delegate;
1060 public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
1061 this.delegate = delegate;
1065 public ShardManager create() throws Exception {
1066 return delegate.create();
1070 private static class ForwardingShardManager extends ShardManager {
1071 private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
1072 private CountDownLatch memberUpReceived = new CountDownLatch(1);
1073 private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
1074 private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
1075 private CountDownLatch memberReachableReceived = new CountDownLatch(1);
1076 private final ActorRef shardActor;
1077 private final String name;
1079 protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
1080 DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
1081 ActorRef shardActor, PrimaryShardInfoFutureCache primaryShardInfoCache) {
1082 super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch, primaryShardInfoCache);
1083 this.shardActor = shardActor;
1088 public void handleCommand(Object message) throws Exception {
1090 super.handleCommand(message);
1092 if(message instanceof FindPrimary) {
1093 findPrimaryMessageReceived.countDown();
1094 } else if(message instanceof ClusterEvent.MemberUp) {
1095 String role = ((ClusterEvent.MemberUp)message).member().roles().head();
1096 if(!getCluster().getCurrentMemberName().equals(role)) {
1097 memberUpReceived.countDown();
1099 } else if(message instanceof ClusterEvent.MemberRemoved) {
1100 String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
1101 if(!getCluster().getCurrentMemberName().equals(role)) {
1102 memberRemovedReceived.countDown();
1104 } else if(message instanceof ClusterEvent.UnreachableMember) {
1105 String role = ((ClusterEvent.UnreachableMember)message).member().roles().head();
1106 if(!getCluster().getCurrentMemberName().equals(role)) {
1107 memberUnreachableReceived.countDown();
1109 } else if(message instanceof ClusterEvent.ReachableMember) {
1110 String role = ((ClusterEvent.ReachableMember)message).member().roles().head();
1111 if(!getCluster().getCurrentMemberName().equals(role)) {
1112 memberReachableReceived.countDown();
1119 public String persistenceId() {
1124 protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
1128 void waitForMemberUp() {
1129 assertEquals("MemberUp received", true,
1130 Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
1131 memberUpReceived = new CountDownLatch(1);
1134 void waitForMemberRemoved() {
1135 assertEquals("MemberRemoved received", true,
1136 Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
1137 memberRemovedReceived = new CountDownLatch(1);
1140 void waitForUnreachableMember() {
1141 assertEquals("UnreachableMember received", true,
1142 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
1144 memberUnreachableReceived = new CountDownLatch(1);
1147 void waitForReachableMember() {
1148 assertEquals("ReachableMember received", true,
1149 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
1150 memberReachableReceived = new CountDownLatch(1);
1153 void verifyFindPrimary() {
1154 assertEquals("FindPrimary received", true,
1155 Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
1156 findPrimaryMessageReceived = new CountDownLatch(1);