-
- @Test
- public void testPrefixShardReplicas() throws Exception {
- LOG.info("testPrefixShardReplicas starting");
- final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
-
- // Create ACtorSystem for member-1
- final ActorSystem system1 = newActorSystem("Member1");
- Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
-
- final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
- newTestShardMgrBuilder(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
- .waitTillReadyCountDownLatch(ready)
- .cluster(new ClusterWrapperImpl(system1))
- .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
- shardManagerID);
-
- // Create an ActorSystem ShardManager actor for member-2.
-
- final ActorSystem system2 = newActorSystem("Member2");
-
- Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
-
- final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
- newTestShardMgrBuilder()
- .configuration(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
- .waitTillReadyCountDownLatch(ready)
- .cluster(new ClusterWrapperImpl(system2)).props().withDispatcher(
- Dispatchers.DefaultDispatcherId()),
- shardManagerID);
-
- final JavaTestKit kit2 = new JavaTestKit(system2);
-
- new JavaTestKit(system1) {
- {
- shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
-
- // check shard does not exist
- shardManager1.tell(new FindLocalShard(
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
- expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
-
- shardManager2.tell(new FindLocalShard(
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
- kit2.expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
-
- // create shard on node1
- final Builder builder = Shard.builder();
-
- final CreatePrefixedShard createPrefixedShard = new CreatePrefixedShard(
- new PrefixShardConfiguration(TEST_ID,
- PrefixShardStrategy.NAME,
- Lists.newArrayList(MEMBER_1, MEMBER_2)),
- datastoreContextBuilder.build(), builder);
-
- shardManager1.tell(createPrefixedShard, getRef());
- expectMsgClass(duration("5 seconds"), Success.class);
-
- shardManager1.underlyingActor().waitForMemberUp();
-
- LOG.info("changed leader state");
-
- // check node2 cannot find it locally
- shardManager1.tell(new FindLocalShard(
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
- expectMsgClass(duration("5 seconds"), LocalShardFound.class);
-
- shardManager2.tell(new FindLocalShard(
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
- kit2.expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
-
- // but can remotely
- shardManager2.tell(new FindPrimary(
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
- kit2.expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
-
- // add replica and verify if succesful
- shardManager2.tell(new AddPrefixShardReplica(TEST_ID.getRootIdentifier()), kit2.getRef());
- kit2.expectMsgClass(duration("5 seconds"), Success.class);
-
- // verify we have a replica on manager2 now
- shardManager2.tell(new FindLocalShard(
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
- kit2.expectMsgClass(duration("5 seconds"), LocalShardFound.class);
- }
- };
- }
-
- private ActorSystem newActorSystem(final String config) {
- final ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config));
- actorSystems.add(system);
- return system;
- }
-}