+ leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+ verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
+ verifyCommittedEntityCandidate(peer1, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
+ verifyCommittedEntityCandidate(peer2, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
+
+ // Capture the CandidateAdded messages.
+
+ final List<CandidateAdded> candidateAdded = expectMatching(leader.underlyingActor().collectorActor(),
+ CandidateAdded.class, 2);
+
+ // Drop AppendEntries to the followers containing a log entry, which will be for the owner writes after we
+ // forward the CandidateAdded messages to the leader. This will leave the pending owner write tx's uncommitted.
+
+ peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
+ peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
+
+ // Now forward the CandidateAdded messages to the leader and wait for it to send out the AppendEntries.
+
+ leader.underlyingActor().stopDroppingMessagesOfType(CandidateAdded.class);
+ leader.tell(candidateAdded.get(0), leader);
+ leader.tell(candidateAdded.get(1), leader);
+
+ expectMatching(peer1.underlyingActor().collectorActor(), AppendEntries.class, 2,
+ ae -> ae.getEntries().size() > 0);
+
+ // Verify no owner assigned.
+
+ verifyNoOwnerSet(leader, entity1.getType(), entity1.getIdentifier());
+ verifyNoOwnerSet(leader, entity2.getType(), entity2.getIdentifier());
+
+ // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
+
+ leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
+ leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
+
+ peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
+ ae -> ae.getLeaderId().equals(leaderId.toString()));
+ peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
+
+ // Send PeerDown to the isolated leader - should be no-op since there's no owned entities.
+
+ leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
+ leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
+
+ // Verify the leader transitions to IsolatedLeader.
+
+ verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.IsolatedLeader.toString(),
+ state.getRaftState()));
+
+ // Send PeerDown to the new leader peer1.
+
+ peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
+
+ // Make peer1 start an election and become leader by sending the TimeoutNow message.
+
+ peer1.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
+
+ // Verify the peer1 transitions to Leader.
+
+ verifyRaftState(peer1, state -> assertEquals("getRaftState", RaftState.Leader.toString(),
+ state.getRaftState()));
+
+ verifyNoOwnerSet(peer1, entity1.getType(), entity1.getIdentifier());
+ verifyNoOwnerSet(peer2, entity1.getType(), entity2.getIdentifier());
+
+ verifyNoMoreInteractions(peer1Listener);
+ verifyNoMoreInteractions(peer2Listener);
+
+ // Add candidate peer1 candidate for entity2.
+
+ peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
+
+ verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
+ verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
+ verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, false, true));
+
+ reset(leaderListener, peer1Listener, peer2Listener);
+
+ // Remove the isolation.
+
+ leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
+ leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
+ peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
+ peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
+
+ // Previous leader should switch to Follower.
+
+ verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.Follower.toString(),
+ state.getRaftState()));
+
+ // Send PeerUp to peer1 and peer2.
+
+ peer1.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
+ peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
+
+ // The previous leader should become the owner of entity1.
+
+ verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
+
+ // The previous leader's DOMEntityOwnershipListener should get 4 total notifications:
+ // - inJeopardy cleared for entity1 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
+ // - inJeopardy cleared for entity2 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
+ // - local owner granted for entity1 (wasOwner=false, isOwner=true, hasOwner=true, inJeopardy=false)
+ // - remote owner for entity2 (wasOwner=false, isOwner=false, hasOwner=true, inJeopardy=false)
+ verify(leaderListener, timeout(5000).times(4)).ownershipChanged(or(
+ or(ownershipChange(entity1, false, false, false), ownershipChange(entity2, false, false, false)),
+ or(ownershipChange(entity1, false, true, true), ownershipChange(entity2, false, false, true))));
+
+ verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
+ verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
+
+ // Verify entity2's owner doesn't change.
+
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
+
+ verifyNoMoreInteractions(leaderListener);
+ verifyNoMoreInteractions(peer1Listener);
+ verifyNoMoreInteractions(peer2Listener);
+
+ testLog.info("testLeaderIsolationWithPendingCandidateAdded ending");
+ }
+
+ @Test
+ public void testListenerRegistration() {
+ testLog.info("testListenerRegistration starting");
+
+ ShardTestKit kit = new ShardTestKit(getSystem());
+
+ ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
+ ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
+
+ TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+ newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
+ peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
+
+ TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
+ newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
+
+ ShardTestKit.waitUntilLeader(leader);
+
+ String otherEntityType = "otherEntityType";
+ final DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
+ final DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
+ final DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
+ final DOMEntity entity4 = new DOMEntity(otherEntityType, ENTITY_ID3);