+
+ @Test
+ public void testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived() throws Exception {
+ ShardTestKit kit = new ShardTestKit(getSystem());
+ EntityOwnerSelectionStrategyConfig.Builder builder
+ = EntityOwnerSelectionStrategyConfig.newBuilder().addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
+
+ String peerId = newShardId("follower").toString();
+ TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId, false).
+ withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
+
+ peer.underlyingActor().grantVote = true;
+
+ TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(builder.build(),
+ ImmutableMap.of(peerId.toString(), peer.path().toString())));
+ ShardTestKit.waitUntilLeader(shard);
+
+ DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
+ ShardDataTree shardDataTree = shard.underlyingActor().getDataStore();
+
+ // Add a remote candidate
+
+ String remoteMemberName1 = "follower";
+ writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
+
+ // Register local
+
+ shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ // Verify the local candidate becomes owner
+
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ }
+
+ @Test
+ public void testDelayedEntityOwnerSelection() throws Exception {
+ ShardTestKit kit = new ShardTestKit(getSystem());
+ EntityOwnerSelectionStrategyConfig.Builder builder
+ = EntityOwnerSelectionStrategyConfig.newBuilder().addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
+
+ String follower1Id = newShardId("follower1").toString();
+ TestActorRef<MockFollower> follower1 = actorFactory.createTestActor(Props.create(MockFollower.class, follower1Id, false).
+ withDispatcher(Dispatchers.DefaultDispatcherId()), follower1Id);
+
+ follower1.underlyingActor().grantVote = true;
+
+ String follower2Id = newShardId("follower").toString();
+ TestActorRef<MockFollower> follower2 = actorFactory.createTestActor(Props.create(MockFollower.class, follower2Id, false).
+ withDispatcher(Dispatchers.DefaultDispatcherId()), follower2Id);
+
+ follower2.underlyingActor().grantVote = true;
+
+
+ TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(builder.build(),
+ ImmutableMap.of(follower1Id.toString(), follower2.path().toString(), follower2Id.toString(), follower2.path().toString())));
+ ShardTestKit.waitUntilLeader(shard);
+
+ DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
+ ShardDataTree shardDataTree = shard.underlyingActor().getDataStore();
+
+ // Add a remote candidate
+
+ String remoteMemberName1 = "follower";
+ writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
+
+ // Register local
+
+ shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ // Verify the local candidate becomes owner
+
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ }
+
+ private static class MockLeader extends UntypedActor {