Bug 6540: EOS - handle edge case with pruning pending owner change commits 38/45638/5
authorTom Pantelis <tpanteli@brocade.com>
Thu, 15 Sep 2016 06:14:51 +0000 (02:14 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Tue, 11 Oct 2016 15:14:42 +0000 (15:14 +0000)
The previous patch https://git.opendaylight.org/gerrit/#/c/45516/ added
pruning of pending owner change commits on leader change. However there's
one edge case which wouldn't work correctly where the leader successfully
commits a transaction to add a candidate but becomes isolated when it tries
to commit the transaction to set the candidate as the owner, assuming the new
candidate is the only candidate. When the partition is healed, the owner write
transaction will be pruned and dropped thus no onwer will be selected.

We could allow this owner write to be forwarded to the new leader since it
originated from a client candidate add request. However this could still be
problematic if, during isolation, the majority partition gets a candidate add
and commits an owner. After the partition heals the "old" owner write would be
forwarded and overwrite the previous owner. This wouldn't be catastrophic but
would incur an unnecessary owner change. I would rather keep consistent behavior
of dropping pending owner writes to a new leader.

Instead, the new leader can assign the previous leader as owner when the partition
heals. So in onPeerUp and onLeaderChange, I added code to search for all entities
with no owner and select and write an owner. Therefore when onPeerUp occurs for the
previous leader after isolation, if no other candidate was registered and became
owner, then the previous leader will be assigned as owner.

Change-Id: I213bc3ecd3d1f7ebd099702390de2277109f92c2
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java

index f7caf0f4a57ad683dbea5559ce4775136e865e2d..ea54146dc33744c97ad20f3b78376cf6cac62e29 100644 (file)
@@ -14,6 +14,8 @@ import akka.actor.UntypedActor;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.base.Predicate;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.ArrayList;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.ArrayList;
@@ -85,11 +87,17 @@ public class MessageCollectorActor extends UntypedActor {
     }
 
     public static <T> List<T> expectMatching(ActorRef actor, Class<T> clazz, int count) {
     }
 
     public static <T> List<T> expectMatching(ActorRef actor, Class<T> clazz, int count) {
+        return expectMatching(actor, clazz, count, msg -> true);
+    }
+
+    public static <T> List<T> expectMatching(ActorRef actor, Class<T> clazz, int count,
+            Predicate<T> matcher) {
         int timeout = 5000;
         List<T> messages = Collections.emptyList();
         for(int i = 0; i < timeout / 50; i++) {
             try {
                 messages = getAllMatching(actor, clazz);
         int timeout = 5000;
         List<T> messages = Collections.emptyList();
         for(int i = 0; i < timeout / 50; i++) {
             try {
                 messages = getAllMatching(actor, clazz);
+                Iterables.removeIf(messages, Predicates.not(matcher));
                 if(messages.size() >= count) {
                     return messages;
                 }
                 if(messages.size() >= count) {
                     return messages;
                 }
index 99ef94ef5039ba9e39a14afe7bf59f73af59b608..db71da63e438a2e1fdb46837ae62654c1906d2b4 100644 (file)
@@ -24,10 +24,15 @@ import static org.opendaylight.controller.cluster.datastore.entityownership.Enti
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
+import akka.cluster.Cluster;
+import akka.cluster.Member;
+import akka.cluster.MemberStatus;
+import akka.cluster.ClusterEvent.CurrentClusterState;
 import akka.pattern.Patterns;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import akka.pattern.Patterns;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -48,7 +53,6 @@ import org.opendaylight.controller.cluster.datastore.entityownership.messages.Un
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategy;
 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategy;
 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
@@ -78,7 +82,6 @@ class EntityOwnershipShard extends Shard {
     private final EntityOwnershipShardCommitCoordinator commitCoordinator;
     private final EntityOwnershipListenerSupport listenerSupport;
     private final Set<MemberName> downPeerMemberNames = new HashSet<>();
     private final EntityOwnershipShardCommitCoordinator commitCoordinator;
     private final EntityOwnershipListenerSupport listenerSupport;
     private final Set<MemberName> downPeerMemberNames = new HashSet<>();
-    private final Map<String, MemberName> peerIdToMemberNames = new HashMap<>();
     private final EntityOwnerSelectionStrategyConfig strategyConfig;
     private final Map<YangInstanceIdentifier, Cancellable> entityToScheduledOwnershipTask = new HashMap<>();
     private final EntityOwnershipStatistics entityOwnershipStatistics;
     private final EntityOwnerSelectionStrategyConfig strategyConfig;
     private final Map<YangInstanceIdentifier, Cancellable> entityToScheduledOwnershipTask = new HashMap<>();
     private final EntityOwnershipStatistics entityOwnershipStatistics;
@@ -96,11 +99,6 @@ class EntityOwnershipShard extends Shard {
         this.strategyConfig = builder.ownerSelectionStrategyConfig;
         this.entityOwnershipStatistics = new EntityOwnershipStatistics();
         this.entityOwnershipStatistics.init(getDataStore());
         this.strategyConfig = builder.ownerSelectionStrategyConfig;
         this.entityOwnershipStatistics = new EntityOwnershipStatistics();
         this.entityOwnershipStatistics.init(getDataStore());
-
-        for(String peerId: getRaftActorContext().getPeerIds()) {
-            ShardIdentifier shardId = ShardIdentifier.fromShardIdString(peerId);
-            peerIdToMemberNames.put(peerId, shardId.getMemberName());
-        }
     }
 
     @Override
     }
 
     @Override
@@ -347,17 +345,26 @@ class EntityOwnershipShard extends Shard {
 
         if (isLeader) {
 
 
         if (isLeader) {
 
+            // Re-initialize the downPeerMemberNames from the current akka Cluster state. The previous leader, if any,
+            // is most likely down however it's possible we haven't received the PeerDown message yet.
+            initializeDownPeerMemberNamesFromClusterState();
+
             // Clear all existing strategies so that they get re-created when we call createStrategy again
             // This allows the strategies to be re-initialized with existing statistics maintained by
             // EntityOwnershipStatistics
             strategyConfig.clearStrategies();
 
             // Re-assign owners for all members that are known to be down. In a cluster which has greater than
             // Clear all existing strategies so that they get re-created when we call createStrategy again
             // This allows the strategies to be re-initialized with existing statistics maintained by
             // EntityOwnershipStatistics
             strategyConfig.clearStrategies();
 
             // Re-assign owners for all members that are known to be down. In a cluster which has greater than
-            // 3 nodes it is possible for a some node beside the leader being down when the leadership transitions
-            // it makes sense to use this event to re-assign owners for those downed nodes
+            // 3 nodes it is possible for some node beside the leader being down when the leadership transitions
+            // it makes sense to use this event to re-assign owners for those downed nodes.
+            Set<String> ownedBy = new HashSet<>(downPeerMemberNames.size() + 1);
             for (MemberName downPeerName : downPeerMemberNames) {
             for (MemberName downPeerName : downPeerMemberNames) {
-                selectNewOwnerForEntitiesOwnedBy(downPeerName);
+                ownedBy.add(downPeerName.getName());
             }
             }
+
+            // Also try to assign owners for entities that have no current owner. See explanation in onPeerUp.
+            ownedBy.add("");
+            selectNewOwnerForEntitiesOwnedBy(ownedBy);
         } else {
             // The leader changed - notify the coordinator to check if pending modifications need to be sent.
             // While onStateChanged also does this, this method handles the case where the shard hears from a
         } else {
             // The leader changed - notify the coordinator to check if pending modifications need to be sent.
             // While onStateChanged also does this, this method handles the case where the shard hears from a
@@ -368,6 +375,33 @@ class EntityOwnershipShard extends Shard {
         super.onLeaderChanged(oldLeader, newLeader);
     }
 
         super.onLeaderChanged(oldLeader, newLeader);
     }
 
+    private void initializeDownPeerMemberNamesFromClusterState() {
+        java.util.Optional<Cluster> cluster = getRaftActorContext().getCluster();
+        if(!cluster.isPresent()) {
+            return;
+        }
+
+        CurrentClusterState state = cluster.get().state();
+        Set<Member> unreachable = state.getUnreachable();
+
+        LOG.debug("{}: initializeDownPeerMemberNamesFromClusterState - current downPeerMemberNames: {}, unreachable: {}",
+                persistenceId(), downPeerMemberNames, unreachable);
+
+        downPeerMemberNames.clear();
+        for(Member m: unreachable) {
+            downPeerMemberNames.add(MemberName.forName(m.getRoles().iterator().next()));
+        }
+
+        for(Member m: state.getMembers()) {
+            if(m.status() != MemberStatus.up() && m.status() != MemberStatus.weaklyUp()) {
+                LOG.debug("{}: Adding down member with status {}", persistenceId(), m.status());
+                downPeerMemberNames.add(MemberName.forName(m.getRoles().iterator().next()));
+            }
+        }
+
+        LOG.debug("{}: new downPeerMemberNames: {}", persistenceId(), downPeerMemberNames);
+    }
+
     private void onCandidateRemoved(CandidateRemoved message) {
         LOG.debug("{}: onCandidateRemoved: {}", persistenceId(), message);
 
     private void onCandidateRemoved(CandidateRemoved message) {
         LOG.debug("{}: onCandidateRemoved: {}", persistenceId(), message);
 
@@ -399,7 +433,7 @@ class EntityOwnershipShard extends Shard {
 
         // Available members is all the known peers - the number of peers that are down + self
         // So if there are 2 peers and 1 is down then availableMembers will be 2
 
         // Available members is all the known peers - the number of peers that are down + self
         // So if there are 2 peers and 1 is down then availableMembers will be 2
-        final int availableMembers = peerIdToMemberNames.size() - downPeerMemberNames.size() + 1;
+        final int availableMembers = getRaftActorContext().getPeerIds().size() - downPeerMemberNames.size() + 1;
 
         LOG.debug("{}: Using strategy {} to select owner, currentOwner = {}", persistenceId(), strategy, currentOwner);
 
 
         LOG.debug("{}: Using strategy {} to select owner, currentOwner = {}", persistenceId(), strategy, currentOwner);
 
@@ -429,13 +463,13 @@ class EntityOwnershipShard extends Shard {
             // it will first remove all its candidates on startup. If another candidate is registered during the time
             // the peer is down, the new candidate will be selected as the new owner.
 
             // it will first remove all its candidates on startup. If another candidate is registered during the time
             // the peer is down, the new candidate will be selected as the new owner.
 
-            selectNewOwnerForEntitiesOwnedBy(downMemberName);
+            selectNewOwnerForEntitiesOwnedBy(ImmutableSet.of(downMemberName.getName()));
         }
     }
 
         }
     }
 
-    private void selectNewOwnerForEntitiesOwnedBy(MemberName downMemberName) {
+    private void selectNewOwnerForEntitiesOwnedBy(Set<String> ownedBy) {
         final BatchedModifications modifications = commitCoordinator.newBatchedModifications();
         final BatchedModifications modifications = commitCoordinator.newBatchedModifications();
-        searchForEntitiesOwnedBy(downMemberName.getName(), (entityTypeNode, entityNode) -> {
+        searchForEntitiesOwnedBy(ownedBy, (entityTypeNode, entityNode) -> {
             YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH).
                     node(entityTypeNode.getIdentifier()).node(ENTITY_NODE_ID).node(entityNode.getIdentifier()).
                     node(ENTITY_OWNER_NODE_ID).build();
             YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH).
                     node(entityTypeNode.getIdentifier()).node(ENTITY_NODE_ID).node(entityNode.getIdentifier()).
                     node(ENTITY_OWNER_NODE_ID).build();
@@ -460,13 +494,23 @@ class EntityOwnershipShard extends Shard {
     private void onPeerUp(PeerUp peerUp) {
         LOG.debug("{}: onPeerUp: {}", persistenceId(), peerUp);
 
     private void onPeerUp(PeerUp peerUp) {
         LOG.debug("{}: onPeerUp: {}", persistenceId(), peerUp);
 
-        peerIdToMemberNames.put(peerUp.getPeerId(), peerUp.getMemberName());
         downPeerMemberNames.remove(peerUp.getMemberName());
 
         // Notify the coordinator to check if pending modifications need to be sent. We do this here
         // to handle the case where the leader's peer address isn't known yet when a prior state or
         // leader change occurred.
         commitCoordinator.onStateChanged(this, isLeader());
         downPeerMemberNames.remove(peerUp.getMemberName());
 
         // Notify the coordinator to check if pending modifications need to be sent. We do this here
         // to handle the case where the leader's peer address isn't known yet when a prior state or
         // leader change occurred.
         commitCoordinator.onStateChanged(this, isLeader());
+
+        if(isLeader()) {
+            // Try to assign owners for entities that have no current owner. It's possible the peer that is now up
+            // had previously registered as a candidate and was the only candidate but the owner write tx couldn't be
+            // committed due to a leader change. Eg, the leader is able to successfully commit the candidate add tx but
+            // becomes isolated before it can commit the owner change and switches to follower. The majority partition
+            // with a new leader has the candidate but the entity has no owner. When the partition is healed and the
+            // previously isolated leader reconnects, we'll receive onPeerUp and, if there's still no owner, the
+            // previous leader will gain ownership.
+            selectNewOwnerForEntitiesOwnedBy(ImmutableSet.of(""));
+        }
     }
 
     private Collection<String> getCandidateNames(MapEntryNode entity) {
     }
 
     private Collection<String> getCandidateNames(MapEntryNode entity) {
@@ -479,13 +523,14 @@ class EntityOwnershipShard extends Shard {
         return candidateNames;
     }
 
         return candidateNames;
     }
 
-    private void searchForEntitiesOwnedBy(final String owner, final EntityWalker walker) {
-        LOG.debug("{}: Searching for entities owned by {}", persistenceId(), owner);
+    private void searchForEntitiesOwnedBy(Set<String> ownedBy, EntityWalker walker) {
+        LOG.debug("{}: Searching for entities owned by {}", persistenceId(), ownedBy);
 
         searchForEntities((entityTypeNode, entityNode) -> {
             Optional<DataContainerChild<? extends PathArgument, ?>> possibleOwner =
                     entityNode.getChild(ENTITY_OWNER_NODE_ID);
 
         searchForEntities((entityTypeNode, entityNode) -> {
             Optional<DataContainerChild<? extends PathArgument, ?>> possibleOwner =
                     entityNode.getChild(ENTITY_OWNER_NODE_ID);
-            if(possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) {
+            String currentOwner = possibleOwner.isPresent() ? possibleOwner.get().getValue().toString() : "";
+            if(ownedBy.contains(currentOwner)) {
                 walker.onEntity(entityTypeNode, entityNode);
             }
         });
                 walker.onEntity(entityTypeNode, entityNode);
             }
         });
index 23b5563a3297acc4d91f05d06cbd6237e49fa172..eb797eebd1afebe6143a094ba2e0b7f0130d7ba2 100644 (file)
@@ -246,6 +246,20 @@ public class AbstractEntityOwnershipTest extends AbstractActorTest {
         });
     }
 
         });
     }
 
+    static void verifyNoOwnerSet(TestActorRef<? extends EntityOwnershipShard> shard, String entityType,
+            YangInstanceIdentifier entityId) {
+        YangInstanceIdentifier entityPath = entityPath(entityType, entityId).node(ENTITY_OWNER_QNAME);
+        try {
+            NormalizedNode<?, ?> node = AbstractShardTest.readStore(shard, entityPath);
+            if(node != null) {
+                Assert.fail("Owner " + node.getValue() + " was set for " + entityPath);
+            }
+
+        } catch (Exception e) {
+            throw new AssertionError("read failed", e);
+        }
+    }
+
     static void verifyRaftState(final TestActorRef<? extends EntityOwnershipShard> shard, Consumer<OnDemandRaftState> verifier)
             throws Exception {
         AssertionError lastError = null;
     static void verifyRaftState(final TestActorRef<? extends EntityOwnershipShard> shard, Consumer<OnDemandRaftState> verifier)
             throws Exception {
         AssertionError lastError = null;
index 9660a2a39289890a74361f97b3d44a5cb9e2794d..c268e3eff59df6b81372822616dae5b2fa6cc63a 100644 (file)
@@ -7,6 +7,9 @@
  */
 package org.opendaylight.controller.cluster.datastore.entityownership;
 
  */
 package org.opendaylight.controller.cluster.datastore.entityownership;
 
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.AdditionalMatchers.or;
 import static org.mockito.Matchers.any;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.AdditionalMatchers.or;
 import static org.mockito.Matchers.any;
@@ -17,6 +20,7 @@ import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
@@ -39,6 +43,7 @@ import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
@@ -57,7 +62,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
-import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
@@ -272,12 +276,12 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
         kit.expectMsgClass(SuccessReply.class);
 
         local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
         kit.expectMsgClass(SuccessReply.class);
 
-        MessageCollectorActor.expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class);
+        expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class);
 
         // Send a bunch of registration messages quickly and verify.
 
         leaderShard.stopDroppingMessagesOfType(BatchedModifications.class);
 
         // Send a bunch of registration messages quickly and verify.
 
         leaderShard.stopDroppingMessagesOfType(BatchedModifications.class);
-        MessageCollectorActor.clearMessages(leaderShard.collectorActor());
+        clearMessages(leaderShard.collectorActor());
 
         int max = 100;
         List<YangInstanceIdentifier> entityIds = new ArrayList<>();
 
         int max = 100;
         List<YangInstanceIdentifier> entityIds = new ArrayList<>();
@@ -835,6 +839,189 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         testLog.info("testLeaderIsolation ending");
     }
 
         testLog.info("testLeaderIsolation ending");
     }
 
+    @Test
+    public void testLeaderIsolationWithPendingCandidateAdded() throws Exception {
+        testLog.info("testLeaderIsolationWithPendingCandidateAdded starting");
+
+        ShardTestKit kit = new ShardTestKit(getSystem());
+
+        ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
+        ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
+        ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
+
+        dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4).
+            shardIsolatedLeaderCheckIntervalInMillis(100000);
+
+        TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+                newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME),
+                actorFactory.createTestActor(MessageCollectorActor.props())), peerId1.toString());
+        peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
+
+        TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+                newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME),
+                actorFactory.createTestActor(MessageCollectorActor.props())), peerId2.toString());
+        peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
+
+        dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()).
+                shardIsolatedLeaderCheckIntervalInMillis(500);
+
+        TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+                newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
+                actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString());
+
+        ShardTestKit.waitUntilLeader(leader);
+
+        // Add listeners on all members
+
+        DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class,
+                "DOMEntityOwnershipListener-" + LOCAL_MEMBER_NAME);
+        leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class,
+                "DOMEntityOwnershipListener-" + PEER_MEMBER_1_NAME);
+        peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class,
+                "DOMEntityOwnershipListener-" + PEER_MEMBER_2_NAME);
+        peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        // Drop the CandidateAdded message to the leader for now.
+
+        leader.underlyingActor().startDroppingMessagesOfType(CandidateAdded.class);
+
+        // Add an entity candidates for the leader. Since we've blocked the CandidateAdded message, it won't be
+        // assigned the owner.
+
+        DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
+        leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
+        verifyCommittedEntityCandidate(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
+        verifyCommittedEntityCandidate(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
+
+        DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
+        leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
+        verifyCommittedEntityCandidate(peer1, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
+        verifyCommittedEntityCandidate(peer2, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
+
+        // Capture the CandidateAdded messages.
+
+        List<CandidateAdded> candidateAdded = expectMatching(leader.underlyingActor().collectorActor(), CandidateAdded.class, 2);
+
+        // Drop AppendEntries to the followers containing a log entry, which will be for the owner writes after we
+        // forward the CandidateAdded messages to the leader. This will leave the pending owner write tx's uncommitted.
+
+        peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
+        peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
+
+        // Now forward the CandidateAdded messages to the leader and wait for it to send out the AppendEntries.
+
+        leader.underlyingActor().stopDroppingMessagesOfType(CandidateAdded.class);
+        leader.tell(candidateAdded.get(0), leader);
+        leader.tell(candidateAdded.get(1), leader);
+
+        expectMatching(peer1.underlyingActor().collectorActor(), AppendEntries.class, 2, ae -> ae.getEntries().size() > 0);
+
+        // Verify no owner assigned.
+
+        verifyNoOwnerSet(leader, entity1.getType(), entity1.getIdentifier());
+        verifyNoOwnerSet(leader, entity2.getType(), entity2.getIdentifier());
+
+        // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
+
+        leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
+        leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
+
+        peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
+                ae -> ae.getLeaderId().equals(leaderId.toString()));
+        peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
+
+        // Send PeerDown to the isolated leader - should be no-op since there's no owned entities.
+
+        leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
+        leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
+
+        // Verify the leader transitions to IsolatedLeader.
+
+        verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
+
+        // Send PeerDown to the new leader peer1.
+
+        peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
+
+        // Make peer1 start an election and become leader by sending the TimeoutNow message.
+
+        peer1.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
+
+        // Verify the peer1 transitions to Leader.
+
+        verifyRaftState(peer1, state -> assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
+
+        verifyNoOwnerSet(peer1, entity1.getType(), entity1.getIdentifier());
+        verifyNoOwnerSet(peer2, entity1.getType(), entity2.getIdentifier());
+
+        verifyNoMoreInteractions(peer1Listener);
+        verifyNoMoreInteractions(peer2Listener);
+
+        // Add candidate peer1 candidate for entity2.
+
+        peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
+
+        verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
+        verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
+        verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, false, true));
+
+        reset(leaderListener, peer1Listener, peer2Listener);
+
+        // Remove the isolation.
+
+        leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
+        leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
+        peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
+        peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
+
+        // Previous leader should switch to Follower.
+
+        verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.Follower.toString(), state.getRaftState()));
+
+        // Send PeerUp to peer1 and peer2.
+
+        peer1.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
+        peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
+
+        // The previous leader should become the owner of entity1.
+
+        verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
+
+        // The previous leader's DOMEntityOwnershipListener should get 4 total notifications:
+        //     - inJeopardy cleared for entity1 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
+        //     - inJeopardy cleared for entity2 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
+        //     - local owner granted for entity1 (wasOwner=false, isOwner=true, hasOwner=true, inJeopardy=false)
+        //     - remote owner for entity2 (wasOwner=false, isOwner=false, hasOwner=true, inJeopardy=false)
+        verify(leaderListener, timeout(5000).times(4)).ownershipChanged(or(or(ownershipChange(entity1, false, false, false),
+                ownershipChange(entity2, false, false, false)), or(ownershipChange(entity1, false, true, true),
+                        ownershipChange(entity2, false, false, true))));
+
+        verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
+        verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
+
+        // Verify entity2's owner doesn't change.
+
+        Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+        verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
+
+        verifyNoMoreInteractions(leaderListener);
+        verifyNoMoreInteractions(peer1Listener);
+        verifyNoMoreInteractions(peer2Listener);
+
+        testLog.info("testLeaderIsolationWithPendingCandidateAdded ending");
+    }
+
     @Test
     public void testListenerRegistration() throws Exception {
         testLog.info("testListenerRegistration starting");
     @Test
     public void testListenerRegistration() throws Exception {
         testLog.info("testListenerRegistration starting");
@@ -1063,14 +1250,14 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         @SuppressWarnings({ "unchecked", "rawtypes" })
         @Override
         public void handleCommand(Object message) {
         @SuppressWarnings({ "unchecked", "rawtypes" })
         @Override
         public void handleCommand(Object message) {
-            if(collectorActor != null) {
-                collectorActor.tell(message, ActorRef.noSender());
-            }
-
             Predicate drop = dropMessagesOfType.get(message.getClass());
             if(drop == null || !drop.test(message)) {
                 super.handleCommand(message);
             }
             Predicate drop = dropMessagesOfType.get(message.getClass());
             if(drop == null || !drop.test(message)) {
                 super.handleCommand(message);
             }
+
+            if(collectorActor != null) {
+                collectorActor.tell(message, ActorRef.noSender());
+            }
         }
 
         void startDroppingMessagesOfType(Class<?> msgClass) {
         }
 
         void startDroppingMessagesOfType(Class<?> msgClass) {