@Test
public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting");
+ datastoreContextBuilder.shardInitializationTimeout(10, TimeUnit.SECONDS);
new JavaTestKit(getSystem()) {{
final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
newTestShardMgrBuilderWithMockShardActor().cluster(
- new ClusterWrapperImpl(system1)).props(), shardManagerID);
+ new ClusterWrapperImpl(system1)).props().withDispatcher(
+ Dispatchers.DefaultDispatcherId()), shardManagerID);
// Create an ActorSystem ShardManager actor for member-2.
final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
- new ClusterWrapperImpl(system2)).props(), shardManagerID);
+ new ClusterWrapperImpl(system2)).props().withDispatcher(
+ Dispatchers.DefaultDispatcherId()), shardManagerID);
new JavaTestKit(system1) {{
final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
- new ClusterWrapperImpl(system1)).props(), shardManagerID);
+ new ClusterWrapperImpl(system1)).props().withDispatcher(
+ Dispatchers.DefaultDispatcherId()), shardManagerID);
// Create an ActorSystem ShardManager actor for member-2.
final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
- new ClusterWrapperImpl(system2)).props(), shardManagerID);
+ new ClusterWrapperImpl(system2)).props().withDispatcher(
+ Dispatchers.DefaultDispatcherId()), shardManagerID);
new JavaTestKit(system1) {{
String path = found.getPrimaryPath();
assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
- shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
- createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+ shardManager1.tell(MockClusterWrapper.
+ createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
shardManager1.underlyingActor().waitForUnreachableMember();
assertEquals("getMemberName", "member-2", peerDown.getMemberName());
MessageCollectorActor.clearMessages(mockShardActor1);
- shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
- createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+ shardManager1.tell(MockClusterWrapper.
+ createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
- shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
- createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+ shardManager1.tell(MockClusterWrapper.
+ createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
shardManager1.underlyingActor().waitForReachableMember();
String path1 = found1.getPrimaryPath();
assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
- shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
- createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+ shardManager1.tell(MockClusterWrapper.
+ createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
// Test FindPrimary wait succeeds after reachable member event.
- shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
- createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+ shardManager1.tell(MockClusterWrapper.
+ createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
shardManager1.underlyingActor().waitForUnreachableMember();
shardManager1.tell(new FindPrimary("default", true), getRef());
- shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
- createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+ shardManager1.tell(MockClusterWrapper.
+ createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
RemotePrimaryShardFound found2 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
String path2 = found2.getPrimaryPath();
final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
- new ClusterWrapperImpl(system1)).primaryShardInfoCache(primaryShardInfoCache).props(),
- shardManagerID);
+ new ClusterWrapperImpl(system1)).primaryShardInfoCache(primaryShardInfoCache).props().
+ withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID);
// Create an ActorSystem ShardManager actor for member-2.
final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
- new ClusterWrapperImpl(system2)).props(), shardManagerID);
+ new ClusterWrapperImpl(system2)).props().withDispatcher(
+ Dispatchers.DefaultDispatcherId()), shardManagerID);
new JavaTestKit(system1) {{
shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.<DataTree>absent()));
- shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
- createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+ shardManager1.tell(MockClusterWrapper.
+ createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
shardManager1.underlyingActor().waitForUnreachableMember();
public void testRestoreFromSnapshot() throws Throwable {
LOG.info("testRestoreFromSnapshot starting");
+ datastoreContextBuilder.shardInitializationTimeout(3, TimeUnit.SECONDS);
+
JavaTestKit kit = new JavaTestKit(getSystem());
MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
@Test
public void testAddShardReplica() throws Exception {
+ LOG.info("testAddShardReplica starting");
MockConfiguration mockConfig =
new MockConfiguration(ImmutableMap.<String, List<String>>builder().
put("default", Arrays.asList("member-1", "member-2")).
ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor).cluster(
- new ClusterWrapperImpl(system1)).props(), shardManagerID);
+ new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID);
// Create an ActorSystem ShardManager actor for member-2.
final ActorSystem system2 = newActorSystem("Member2");
String name = new ShardIdentifier("astronauts", "member-2", "config").toString();
final TestActorRef<MockRespondActor> mockShardLeaderActor =
- TestActorRef.create(system2, Props.create(MockRespondActor.class), name);
+ TestActorRef.create(system2, Props.create(MockRespondActor.class).
+ withDispatcher(Dispatchers.DefaultDispatcherId()), name);
final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor).cluster(
- new ClusterWrapperImpl(system2)).props(), shardManagerID);
+ new ClusterWrapperImpl(system2)).props().
+ withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID);
new JavaTestKit(system1) {{
assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
Sets.newHashSet(shardManagerSnapshot.getShardList()));
}};
-
LOG.info("testAddShardReplica ending");
}
@Test
public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception {
LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting");
+ datastoreContextBuilder.shardInitializationTimeout(100, TimeUnit.MILLISECONDS);
new JavaTestKit(getSystem()) {{
MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
put("astronauts", Arrays.asList("member-2")).build());
shardActor(mockShardActor).props(), shardMgrID);
newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", getRef().path().toString());
+ MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2",
+ AddressFromURIString.parse("akka.tcp://non-existent@127.0.0.1:5").toString());
newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster(
- new ClusterWrapperImpl(system1)).props(),
+ new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
shardManagerID);
// Create an ActorSystem ShardManager actor for member-2.
final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster(
- new ClusterWrapperImpl(system2)).props(),
+ new ClusterWrapperImpl(system2)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
shardManagerID);
// Because mockShardLeaderActor is created at the top level of the actor system it has an address like so,
final TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor).cluster(
- new MockClusterWrapper()).props(),
+ new MockClusterWrapper()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
shardMgrID);
shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
String shardId1 = ShardIdentifier.builder().shardName("shard1").memberName("member-1").
type(shardMrgIDSuffix).build().toString();
TestActorRef<MessageCollectorActor> shard1 = actorFactory.createTestActor(
- MessageCollectorActor.props(), shardId1);
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId1);
String shardId2 = ShardIdentifier.builder().shardName("shard2").memberName("member-1").
type(shardMrgIDSuffix).build().toString();
TestActorRef<MessageCollectorActor> shard2 = actorFactory.createTestActor(
- MessageCollectorActor.props(), shardId2);
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId2);
TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(
- mockConfig).addShardActor("shard1", shard1).addShardActor("shard2", shard2).props());
+ mockConfig).addShardActor("shard1", shard1).addShardActor("shard2", shard2).props().
+ withDispatcher(Dispatchers.DefaultDispatcherId()));
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), shard1);