1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNull;
6 import static org.junit.Assert.assertSame;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.mock;
9 import static org.mockito.Mockito.never;
10 import static org.mockito.Mockito.times;
11 import static org.mockito.Mockito.verify;
12 import static org.mockito.Mockito.when;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.AddressFromURIString;
16 import akka.actor.Props;
17 import akka.cluster.Cluster;
18 import akka.cluster.ClusterEvent;
19 import akka.dispatch.Dispatchers;
20 import akka.japi.Creator;
21 import akka.pattern.Patterns;
22 import akka.persistence.RecoveryCompleted;
23 import akka.testkit.JavaTestKit;
24 import akka.testkit.TestActorRef;
25 import akka.util.Timeout;
26 import com.google.common.base.Optional;
27 import com.google.common.collect.ImmutableMap;
28 import com.google.common.collect.ImmutableSet;
29 import com.google.common.collect.Sets;
30 import com.google.common.util.concurrent.Uninterruptibles;
31 import com.typesafe.config.ConfigFactory;
33 import java.util.Arrays;
34 import java.util.Collection;
35 import java.util.HashSet;
36 import java.util.List;
39 import java.util.concurrent.CountDownLatch;
40 import java.util.concurrent.TimeUnit;
41 import org.junit.After;
42 import org.junit.Before;
43 import org.junit.Test;
44 import org.mockito.Mock;
45 import org.mockito.MockitoAnnotations;
46 import org.opendaylight.controller.cluster.DataPersistenceProvider;
47 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
49 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
50 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
51 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
52 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
53 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
54 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
55 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
56 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
57 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
58 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
59 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
60 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
61 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
62 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
63 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
64 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
65 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
66 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
67 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
68 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
69 import org.opendaylight.controller.cluster.raft.RaftState;
70 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
71 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
72 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
73 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
74 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
75 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
76 import scala.concurrent.Await;
77 import scala.concurrent.Future;
78 import scala.concurrent.duration.FiniteDuration;
80 public class ShardManagerTest extends AbstractActorTest {
81 private static int ID_COUNTER = 1;
83 private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
84 private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
87 private static CountDownLatch ready;
89 private static TestActorRef<MessageCollectorActor> mockShardActor;
91 private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
92 dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS);
94 private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
95 String name = new ShardIdentifier(shardName, memberName,"config").toString();
96 return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
99 private final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
102 public void setUp() {
103 MockitoAnnotations.initMocks(this);
105 InMemoryJournal.clear();
107 if(mockShardActor == null) {
108 String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
109 mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), name);
112 mockShardActor.underlyingActor().clear();
116 public void tearDown() {
117 InMemoryJournal.clear();
120 private Props newShardMgrProps(boolean persistent) {
121 return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
122 datastoreContextBuilder.persistent(persistent).build(), ready, primaryShardInfoCache);
125 private Props newPropsShardMgrWithMockShardActor() {
126 return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(),
127 new MockConfiguration());
130 private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor,
131 final ClusterWrapper clusterWrapper, final Configuration config) {
132 Creator<ShardManager> creator = new Creator<ShardManager>() {
133 private static final long serialVersionUID = 1L;
135 public ShardManager create() throws Exception {
136 return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
137 ready, name, shardActor, primaryShardInfoCache);
141 return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
145 public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
146 new JavaTestKit(getSystem()) {{
147 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
149 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
151 shardManager.tell(new FindPrimary("non-existent", false), getRef());
153 expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
158 public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
159 new JavaTestKit(getSystem()) {{
160 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
162 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
164 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
165 shardManager.tell(new ActorInitialized(), mockShardActor);
167 DataTree mockDataTree = mock(DataTree.class);
168 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), getRef());
170 MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
171 shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
172 RaftState.Leader.name())), mockShardActor);
174 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
176 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
177 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
178 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
179 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
184 public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
185 new JavaTestKit(getSystem()) {{
186 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
188 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
189 shardManager.tell(new ActorInitialized(), mockShardActor);
191 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
192 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
193 shardManager.tell(new RoleChangeNotification(memberId1,
194 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
195 shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor);
197 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
199 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
204 public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
205 new JavaTestKit(getSystem()) {{
206 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
208 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
209 shardManager.tell(new ActorInitialized(), mockShardActor);
211 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
212 MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
214 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
215 shardManager.tell(new RoleChangeNotification(memberId1,
216 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
217 shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent()), mockShardActor);
219 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
221 RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
222 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
223 primaryFound.getPrimaryPath().contains("member-2-shard-default"));
228 public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
229 new JavaTestKit(getSystem()) {{
230 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
232 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
234 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
239 public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
240 new JavaTestKit(getSystem()) {{
241 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
243 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
244 shardManager.tell(new ActorInitialized(), mockShardActor);
246 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
248 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
253 public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
254 new JavaTestKit(getSystem()) {{
255 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
257 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
258 shardManager.tell(new ActorInitialized(), mockShardActor);
260 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
261 shardManager.tell(new RoleChangeNotification(memberId,
262 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
264 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
266 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
268 DataTree mockDataTree = mock(DataTree.class);
269 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), mockShardActor);
271 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
273 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
274 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
275 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
276 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
281 public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
282 new JavaTestKit(getSystem()) {{
283 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
285 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
287 // We're passing waitUntilInitialized = true to FindPrimary so the response should be
288 // delayed until we send ActorInitialized and RoleChangeNotification.
289 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
291 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
293 shardManager.tell(new ActorInitialized(), mockShardActor);
295 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
297 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
298 shardManager.tell(new RoleChangeNotification(memberId,
299 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
301 expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
303 DataTree mockDataTree = mock(DataTree.class);
304 shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), mockShardActor);
306 LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
307 assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
308 primaryFound.getPrimaryPath().contains("member-1-shard-default"));
309 assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
311 expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
316 public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
317 new JavaTestKit(getSystem()) {{
318 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
320 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
322 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
324 expectMsgClass(duration("2 seconds"), NotInitializedException.class);
326 shardManager.tell(new ActorInitialized(), mockShardActor);
328 expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
333 public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
334 new JavaTestKit(getSystem()) {{
335 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
337 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
338 shardManager.tell(new ActorInitialized(), mockShardActor);
339 shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
340 null, RaftState.Candidate.name()), mockShardActor);
342 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
344 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
349 public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
350 new JavaTestKit(getSystem()) {{
351 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
353 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
354 shardManager.tell(new ActorInitialized(), mockShardActor);
356 shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
358 expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
363 public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
364 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
366 // Create an ActorSystem ShardManager actor for member-1.
368 final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
369 Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
371 ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
373 final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
374 newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
375 new MockConfiguration()), shardManagerID);
377 // Create an ActorSystem ShardManager actor for member-2.
379 final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
381 Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
383 final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
385 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
386 put("default", Arrays.asList("member-1", "member-2")).
387 put("astronauts", Arrays.asList("member-2")).build());
389 final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
390 newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
391 mockConfig2), shardManagerID);
393 new JavaTestKit(system1) {{
395 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
396 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
398 shardManager2.tell(new ActorInitialized(), mockShardActor2);
400 String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
401 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
402 Optional.of(mock(DataTree.class))), mockShardActor2);
403 shardManager2.tell(new RoleChangeNotification(memberId2,
404 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
406 shardManager1.underlyingActor().waitForMemberUp();
408 shardManager1.tell(new FindPrimary("astronauts", false), getRef());
410 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
411 String path = found.getPrimaryPath();
412 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
414 shardManager2.underlyingActor().verifyFindPrimary();
416 Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
418 shardManager1.underlyingActor().waitForMemberRemoved();
420 shardManager1.tell(new FindPrimary("astronauts", false), getRef());
422 expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
425 JavaTestKit.shutdownActorSystem(system1);
426 JavaTestKit.shutdownActorSystem(system2);
430 public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
431 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
433 // Create an ActorSystem ShardManager actor for member-1.
435 final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
436 Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
438 final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
440 final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
441 newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
442 new MockConfiguration()), shardManagerID);
444 // Create an ActorSystem ShardManager actor for member-2.
446 final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
448 Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
450 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
452 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
453 put("default", Arrays.asList("member-1", "member-2")).build());
455 final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
456 newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
457 mockConfig2), shardManagerID);
459 new JavaTestKit(system1) {{
461 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
462 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
463 shardManager1.tell(new ActorInitialized(), mockShardActor1);
464 shardManager2.tell(new ActorInitialized(), mockShardActor2);
466 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
467 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
468 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
469 Optional.of(mock(DataTree.class))), mockShardActor1);
470 shardManager1.tell(new RoleChangeNotification(memberId1,
471 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
472 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class))),
474 shardManager2.tell(new RoleChangeNotification(memberId2,
475 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
476 shardManager1.underlyingActor().waitForMemberUp();
478 shardManager1.tell(new FindPrimary("default", true), getRef());
480 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
481 String path = found.getPrimaryPath();
482 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
484 shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
485 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
487 shardManager1.underlyingActor().waitForUnreachableMember();
489 shardManager1.tell(new FindPrimary("default", true), getRef());
491 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
493 shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
494 createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
496 shardManager1.underlyingActor().waitForReachableMember();
498 shardManager1.tell(new FindPrimary("default", true), getRef());
500 RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
501 String path1 = found1.getPrimaryPath();
502 assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
506 JavaTestKit.shutdownActorSystem(system1);
507 JavaTestKit.shutdownActorSystem(system2);
511 public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
512 String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
514 // Create an ActorSystem ShardManager actor for member-1.
516 final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
517 Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
519 final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
521 final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
522 newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
523 new MockConfiguration()), shardManagerID);
525 // Create an ActorSystem ShardManager actor for member-2.
527 final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
529 Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
531 final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
533 MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
534 put("default", Arrays.asList("member-1", "member-2")).build());
536 final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
537 newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
538 mockConfig2), shardManagerID);
540 new JavaTestKit(system1) {{
542 shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
543 shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
544 shardManager1.tell(new ActorInitialized(), mockShardActor1);
545 shardManager2.tell(new ActorInitialized(), mockShardActor2);
547 String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
548 String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
549 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
550 Optional.of(mock(DataTree.class))), mockShardActor1);
551 shardManager1.tell(new RoleChangeNotification(memberId1,
552 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
553 shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class))),
555 shardManager2.tell(new RoleChangeNotification(memberId2,
556 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
557 shardManager1.underlyingActor().waitForMemberUp();
559 shardManager1.tell(new FindPrimary("default", true), getRef());
561 RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
562 String path = found.getPrimaryPath();
563 assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
565 primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
566 mockShardActor1.path()), Optional.<DataTree>absent()));
568 shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
569 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
571 shardManager1.underlyingActor().waitForUnreachableMember();
573 shardManager1.tell(new FindPrimary("default", true), getRef());
575 expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
577 assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default"));
579 shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class))),
581 shardManager1.tell(new RoleChangeNotification(memberId1,
582 RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
584 shardManager1.tell(new FindPrimary("default", true), getRef());
586 LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
587 String path1 = found1.getPrimaryPath();
588 assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
592 JavaTestKit.shutdownActorSystem(system1);
593 JavaTestKit.shutdownActorSystem(system2);
598 public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
599 new JavaTestKit(getSystem()) {{
600 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
602 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
604 shardManager.tell(new FindLocalShard("non-existent", false), getRef());
606 LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
608 assertEquals("getShardName", "non-existent", notFound.getShardName());
613 public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
614 new JavaTestKit(getSystem()) {{
615 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
617 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
618 shardManager.tell(new ActorInitialized(), mockShardActor);
620 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
622 LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
624 assertTrue("Found path contains " + found.getPath().path().toString(),
625 found.getPath().path().toString().contains("member-1-shard-default-config"));
630 public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
631 new JavaTestKit(getSystem()) {{
632 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
634 shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
636 expectMsgClass(duration("5 seconds"), NotInitializedException.class);
641 public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
642 new JavaTestKit(getSystem()) {{
643 final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
645 shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
647 // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
648 // delayed until we send ActorInitialized.
649 Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
650 new Timeout(5, TimeUnit.SECONDS));
652 shardManager.tell(new ActorInitialized(), mockShardActor);
654 Object resp = Await.result(future, duration("5 seconds"));
655 assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
660 public void testOnRecoveryJournalIsCleaned() {
661 InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
662 ImmutableSet.of("foo")));
663 InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
664 ImmutableSet.of("bar")));
665 InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
667 new JavaTestKit(getSystem()) {{
668 TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
669 Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
671 shardManager.underlyingActor().waitForRecoveryComplete();
672 InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
674 // Journal entries up to the last one should've been deleted
675 Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
676 synchronized (journal) {
677 assertEquals("Journal size", 1, journal.size());
678 assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next());
684 public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
685 final ImmutableSet<String> persistedModules = ImmutableSet.of("foo", "bar");
686 InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
688 new JavaTestKit(getSystem()) {{
689 TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
690 Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
692 shardManager.underlyingActor().waitForRecoveryComplete();
694 Collection<String> knownModules = shardManager.underlyingActor().getKnownModules();
696 assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules));
701 public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
703 new JavaTestKit(getSystem()) {{
704 final TestActorRef<ShardManager> shardManager =
705 TestActorRef.create(getSystem(), newShardMgrProps(true));
707 assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());
709 ModuleIdentifier foo = mock(ModuleIdentifier.class);
710 when(foo.getNamespace()).thenReturn(new URI("foo"));
712 Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
713 moduleIdentifierSet.add(foo);
715 SchemaContext schemaContext = mock(SchemaContext.class);
716 when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
718 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
720 assertEquals("getKnownModules", Sets.newHashSet("foo"),
721 Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
723 ModuleIdentifier bar = mock(ModuleIdentifier.class);
724 when(bar.getNamespace()).thenReturn(new URI("bar"));
726 moduleIdentifierSet.add(bar);
728 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
730 assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"),
731 Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
736 public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
738 new JavaTestKit(getSystem()) {{
739 final TestActorRef<ShardManager> shardManager =
740 TestActorRef.create(getSystem(), newShardMgrProps(true));
742 SchemaContext schemaContext = mock(SchemaContext.class);
743 Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
745 ModuleIdentifier foo = mock(ModuleIdentifier.class);
746 when(foo.getNamespace()).thenReturn(new URI("foo"));
748 moduleIdentifierSet.add(foo);
750 when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
752 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
754 assertEquals("getKnownModules", Sets.newHashSet("foo"),
755 Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
757 //Create a completely different SchemaContext with only the bar module in it
758 //schemaContext = mock(SchemaContext.class);
759 moduleIdentifierSet.clear();
760 ModuleIdentifier bar = mock(ModuleIdentifier.class);
761 when(bar.getNamespace()).thenReturn(new URI("bar"));
763 moduleIdentifierSet.add(bar);
765 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
767 assertEquals("getKnownModules", Sets.newHashSet("foo"),
768 Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
774 public void testRecoveryApplicable(){
775 new JavaTestKit(getSystem()) {
777 final Props persistentProps = newShardMgrProps(true);
778 final TestActorRef<ShardManager> persistentShardManager =
779 TestActorRef.create(getSystem(), persistentProps);
781 DataPersistenceProvider dataPersistenceProvider1 = persistentShardManager.underlyingActor().getDataPersistenceProvider();
783 assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
785 final Props nonPersistentProps = newShardMgrProps(false);
786 final TestActorRef<ShardManager> nonPersistentShardManager =
787 TestActorRef.create(getSystem(), nonPersistentProps);
789 DataPersistenceProvider dataPersistenceProvider2 = nonPersistentShardManager.underlyingActor().getDataPersistenceProvider();
791 assertFalse("Recovery Not Applicable", dataPersistenceProvider2.isRecoveryApplicable());
799 public void testOnUpdateSchemaContextUpdateKnownModulesCallsDataPersistenceProvider()
801 final CountDownLatch persistLatch = new CountDownLatch(1);
802 final Creator<ShardManager> creator = new Creator<ShardManager>() {
803 private static final long serialVersionUID = 1L;
805 public ShardManager create() throws Exception {
806 return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(),
807 ready, new PrimaryShardInfoFutureCache()) {
809 protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
810 DataPersistenceProviderMonitor dataPersistenceProviderMonitor
811 = new DataPersistenceProviderMonitor();
812 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
813 return dataPersistenceProviderMonitor;
819 new JavaTestKit(getSystem()) {{
821 final TestActorRef<ShardManager> shardManager =
822 TestActorRef.create(getSystem(), Props.create(new DelegatingShardManagerCreator(creator)));
824 ModuleIdentifier foo = mock(ModuleIdentifier.class);
825 when(foo.getNamespace()).thenReturn(new URI("foo"));
827 Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
828 moduleIdentifierSet.add(foo);
830 SchemaContext schemaContext = mock(SchemaContext.class);
831 when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
833 shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
835 assertEquals("Persisted", true,
836 Uninterruptibles.awaitUninterruptibly(persistLatch, 5, TimeUnit.SECONDS));
842 public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
843 new JavaTestKit(getSystem()) {
845 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
847 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
848 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
849 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
851 verify(ready, never()).countDown();
853 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
854 Optional.of(mock(DataTree.class))));
856 verify(ready, times(1)).countDown();
862 public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
863 new JavaTestKit(getSystem()) {
865 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
867 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
868 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
869 memberId, null, RaftState.Follower.name()));
871 verify(ready, never()).countDown();
873 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
875 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
876 "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class))));
878 verify(ready, times(1)).countDown();
884 public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
885 new JavaTestKit(getSystem()) {
887 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
889 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
890 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
891 memberId, null, RaftState.Follower.name()));
893 verify(ready, never()).countDown();
895 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
896 "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class))));
898 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
900 verify(ready, times(1)).countDown();
906 public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
907 new JavaTestKit(getSystem()) {
909 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
911 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
912 "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
914 verify(ready, never()).countDown();
921 public void testByDefaultSyncStatusIsFalse() throws Exception{
922 final Props persistentProps = newShardMgrProps(true);
923 final TestActorRef<ShardManager> shardManager =
924 TestActorRef.create(getSystem(), persistentProps);
926 ShardManager shardManagerActor = shardManager.underlyingActor();
928 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
932 public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
933 final Props persistentProps = ShardManager.props(
934 new MockClusterWrapper(),
935 new MockConfiguration(),
936 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
937 final TestActorRef<ShardManager> shardManager =
938 TestActorRef.create(getSystem(), persistentProps);
940 ShardManager shardManagerActor = shardManager.underlyingActor();
941 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
942 RaftState.Follower.name(), RaftState.Leader.name()));
944 assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
948 public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
949 final Props persistentProps = newShardMgrProps(true);
950 final TestActorRef<ShardManager> shardManager =
951 TestActorRef.create(getSystem(), persistentProps);
953 ShardManager shardManagerActor = shardManager.underlyingActor();
954 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
955 RaftState.Follower.name(), RaftState.Candidate.name()));
957 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
959 // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
960 shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
962 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
966 public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
967 final Props persistentProps = ShardManager.props(
968 new MockClusterWrapper(),
969 new MockConfiguration(),
970 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
971 final TestActorRef<ShardManager> shardManager =
972 TestActorRef.create(getSystem(), persistentProps);
974 ShardManager shardManagerActor = shardManager.underlyingActor();
975 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
976 RaftState.Candidate.name(), RaftState.Follower.name()));
978 // Initially will be false
979 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
981 // Send status true will make sync status true
982 shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
984 assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
986 // Send status false will make sync status false
987 shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown"));
989 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
994 public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
995 final Props persistentProps = ShardManager.props(
996 new MockClusterWrapper(),
997 new MockConfiguration() {
999 public List<String> getMemberShardNames(String memberName) {
1000 return Arrays.asList("default", "astronauts");
1003 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
1004 final TestActorRef<ShardManager> shardManager =
1005 TestActorRef.create(getSystem(), persistentProps);
1007 ShardManager shardManagerActor = shardManager.underlyingActor();
1009 // Initially will be false
1010 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
1012 // Make default shard leader
1013 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
1014 RaftState.Follower.name(), RaftState.Leader.name()));
1016 // default = Leader, astronauts is unknown so sync status remains false
1017 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
1019 // Make astronauts shard leader as well
1020 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
1021 RaftState.Follower.name(), RaftState.Leader.name()));
1023 // Now sync status should be true
1024 assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
1026 // Make astronauts a Follower
1027 shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
1028 RaftState.Leader.name(), RaftState.Follower.name()));
1030 // Sync status is not true
1031 assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
1033 // Make the astronauts follower sync status true
1034 shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown"));
1036 // Sync status is now true
1037 assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
1041 private static class TestShardManager extends ShardManager {
1042 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
1044 TestShardManager(String shardMrgIDSuffix) {
1045 super(new MockClusterWrapper(), new MockConfiguration(),
1046 DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready,
1047 new PrimaryShardInfoFutureCache());
1051 public void handleRecover(Object message) throws Exception {
1053 super.handleRecover(message);
1055 if(message instanceof RecoveryCompleted) {
1056 recoveryComplete.countDown();
1061 void waitForRecoveryComplete() {
1062 assertEquals("Recovery complete", true,
1063 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
1067 @SuppressWarnings("serial")
1068 static class TestShardManagerCreator implements Creator<TestShardManager> {
1069 String shardMrgIDSuffix;
1071 TestShardManagerCreator(String shardMrgIDSuffix) {
1072 this.shardMrgIDSuffix = shardMrgIDSuffix;
1076 public TestShardManager create() throws Exception {
1077 return new TestShardManager(shardMrgIDSuffix);
1082 private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
1083 private static final long serialVersionUID = 1L;
1084 private final Creator<ShardManager> delegate;
1086 public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
1087 this.delegate = delegate;
1091 public ShardManager create() throws Exception {
1092 return delegate.create();
1096 private static class ForwardingShardManager extends ShardManager {
1097 private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
1098 private CountDownLatch memberUpReceived = new CountDownLatch(1);
1099 private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
1100 private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
1101 private CountDownLatch memberReachableReceived = new CountDownLatch(1);
1102 private final ActorRef shardActor;
1103 private final String name;
1105 protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
1106 DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
1107 ActorRef shardActor, PrimaryShardInfoFutureCache primaryShardInfoCache) {
1108 super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch, primaryShardInfoCache);
1109 this.shardActor = shardActor;
1114 public void handleCommand(Object message) throws Exception {
1116 super.handleCommand(message);
1118 if(message instanceof FindPrimary) {
1119 findPrimaryMessageReceived.countDown();
1120 } else if(message instanceof ClusterEvent.MemberUp) {
1121 String role = ((ClusterEvent.MemberUp)message).member().roles().head();
1122 if(!getCluster().getCurrentMemberName().equals(role)) {
1123 memberUpReceived.countDown();
1125 } else if(message instanceof ClusterEvent.MemberRemoved) {
1126 String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
1127 if(!getCluster().getCurrentMemberName().equals(role)) {
1128 memberRemovedReceived.countDown();
1130 } else if(message instanceof ClusterEvent.UnreachableMember) {
1131 String role = ((ClusterEvent.UnreachableMember)message).member().roles().head();
1132 if(!getCluster().getCurrentMemberName().equals(role)) {
1133 memberUnreachableReceived.countDown();
1135 } else if(message instanceof ClusterEvent.ReachableMember) {
1136 String role = ((ClusterEvent.ReachableMember)message).member().roles().head();
1137 if(!getCluster().getCurrentMemberName().equals(role)) {
1138 memberReachableReceived.countDown();
1145 public String persistenceId() {
1150 protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
1154 void waitForMemberUp() {
1155 assertEquals("MemberUp received", true,
1156 Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
1157 memberUpReceived = new CountDownLatch(1);
1160 void waitForMemberRemoved() {
1161 assertEquals("MemberRemoved received", true,
1162 Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
1163 memberRemovedReceived = new CountDownLatch(1);
1166 void waitForUnreachableMember() {
1167 assertEquals("UnreachableMember received", true,
1168 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
1170 memberUnreachableReceived = new CountDownLatch(1);
1173 void waitForReachableMember() {
1174 assertEquals("ReachableMember received", true,
1175 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
1176 memberReachableReceived = new CountDownLatch(1);
1179 void verifyFindPrimary() {
1180 assertEquals("FindPrimary received", true,
1181 Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
1182 findPrimaryMessageReceived = new CountDownLatch(1);