+ @Test
+ public void testLocalCandidateRemovedWithCandidateRegistered() throws Exception {
+ ShardTestKit kit = new ShardTestKit(getSystem());
+
+ dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10000);
+ ShardIdentifier leaderId = newShardId("leader");
+ ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME);
+
+ TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(Props.create(
+ TestEntityOwnershipShard.class, localId,
+ ImmutableMap.<String, String>builder().put(leaderId.toString(), "".toString()).build(),
+ dataStoreContextBuilder.build()).withDispatcher(Dispatchers.DefaultDispatcherId()));
+
+ TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(newShardProps(leaderId,
+ ImmutableMap.<String, String>builder().put(localId.toString(), shard.path().toString()).build(),
+ LOCAL_MEMBER_NAME).withDispatcher(Dispatchers.DefaultDispatcherId()), leaderId.toString());
+ leader.tell(new ElectionTimeout(), leader);
+
+ kit.waitUntilLeader(leader);
+
+ shard.tell(new PeerAddressResolved(leaderId.toString(), leader.path().toString()), ActorRef.noSender());
+
+ Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
+ EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+
+ // Register local candidate
+
+ shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+ verifyCommittedEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
+ verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
+ reset(candidate);
+
+ // Simulate a replicated commit from the leader to remove the local candidate that would occur after a
+ // network partition is healed.
+
+ leader.tell(new PeerDown(LOCAL_MEMBER_NAME, localId.toString()), ActorRef.noSender());
+
+ verify(candidate, timeout(5000)).ownershipChanged(entity, true, false);
+
+ // Since the the shard has a local candidate registered, it should re-add its candidate to the entity.
+
+ verifyCommittedEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
+ verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
+
+ // Unregister the local candidate and verify it's removed and no re-added.
+
+ shard.tell(new UnregisterCandidateLocal(candidate, entity), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ verifyNoEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verifyNoEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
+ }
+