From 0fab6c716548e89938c1a8493dc25991c006aa10 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Thu, 15 Sep 2016 02:14:51 -0400 Subject: [PATCH] Bug 6540: EOS - handle edge case with pruning pending owner change commits The previous patch https://git.opendaylight.org/gerrit/#/c/45516/ added pruning of pending owner change commits on leader change. However, there's one edge case that wouldn't work correctly: the leader successfully commits a transaction to add a candidate but becomes isolated when it tries to commit the transaction to set the candidate as the owner, assuming the new candidate is the only candidate. When the partition is healed, the owner write transaction will be pruned and dropped, thus no owner will be selected. We could allow this owner write to be forwarded to the new leader since it originated from a client candidate add request. However, this could still be problematic if, during isolation, the majority partition gets a candidate add and commits an owner. After the partition heals, the "old" owner write would be forwarded and overwrite the previous owner. This wouldn't be catastrophic but would incur an unnecessary owner change. I would rather keep the consistent behavior of dropping pending owner writes to a new leader. Instead, the new leader can assign the previous leader as owner when the partition heals. So in onPeerUp and onLeaderChanged, I added code to search for all entities with no owner and select and write an owner. Therefore, when onPeerUp occurs for the previous leader after isolation, if no other candidate was registered and became owner, then the previous leader will be assigned as owner. 
Change-Id: I213bc3ecd3d1f7ebd099702390de2277109f92c2 Signed-off-by: Tom Pantelis --- .../raft/utils/MessageCollectorActor.java | 8 + .../entityownership/EntityOwnershipShard.java | 81 +++++-- .../AbstractEntityOwnershipTest.java | 14 ++ .../EntityOwnershipShardTest.java | 201 +++++++++++++++++- 4 files changed, 279 insertions(+), 25 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java index f7caf0f4a5..ea54146dc3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java @@ -14,6 +14,8 @@ import akka.actor.UntypedActor; import akka.pattern.Patterns; import akka.util.Timeout; import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Uninterruptibles; import java.util.ArrayList; @@ -85,11 +87,17 @@ public class MessageCollectorActor extends UntypedActor { } public static List expectMatching(ActorRef actor, Class clazz, int count) { + return expectMatching(actor, clazz, count, msg -> true); + } + + public static List expectMatching(ActorRef actor, Class clazz, int count, + Predicate matcher) { int timeout = 5000; List messages = Collections.emptyList(); for(int i = 0; i < timeout / 50; i++) { try { messages = getAllMatching(actor, clazz); + Iterables.removeIf(messages, Predicates.not(matcher)); if(messages.size() >= count) { return messages; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java 
b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java index 99ef94ef50..db71da63e4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java @@ -24,10 +24,15 @@ import static org.opendaylight.controller.cluster.datastore.entityownership.Enti import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; +import akka.cluster.Cluster; +import akka.cluster.Member; +import akka.cluster.MemberStatus; +import akka.cluster.ClusterEvent.CurrentClusterState; import akka.pattern.Patterns; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableSet; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -48,7 +53,6 @@ import org.opendaylight.controller.cluster.datastore.entityownership.messages.Un import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal; import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategy; import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig; -import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.PeerDown; import org.opendaylight.controller.cluster.datastore.messages.PeerUp; @@ -78,7 +82,6 @@ class EntityOwnershipShard extends Shard { private final EntityOwnershipShardCommitCoordinator 
commitCoordinator; private final EntityOwnershipListenerSupport listenerSupport; private final Set downPeerMemberNames = new HashSet<>(); - private final Map peerIdToMemberNames = new HashMap<>(); private final EntityOwnerSelectionStrategyConfig strategyConfig; private final Map entityToScheduledOwnershipTask = new HashMap<>(); private final EntityOwnershipStatistics entityOwnershipStatistics; @@ -96,11 +99,6 @@ class EntityOwnershipShard extends Shard { this.strategyConfig = builder.ownerSelectionStrategyConfig; this.entityOwnershipStatistics = new EntityOwnershipStatistics(); this.entityOwnershipStatistics.init(getDataStore()); - - for(String peerId: getRaftActorContext().getPeerIds()) { - ShardIdentifier shardId = ShardIdentifier.fromShardIdString(peerId); - peerIdToMemberNames.put(peerId, shardId.getMemberName()); - } } @Override @@ -347,17 +345,26 @@ class EntityOwnershipShard extends Shard { if (isLeader) { + // Re-initialize the downPeerMemberNames from the current akka Cluster state. The previous leader, if any, + // is most likely down however it's possible we haven't received the PeerDown message yet. + initializeDownPeerMemberNamesFromClusterState(); + // Clear all existing strategies so that they get re-created when we call createStrategy again // This allows the strategies to be re-initialized with existing statistics maintained by // EntityOwnershipStatistics strategyConfig.clearStrategies(); // Re-assign owners for all members that are known to be down. In a cluster which has greater than - // 3 nodes it is possible for a some node beside the leader being down when the leadership transitions - // it makes sense to use this event to re-assign owners for those downed nodes + // 3 nodes it is possible for some node beside the leader being down when the leadership transitions + // it makes sense to use this event to re-assign owners for those downed nodes. 
+ Set ownedBy = new HashSet<>(downPeerMemberNames.size() + 1); for (MemberName downPeerName : downPeerMemberNames) { - selectNewOwnerForEntitiesOwnedBy(downPeerName); + ownedBy.add(downPeerName.getName()); } + + // Also try to assign owners for entities that have no current owner. See explanation in onPeerUp. + ownedBy.add(""); + selectNewOwnerForEntitiesOwnedBy(ownedBy); } else { // The leader changed - notify the coordinator to check if pending modifications need to be sent. // While onStateChanged also does this, this method handles the case where the shard hears from a @@ -368,6 +375,33 @@ class EntityOwnershipShard extends Shard { super.onLeaderChanged(oldLeader, newLeader); } + private void initializeDownPeerMemberNamesFromClusterState() { + java.util.Optional cluster = getRaftActorContext().getCluster(); + if(!cluster.isPresent()) { + return; + } + + CurrentClusterState state = cluster.get().state(); + Set unreachable = state.getUnreachable(); + + LOG.debug("{}: initializeDownPeerMemberNamesFromClusterState - current downPeerMemberNames: {}, unreachable: {}", + persistenceId(), downPeerMemberNames, unreachable); + + downPeerMemberNames.clear(); + for(Member m: unreachable) { + downPeerMemberNames.add(MemberName.forName(m.getRoles().iterator().next())); + } + + for(Member m: state.getMembers()) { + if(m.status() != MemberStatus.up() && m.status() != MemberStatus.weaklyUp()) { + LOG.debug("{}: Adding down member with status {}", persistenceId(), m.status()); + downPeerMemberNames.add(MemberName.forName(m.getRoles().iterator().next())); + } + } + + LOG.debug("{}: new downPeerMemberNames: {}", persistenceId(), downPeerMemberNames); + } + private void onCandidateRemoved(CandidateRemoved message) { LOG.debug("{}: onCandidateRemoved: {}", persistenceId(), message); @@ -399,7 +433,7 @@ class EntityOwnershipShard extends Shard { // Available members is all the known peers - the number of peers that are down + self // So if there are 2 peers and 1 is down then 
availableMembers will be 2 - final int availableMembers = peerIdToMemberNames.size() - downPeerMemberNames.size() + 1; + final int availableMembers = getRaftActorContext().getPeerIds().size() - downPeerMemberNames.size() + 1; LOG.debug("{}: Using strategy {} to select owner, currentOwner = {}", persistenceId(), strategy, currentOwner); @@ -429,13 +463,13 @@ class EntityOwnershipShard extends Shard { // it will first remove all its candidates on startup. If another candidate is registered during the time // the peer is down, the new candidate will be selected as the new owner. - selectNewOwnerForEntitiesOwnedBy(downMemberName); + selectNewOwnerForEntitiesOwnedBy(ImmutableSet.of(downMemberName.getName())); } } - private void selectNewOwnerForEntitiesOwnedBy(MemberName downMemberName) { + private void selectNewOwnerForEntitiesOwnedBy(Set ownedBy) { final BatchedModifications modifications = commitCoordinator.newBatchedModifications(); - searchForEntitiesOwnedBy(downMemberName.getName(), (entityTypeNode, entityNode) -> { + searchForEntitiesOwnedBy(ownedBy, (entityTypeNode, entityNode) -> { YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH). node(entityTypeNode.getIdentifier()).node(ENTITY_NODE_ID).node(entityNode.getIdentifier()). node(ENTITY_OWNER_NODE_ID).build(); @@ -460,13 +494,23 @@ class EntityOwnershipShard extends Shard { private void onPeerUp(PeerUp peerUp) { LOG.debug("{}: onPeerUp: {}", persistenceId(), peerUp); - peerIdToMemberNames.put(peerUp.getPeerId(), peerUp.getMemberName()); downPeerMemberNames.remove(peerUp.getMemberName()); // Notify the coordinator to check if pending modifications need to be sent. We do this here // to handle the case where the leader's peer address isn't known yet when a prior state or // leader change occurred. commitCoordinator.onStateChanged(this, isLeader()); + + if(isLeader()) { + // Try to assign owners for entities that have no current owner. 
It's possible the peer that is now up + // had previously registered as a candidate and was the only candidate but the owner write tx couldn't be + // committed due to a leader change. Eg, the leader is able to successfully commit the candidate add tx but + // becomes isolated before it can commit the owner change and switches to follower. The majority partition + // with a new leader has the candidate but the entity has no owner. When the partition is healed and the + // previously isolated leader reconnects, we'll receive onPeerUp and, if there's still no owner, the + // previous leader will gain ownership. + selectNewOwnerForEntitiesOwnedBy(ImmutableSet.of("")); + } } private Collection getCandidateNames(MapEntryNode entity) { @@ -479,13 +523,14 @@ class EntityOwnershipShard extends Shard { return candidateNames; } - private void searchForEntitiesOwnedBy(final String owner, final EntityWalker walker) { - LOG.debug("{}: Searching for entities owned by {}", persistenceId(), owner); + private void searchForEntitiesOwnedBy(Set ownedBy, EntityWalker walker) { + LOG.debug("{}: Searching for entities owned by {}", persistenceId(), ownedBy); searchForEntities((entityTypeNode, entityNode) -> { Optional> possibleOwner = entityNode.getChild(ENTITY_OWNER_NODE_ID); - if(possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) { + String currentOwner = possibleOwner.isPresent() ? 
possibleOwner.get().getValue().toString() : ""; + if(ownedBy.contains(currentOwner)) { walker.onEntity(entityTypeNode, entityNode); } }); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java index 23b5563a32..eb797eebd1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java @@ -246,6 +246,20 @@ public class AbstractEntityOwnershipTest extends AbstractActorTest { }); } + static void verifyNoOwnerSet(TestActorRef shard, String entityType, + YangInstanceIdentifier entityId) { + YangInstanceIdentifier entityPath = entityPath(entityType, entityId).node(ENTITY_OWNER_QNAME); + try { + NormalizedNode node = AbstractShardTest.readStore(shard, entityPath); + if(node != null) { + Assert.fail("Owner " + node.getValue() + " was set for " + entityPath); + } + + } catch (Exception e) { + throw new AssertionError("read failed", e); + } + } + static void verifyRaftState(final TestActorRef shard, Consumer verifier) throws Exception { AssertionError lastError = null; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java index 9660a2a392..c268e3eff5 100644 --- 
a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java @@ -7,6 +7,9 @@ */ package org.opendaylight.controller.cluster.datastore.entityownership; +import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching; +import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching; +import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages; import static org.junit.Assert.assertEquals; import static org.mockito.AdditionalMatchers.or; import static org.mockito.Matchers.any; @@ -17,6 +20,7 @@ import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; @@ -39,6 +43,7 @@ import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.ShardTestKit; +import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded; import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal; import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal; import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal; @@ -57,7 +62,6 @@ import 
org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.RequestVote; -import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.mdsal.eos.dom.api.DOMEntity; import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange; @@ -272,12 +276,12 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef()); kit.expectMsgClass(SuccessReply.class); - MessageCollectorActor.expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class); + expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class); // Send a bunch of registration messages quickly and verify. leaderShard.stopDroppingMessagesOfType(BatchedModifications.class); - MessageCollectorActor.clearMessages(leaderShard.collectorActor()); + clearMessages(leaderShard.collectorActor()); int max = 100; List entityIds = new ArrayList<>(); @@ -835,6 +839,189 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { testLog.info("testLeaderIsolation ending"); } + @Test + public void testLeaderIsolationWithPendingCandidateAdded() throws Exception { + testLog.info("testLeaderIsolationWithPendingCandidateAdded starting"); + + ShardTestKit kit = new ShardTestKit(getSystem()); + + ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); + ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME); + ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME); + + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4). 
+ shardIsolatedLeaderCheckIntervalInMillis(100000); + + TestActorRef peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME), + actorFactory.createTestActor(MessageCollectorActor.props())), peerId1.toString()); + peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); + + TestActorRef peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME), + actorFactory.createTestActor(MessageCollectorActor.props())), peerId2.toString()); + peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); + + dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()). + shardIsolatedLeaderCheckIntervalInMillis(500); + + TestActorRef leader = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME), + actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString()); + + ShardTestKit.waitUntilLeader(leader); + + // Add listeners on all members + + DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class, + "DOMEntityOwnershipListener-" + LOCAL_MEMBER_NAME); + leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class, + "DOMEntityOwnershipListener-" + PEER_MEMBER_1_NAME); + peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class, + "DOMEntityOwnershipListener-" + PEER_MEMBER_2_NAME); + peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef()); + 
kit.expectMsgClass(SuccessReply.class); + + // Drop the CandidateAdded message to the leader for now. + + leader.underlyingActor().startDroppingMessagesOfType(CandidateAdded.class); + + // Add an entity candidates for the leader. Since we've blocked the CandidateAdded message, it won't be + // assigned the owner. + + DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1); + leader.tell(new RegisterCandidateLocal(entity1), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME); + verifyCommittedEntityCandidate(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME); + verifyCommittedEntityCandidate(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME); + + DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2); + leader.tell(new RegisterCandidateLocal(entity2), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME); + verifyCommittedEntityCandidate(peer1, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME); + verifyCommittedEntityCandidate(peer2, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME); + + // Capture the CandidateAdded messages. + + List candidateAdded = expectMatching(leader.underlyingActor().collectorActor(), CandidateAdded.class, 2); + + // Drop AppendEntries to the followers containing a log entry, which will be for the owner writes after we + // forward the CandidateAdded messages to the leader. This will leave the pending owner write tx's uncommitted. + + peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0); + peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0); + + // Now forward the CandidateAdded messages to the leader and wait for it to send out the AppendEntries. 
+ + leader.underlyingActor().stopDroppingMessagesOfType(CandidateAdded.class); + leader.tell(candidateAdded.get(0), leader); + leader.tell(candidateAdded.get(1), leader); + + expectMatching(peer1.underlyingActor().collectorActor(), AppendEntries.class, 2, ae -> ae.getEntries().size() > 0); + + // Verify no owner assigned. + + verifyNoOwnerSet(leader, entity1.getType(), entity1.getIdentifier()); + verifyNoOwnerSet(leader, entity2.getType(), entity2.getIdentifier()); + + // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers. + + leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class); + leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class); + + peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, + ae -> ae.getLeaderId().equals(leaderId.toString())); + peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class); + + // Send PeerDown to the isolated leader - should be no-op since there's no owned entities. + + leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender()); + leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender()); + + // Verify the leader transitions to IsolatedLeader. + + verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState())); + + // Send PeerDown to the new leader peer1. + + peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender()); + + // Make peer1 start an election and become leader by sending the TimeoutNow message. + + peer1.tell(TimeoutNow.INSTANCE, ActorRef.noSender()); + + // Verify the peer1 transitions to Leader. 
+ + verifyRaftState(peer1, state -> assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState())); + + verifyNoOwnerSet(peer1, entity1.getType(), entity1.getIdentifier()); + verifyNoOwnerSet(peer2, entity1.getType(), entity2.getIdentifier()); + + verifyNoMoreInteractions(peer1Listener); + verifyNoMoreInteractions(peer2Listener); + + // Add candidate peer1 candidate for entity2. + + peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef()); + + verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME); + verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true)); + verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, false, true)); + + reset(leaderListener, peer1Listener, peer2Listener); + + // Remove the isolation. + + leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class); + leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class); + peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class); + peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class); + + // Previous leader should switch to Follower. + + verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.Follower.toString(), state.getRaftState())); + + // Send PeerUp to peer1 and peer2. + + peer1.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender()); + peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender()); + + // The previous leader should become the owner of entity1. 
+ + verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME); + + // The previous leader's DOMEntityOwnershipListener should get 4 total notifications: + // - inJeopardy cleared for entity1 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false) + // - inJeopardy cleared for entity2 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false) + // - local owner granted for entity1 (wasOwner=false, isOwner=true, hasOwner=true, inJeopardy=false) + // - remote owner for entity2 (wasOwner=false, isOwner=false, hasOwner=true, inJeopardy=false) + verify(leaderListener, timeout(5000).times(4)).ownershipChanged(or(or(ownershipChange(entity1, false, false, false), + ownershipChange(entity2, false, false, false)), or(ownershipChange(entity1, false, true, true), + ownershipChange(entity2, false, false, true)))); + + verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true)); + verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true)); + + // Verify entity2's owner doesn't change. 
+ + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME); + + verifyNoMoreInteractions(leaderListener); + verifyNoMoreInteractions(peer1Listener); + verifyNoMoreInteractions(peer2Listener); + + testLog.info("testLeaderIsolationWithPendingCandidateAdded ending"); + } + @Test public void testListenerRegistration() throws Exception { testLog.info("testListenerRegistration starting"); @@ -1063,14 +1250,14 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { @SuppressWarnings({ "unchecked", "rawtypes" }) @Override public void handleCommand(Object message) { - if(collectorActor != null) { - collectorActor.tell(message, ActorRef.noSender()); - } - Predicate drop = dropMessagesOfType.get(message.getClass()); if(drop == null || !drop.test(message)) { super.handleCommand(message); } + + if(collectorActor != null) { + collectorActor.tell(message, ActorRef.noSender()); + } } void startDroppingMessagesOfType(Class msgClass) { -- 2.36.6