Bug 6540: EOS - Prune pending owner change commits on leader change 16/45516/4
authorTom Pantelis <tpanteli@brocade.com>
Thu, 8 Sep 2016 14:16:53 +0000 (10:16 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Mon, 26 Sep 2016 03:43:24 +0000 (23:43 -0400)
When the shard leader is isolated, it attempts to re-assign ownership for down
peers. However, since it's isolated, it can't commit the modifications. If the
majority partition elects a new leader, when the partition is healed, the old
leader tries to forward the pending owner change commits to the new leader.
However this is problematic as the criteria used to determine the new owner is
stale and owner changes should only be committed by a valid leader. Since the
old leader is no longer the leader, it should not forward pending owner change
commits. However it still should forward local candidate change commits.

So I modified EntityOwnershipShardCommitCoordinator#onStateChange to iterate
the pending Modifications and remove WRITE modifications for the owner leaf
when the shard has transitioned to having a remote leader.

I also fixed an issue in EntityOwnershipShard#onCandidateRemoved that was
intermittently revealed by unit tests. Say candidate1 and candidate2 are
removed quickly for an entity and candidate1 is the current owner.
onCandidateRemoved is called for candidate1 and commits an update to write
candidate2 as the owner. If the write commit is still pending when
onCandidateRemoved is called for candidate2, the current owner will still
be candidate1 and the "message.getRemovedCandidate().equals(currentOwner)"
check will fail and thus the owner isn't cleared and candidate2 will remain
as owner. This results in a node being the owner w/o being in the candidate
list. (This patch may fix Bug 6672 as well)

A new testLeaderIsolation case was added to EntityOwnershipShardTest. Also I
reworked the tests and removed the use of the MockFollower and MockLeader
actors for consistency and also so the tests use the real EOS shard.

Change-Id: I5039b07d02f8571ee2d1affb0f364ea278641e91
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java

index c3f76530bccfd8924fb245adb70216de6e5a7e5b..a1e506f6ecba7dca921a6df5d7f871237ca25850 100644 (file)
@@ -610,8 +610,7 @@ public class Shard extends RaftActor {
             // them to transaction messages and send to the new leader.
             ActorSelection leader = getLeader();
             if (leader != null) {
-                Collection<?> messagesToForward = commitCoordinator.convertPendingTransactionsToMessages(
-                            datastoreContext.getShardBatchedModificationCount());
+                Collection<?> messagesToForward = convertPendingTransactionsToMessages();
 
                 if (!messagesToForward.isEmpty()) {
                     LOG.debug("{}: Forwarding {} pending transaction messages to leader {}", persistenceId(),
@@ -633,6 +632,15 @@ public class Shard extends RaftActor {
         }
     }
 
+    /**
+     * Clears all pending transactions and converts them to messages to be forwarded to a new leader.
+     *
+     * @return the converted messages
+     */
+    public Collection<?> convertPendingTransactionsToMessages() {
+        return commitCoordinator.convertPendingTransactionsToMessages(datastoreContext.getShardBatchedModificationCount());
+    }
+
     @Override
     protected void pauseLeader(final Runnable operation) {
         LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation);
index f1e898ae23042d9da345032d156ea54cfd7b504b..99ef94ef5039ba9e39a14afe7bf59f73af59b608 100644 (file)
@@ -322,8 +322,6 @@ class EntityOwnershipShard extends Shard {
 
     @Override
     protected void onStateChanged() {
-        super.onStateChanged();
-
         boolean isLeader = isLeader();
         LOG.debug("{}: onStateChanged: isLeader: {}, hasLeader: {}", persistenceId(), isLeader, hasLeader());
 
@@ -337,12 +335,12 @@ class EntityOwnershipShard extends Shard {
         }
 
         commitCoordinator.onStateChanged(this, isLeader);
+
+        super.onStateChanged();
     }
 
     @Override
     protected void onLeaderChanged(String oldLeader, String newLeader) {
-        super.onLeaderChanged(oldLeader, newLeader);
-
         boolean isLeader = isLeader();
         LOG.debug("{}: onLeaderChanged: oldLeader: {}, newLeader: {}, isLeader: {}", persistenceId(), oldLeader,
                 newLeader, isLeader);
@@ -366,6 +364,8 @@ class EntityOwnershipShard extends Shard {
             // leader and stays in the follower state. In that case no behavior state change occurs.
             commitCoordinator.onStateChanged(this, isLeader);
         }
+
+        super.onLeaderChanged(oldLeader, newLeader);
     }
 
     private void onCandidateRemoved(CandidateRemoved message) {
@@ -373,10 +373,8 @@ class EntityOwnershipShard extends Shard {
 
         if(isLeader()) {
             String currentOwner = getCurrentOwner(message.getEntityPath());
-            if(message.getRemovedCandidate().equals(currentOwner) || message.getRemainingCandidates().isEmpty()){
-                writeNewOwner(message.getEntityPath(),
-                        newOwner(currentOwner, message.getRemainingCandidates(), getEntityOwnerElectionStrategy(message.getEntityPath())));
-            }
+            writeNewOwner(message.getEntityPath(),
+                    newOwner(currentOwner, message.getRemainingCandidates(), getEntityOwnerElectionStrategy(message.getEntityPath())));
         }
     }
 
index 56ecd52276d9638cf30d68c220c67ebd58f42095..c792cf1dda996370b52002fa952b5689088d73df 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.datastore.entityownership;
 
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
+
 import akka.actor.ActorRef;
 import akka.actor.Cancellable;
 import akka.actor.Status.Failure;
@@ -14,6 +16,7 @@ import com.google.common.base.Preconditions;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Queue;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
@@ -25,6 +28,7 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderExc
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.slf4j.Logger;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -181,6 +185,8 @@ class EntityOwnershipShardCommitCoordinator {
     void onStateChanged(EntityOwnershipShard shard, boolean isLeader) {
         shard.possiblyRemoveAllInitialCandidates(shard.getLeader());
 
+        possiblyPrunePendingCommits(shard, isLeader);
+
         if(!isLeader && inflightCommit != null) {
             // We're no longer the leader but we have an inflight local commit. This likely means we didn't get
             // consensus for the commit and switched to follower due to another node with a higher term. We
@@ -197,6 +203,66 @@ class EntityOwnershipShardCommitCoordinator {
         }
     }
 
+    private void possiblyPrunePendingCommits(EntityOwnershipShard shard, boolean isLeader) {
+        // If we were the leader and transitioned to follower, we'll try to forward pending commits to the new leader.
+        // However certain commits, e.g. entity owner changes, should only be committed by a valid leader as the
+        // criteria used to determine the commit may be stale. Since we're no longer a valid leader, we should not
+        // forward such commits thus we prune the pending modifications. We still should forward local candidate change
+        // commits.
+        if (shard.hasLeader() && !isLeader) {
+            // We may have already submitted a transaction for replication and commit. We don't need the base Shard to
+            // forward it since we also have it stored in the inflightCommit and handle retries. So we just clear
+            // pending transactions and drop them.
+            shard.convertPendingTransactionsToMessages();
+
+            // Prune the inflightCommit.
+            if(inflightCommit != null) {
+                inflightCommit = pruneModifications(inflightCommit);
+            }
+
+            // Prune the subsequent pending modifications.
+            Iterator<Modification> iter = pendingModifications.iterator();
+            while(iter.hasNext()) {
+                Modification mod = iter.next();
+                if(!canForwardModificationToNewLeader(mod)) {
+                    iter.remove();
+                }
+            }
+        }
+    }
+
+    @Nullable
+    private BatchedModifications pruneModifications(BatchedModifications toPrune) {
+        BatchedModifications prunedModifications = new BatchedModifications(toPrune.getTransactionID(), toPrune.getVersion());
+        prunedModifications.setDoCommitOnReady(toPrune.isDoCommitOnReady());
+        prunedModifications.setReady(toPrune.isReady());
+        prunedModifications.setTotalMessagesSent(toPrune.getTotalMessagesSent());
+        for(Modification mod: toPrune.getModifications()) {
+            if(canForwardModificationToNewLeader(mod)) {
+                prunedModifications.addModification(mod);
+            }
+        }
+
+        return !prunedModifications.getModifications().isEmpty() ? prunedModifications : null;
+    }
+
+    private boolean canForwardModificationToNewLeader(Modification mod) {
+        // If this is a WRITE of entity owner we don't want to forward it to a new leader since the criteria used
+        // to determine the new owner might be stale.
+        if (mod instanceof WriteModification) {
+            WriteModification writeMod = (WriteModification)mod;
+            boolean canForward = !writeMod.getPath().getLastPathArgument().getNodeType().equals(ENTITY_OWNER_QNAME);
+
+            if (!canForward) {
+                log.debug("Not forwarding WRITE modification for {} to new leader", writeMod.getPath());
+            }
+
+            return canForward;
+        }
+
+        return true;
+    }
+
     private void newInflightCommitWithDifferentTransactionID() {
         BatchedModifications newBatchedModifications = newBatchedModifications();
         newBatchedModifications.getModifications().addAll(inflightCommit.getModifications());
index f93ac28eeb6e0dae031461f370ca84df1c93e18f..23b5563a3297acc4d91f05d06cbd6237e49fa172 100644 (file)
@@ -17,18 +17,30 @@ import static org.opendaylight.controller.cluster.datastore.entityownership.Enti
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_QNAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_QNAME;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityPath;
+
+import akka.pattern.Patterns;
+import akka.testkit.TestActorRef;
+import akka.util.Timeout;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 import org.hamcrest.Description;
 import org.junit.Assert;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Matchers;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
 import org.opendaylight.controller.cluster.datastore.ShardDataTree;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
@@ -48,6 +60,12 @@ import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Abstract base class providing utility methods.
@@ -55,6 +73,10 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailed
  * @author Thomas Pantelis
  */
 public class AbstractEntityOwnershipTest extends AbstractActorTest {
+    protected final Logger testLog = LoggerFactory.getLogger(getClass());
+
+    private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
+
     protected void verifyEntityCandidate(NormalizedNode<?, ?> node, String entityType,
             YangInstanceIdentifier entityId, String candidateName, boolean expectPresent) {
         try {
@@ -175,18 +197,24 @@ public class AbstractEntityOwnershipTest extends AbstractActorTest {
 
     static DOMEntityOwnershipChange ownershipChange(final DOMEntity expEntity, final boolean expWasOwner,
             final boolean expIsOwner, final boolean expHasOwner) {
+        return ownershipChange(expEntity, expWasOwner, expIsOwner, expHasOwner, false);
+    }
+
+    static DOMEntityOwnershipChange ownershipChange(final DOMEntity expEntity, final boolean expWasOwner,
+            final boolean expIsOwner, final boolean expHasOwner, final boolean expInJeopardy) {
         return Matchers.argThat(new ArgumentMatcher<DOMEntityOwnershipChange>() {
             @Override
             public boolean matches(Object argument) {
                 DOMEntityOwnershipChange change = (DOMEntityOwnershipChange) argument;
                 return expEntity.equals(change.getEntity()) && expWasOwner == change.getState().wasOwner() &&
-                        expIsOwner == change.getState().isOwner() && expHasOwner == change.getState().hasOwner();
+                        expIsOwner == change.getState().isOwner() && expHasOwner == change.getState().hasOwner() &&
+                        expInJeopardy == change.inJeopardy();
             }
 
             @Override
             public void describeTo(Description description) {
                 description.appendValue(new DOMEntityOwnershipChange(expEntity, EntityOwnershipChangeState.from(
-                        expWasOwner, expIsOwner, expHasOwner)));
+                        expWasOwner, expIsOwner, expHasOwner), expInJeopardy));
             }
         });
     }
@@ -206,4 +234,74 @@ public class AbstractEntityOwnershipTest extends AbstractActorTest {
             }
         });
     }
+
+    static void verifyOwner(final TestActorRef<? extends EntityOwnershipShard> shard, String entityType,
+            YangInstanceIdentifier entityId, String localMemberName) {
+        verifyOwner(localMemberName, entityType, entityId, path -> {
+            try {
+                return AbstractShardTest.readStore(shard, path);
+            } catch(Exception e) {
+                return null;
+            }
+        });
+    }
+
+    static void verifyRaftState(final TestActorRef<? extends EntityOwnershipShard> shard, Consumer<OnDemandRaftState> verifier)
+            throws Exception {
+        AssertionError lastError = null;
+        Stopwatch sw = Stopwatch.createStarted();
+        while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
+            FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
+            Future<Object> future = Patterns.ask(shard, GetOnDemandRaftState.INSTANCE, new Timeout(operationDuration));
+            OnDemandRaftState raftState = (OnDemandRaftState)Await.result(future, operationDuration);
+            try {
+                verifier.accept(raftState);
+                return;
+            } catch (AssertionError e) {
+                lastError = e;
+                Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+            }
+        }
+
+        throw lastError;
+    }
+
+    static ShardIdentifier newShardId(String memberName) {
+        return ShardIdentifier.create("entity-ownership", MemberName.forName(memberName),
+            "operational" + NEXT_SHARD_NUM.getAndIncrement());
+    }
+
+    void verifyEntityCandidateRemoved(final TestActorRef<EntityOwnershipShard> shard, String entityType,
+            YangInstanceIdentifier entityId, String candidateName) {
+        verifyNodeRemoved(candidatePath(entityType, entityId, candidateName),
+                path -> {
+                    try {
+                        return AbstractShardTest.readStore(shard, path);
+                    } catch(Exception e) {
+                        throw new AssertionError("Failed to read " + path, e);
+                    }
+            });
+    }
+
+    void verifyCommittedEntityCandidate(final TestActorRef<? extends EntityOwnershipShard> shard, String entityType,
+            YangInstanceIdentifier entityId, String candidateName) {
+        verifyEntityCandidate(entityType, entityId, candidateName, path -> {
+            try {
+                return AbstractShardTest.readStore(shard, path);
+            } catch(Exception e) {
+                throw new AssertionError("Failed to read " + path, e);
+            }
+        });
+    }
+
+    void verifyNoEntityCandidate(final TestActorRef<? extends EntityOwnershipShard> shard, String entityType,
+            YangInstanceIdentifier entityId, String candidateName) {
+        verifyEntityCandidate(entityType, entityId, candidateName, path -> {
+            try {
+                return AbstractShardTest.readStore(shard, path);
+            } catch(Exception e) {
+                throw new AssertionError("Failed to read " + path, e);
+            }
+        }, false);
+    }
 }
index 13fb8dc0322ff04c9a19382e226e2d70b0a4a93c..9660a2a39289890a74361f97b3d44a5cb9e2794d 100644 (file)
@@ -16,14 +16,11 @@ import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
-import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
-import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.Terminated;
-import akka.actor.UntypedActor;
 import akka.dispatch.Dispatchers;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
@@ -33,17 +30,14 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
 import org.junit.After;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
-import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
-import org.opendaylight.controller.cluster.datastore.ShardDataTree;
 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
@@ -53,28 +47,23 @@ import org.opendaylight.controller.cluster.datastore.entityownership.selectionst
 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.LastCandidateSelectionStrategy;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
-import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
-import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.TestActorFactory;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
-import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
@@ -95,10 +84,11 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
     private static final YangInstanceIdentifier ENTITY_ID5 =
             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity5"));
     private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
-    private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
-    private static final String LOCAL_MEMBER_NAME = "member-1";
+    private static final String LOCAL_MEMBER_NAME = "local-member-1";
+    private static final String PEER_MEMBER_1_NAME = "peer-member-1";
+    private static final String PEER_MEMBER_2_NAME = "peer-member-2";
 
-    private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder();
+    private Builder dataStoreContextBuilder = DatastoreContext.newBuilder().persistent(false);
     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
 
     @After
@@ -108,9 +98,11 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
 
     @Test
     public void testOnRegisterCandidateLocal() throws Exception {
+        testLog.info("testOnRegisterCandidateLocal starting");
+
         ShardTestKit kit = new ShardTestKit(getSystem());
 
-        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
+        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newLocalShardProps());
 
         ShardTestKit.waitUntilLeader(shard);
 
@@ -122,21 +114,29 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
 
         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
+
+        testLog.info("testOnRegisterCandidateLocal ending");
     }
 
     @Test
     public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception {
+        testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader starting");
+
         ShardTestKit kit = new ShardTestKit(getSystem());
 
         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
 
-        String peerId = newShardId("follower").toString();
-        TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId, false).
-                withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
+        ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
+        ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
+
+        TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+                newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
+        TestEntityOwnershipShard peerShard = peer.underlyingActor();
+        peerShard.startDroppingMessagesOfType(RequestVote.class);
+        peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
 
-        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
-                ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
-                withDispatcher(Dispatchers.DefaultDispatcherId()));
+        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(
+                newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
 
         YangInstanceIdentifier entityId = ENTITY_ID1;
         DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
@@ -144,39 +144,44 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
         kit.expectMsgClass(SuccessReply.class);
 
-        // Now grant the vote so the shard becomes the leader. This should retry the commit.
-        peer.underlyingActor().grantVote = true;
+        // Now allow RequestVotes to the peer so the shard becomes the leader. This should retry the commit.
+        peerShard.stopDroppingMessagesOfType(RequestVote.class);
 
         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
+
+        testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader ending");
     }
 
     @Test
     public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception {
+        testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus starting");
+
         ShardTestKit kit = new ShardTestKit(getSystem());
 
         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
                 shardTransactionCommitTimeoutInSeconds(1);
 
-        String peerId = newShardId("follower").toString();
-        TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
-                withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
+        ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
+        ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
 
-        MockFollower follower = peer.underlyingActor();
+        TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+                newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
+        TestEntityOwnershipShard peerShard = peer.underlyingActor();
+        peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
 
         // Drop AppendEntries so consensus isn't reached.
-        follower.dropAppendEntries = true;
+        peerShard.startDroppingMessagesOfType(AppendEntries.class);
 
-        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
-                ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
-                withDispatcher(Dispatchers.DefaultDispatcherId()));
+        TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
+                newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
 
-        ShardTestKit.waitUntilLeader(shard);
+        ShardTestKit.waitUntilLeader(leader);
 
         YangInstanceIdentifier entityId = ENTITY_ID1;
         DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
 
-        shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
+        leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
         kit.expectMsgClass(SuccessReply.class);
 
         // Wait enough time for the commit to timeout.
@@ -184,121 +189,117 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
 
         // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
         // write being applied to the state.
-        follower.dropAppendEntries = false;
+        peerShard.stopDroppingMessagesOfType(AppendEntries.class);
 
-        verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
-        verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
+        verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
+
+        testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus ending");
     }
 
     @Test
     public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
+        testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader starting");
+
         ShardTestKit kit = new ShardTestKit(getSystem());
 
         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
                 shardIsolatedLeaderCheckIntervalInMillis(50);
 
-        String peerId = newShardId("follower").toString();
-        TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
-                withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
+        ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
+        ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
 
-        MockFollower follower = peer.underlyingActor();
+        TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+                newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
+        TestEntityOwnershipShard peerShard = peer.underlyingActor();
+        peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
 
-        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
-                ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
-                withDispatcher(Dispatchers.DefaultDispatcherId()));
+        TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
+                newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME));
 
-        ShardTestKit.waitUntilLeader(shard);
+        ShardTestKit.waitUntilLeader(leader);
 
         // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
-        follower.dropAppendEntries = true;
-        Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+        peerShard.startDroppingMessagesOfType(AppendEntries.class);
+        verifyRaftState(leader, state ->
+                assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
 
         YangInstanceIdentifier entityId = ENTITY_ID1;
         DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
 
-        shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
+        leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
         kit.expectMsgClass(SuccessReply.class);
 
         // Resume AppendEntries - the candidate write should now be committed.
-        follower.dropAppendEntries = false;
-        verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
-        verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
+        peerShard.stopDroppingMessagesOfType(AppendEntries.class);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
+        verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
+
+        testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader ending");
     }
 
     @Test
     public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
+        testLog.info("testOnRegisterCandidateLocalWithRemoteLeader starting");
+
         ShardTestKit kit = new ShardTestKit(getSystem());
 
         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
                 shardBatchedModificationCount(5);
 
-        String peerId = newShardId("leader").toString();
-        TestActorRef<MockLeader> peer = actorFactory.createTestActor(Props.create(MockLeader.class).
-                withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
-
-        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(Props.create(
-                TestEntityOwnershipShard.class, newShardId(LOCAL_MEMBER_NAME),
-                        ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build(),
-                        dataStoreContextBuilder.build()).withDispatcher(Dispatchers.DefaultDispatcherId()));
+        ShardIdentifier leaderId = newShardId(PEER_MEMBER_1_NAME);
+        ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME);
+        TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+                newShardBuilder(leaderId, peerMap(localId.toString()), PEER_MEMBER_1_NAME),
+                actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString());
+        TestEntityOwnershipShard leaderShard = leader.underlyingActor();
 
-        shard.tell(new AppendEntries(1L, peerId, -1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), -1L, -1L,
-                DataStoreVersions.CURRENT_VERSION), peer);
+        TestActorRef<TestEntityOwnershipShard> local = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+                newShardBuilder(localId, peerMap(leaderId.toString()),LOCAL_MEMBER_NAME)), localId.toString());
+        local.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
 
-        shard.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
+        local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
         kit.expectMsgClass(SuccessReply.class);
 
-        MockLeader leader = peer.underlyingActor();
-        assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
-                leader.modificationsReceived, 5, TimeUnit.SECONDS));
-        verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID1,
-                LOCAL_MEMBER_NAME);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
 
         // Test with initial commit timeout and subsequent retry.
 
-        leader.modificationsReceived = new CountDownLatch(1);
-        leader.sendReply = false;
+        local.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
+        leaderShard.startDroppingMessagesOfType(BatchedModifications.class);
 
-        shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
-
-        shard.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
+        local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
         kit.expectMsgClass(SuccessReply.class);
 
-        assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
-                leader.modificationsReceived, 5, TimeUnit.SECONDS));
-        verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID2,
-                LOCAL_MEMBER_NAME);
+        MessageCollectorActor.expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class);
 
         // Send a bunch of registration messages quickly and verify.
 
+        leaderShard.stopDroppingMessagesOfType(BatchedModifications.class);
+        MessageCollectorActor.clearMessages(leaderShard.collectorActor());
+
         int max = 100;
-        leader.delay = 4;
-        leader.modificationsReceived = new CountDownLatch(max);
         List<YangInstanceIdentifier> entityIds = new ArrayList<>();
         for(int i = 1; i <= max; i++) {
             YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
             entityIds.add(id);
-            shard.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, id)), kit.getRef());
+            local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, id)), kit.getRef());
         }
 
-        assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
-                leader.modificationsReceived, 10, TimeUnit.SECONDS));
-
-        // Sleep a little to ensure no additional BatchedModifications are received.
-
-        Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
-
-        List<Modification> receivedMods = leader.getAndClearReceivedModifications();
         for(int i = 0; i < max; i++) {
-            verifyBatchedEntityCandidate(receivedMods.get(i), ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
+            verifyCommittedEntityCandidate(local, ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
         }
 
-        assertEquals("# modifications received", max, receivedMods.size());
+        testLog.info("testOnRegisterCandidateLocalWithRemoteLeader ending");
     }
 
     @Test
     public void testOnUnregisterCandidateLocal() throws Exception {
+        testLog.info("testOnUnregisterCandidateLocal starting");
+
         ShardTestKit kit = new ShardTestKit(getSystem());
-        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
+        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newLocalShardProps());
         ShardTestKit.waitUntilLeader(shard);
 
         DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
@@ -325,104 +326,132 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
 
         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+
+        testLog.info("testOnUnregisterCandidateLocal ending");
     }
 
     @Test
     public void testOwnershipChanges() throws Exception {
+        testLog.info("testOwnershipChanges starting");
+
         ShardTestKit kit = new ShardTestKit(getSystem());
-        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
-        ShardTestKit.waitUntilLeader(shard);
+
+        dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
+
+        ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
+        ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
+        ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
+
+        TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+                newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
+                    peerId1.toString());
+        peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
+
+        TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+                newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
+                    peerId2.toString());
+        peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
+
+        TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
+                newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
+                    leaderId.toString());
+
+        ShardTestKit.waitUntilLeader(leader);
 
         DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
-        ShardDataTree shardDataTree = shard.underlyingActor().getDataStore();
 
         // Add a remote candidate
 
-        String remoteMemberName1 = "remoteMember1";
-        writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
+        peer1.tell(new RegisterCandidateLocal(entity), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
 
         // Register local
 
-        shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
+        leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
         kit.expectMsgClass(SuccessReply.class);
 
         // Verify the remote candidate becomes owner
 
-        verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
-        verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
-        verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
+        verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
+        verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
 
         // Add another remote candidate and verify ownership doesn't change
 
-        String remoteMemberName2 = "remoteMember2";
-        writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
+        peer2.tell(new RegisterCandidateLocal(entity), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
 
-        verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
+        verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
-        verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
+        verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
 
         // Remove the second remote candidate and verify ownership doesn't change
 
-        deleteNode(candidatePath(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
+        peer2.tell(new UnregisterCandidateLocal(entity), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
 
-        verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
+        verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
-        verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
+        verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
 
         // Remove the first remote candidate and verify the local candidate becomes owner
 
-        deleteNode(candidatePath(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
+        peer1.tell(new UnregisterCandidateLocal(entity), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
 
-        verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
-        verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+        verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
+        verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
 
         // Add the second remote candidate back and verify ownership doesn't change
 
-        writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
+        peer2.tell(new RegisterCandidateLocal(entity), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
 
-        verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
+        verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
-        verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+        verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
 
         // Unregister the local candidate and verify the second remote candidate becomes owner
 
-        shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
+        leader.tell(new UnregisterCandidateLocal(entity), kit.getRef());
         kit.expectMsgClass(SuccessReply.class);
 
-        verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
-        verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
+        verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
+        verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
+
+        testLog.info("testOwnershipChanges ending");
     }
 
     @Test
     public void testOwnerChangesOnPeerAvailabilityChanges() throws Exception {
-        ShardTestKit kit = new ShardTestKit(getSystem());
+        testLog.info("testOwnerChangesOnPeerAvailabilityChanges starting");
 
-        dataStoreContextBuilder.shardHeartbeatIntervalInMillis(500).shardElectionTimeoutFactor(10000);
+        ShardTestKit kit = new ShardTestKit(getSystem());
 
-        String peerMemberName1 = "peerMember1";
-        String peerMemberName2 = "peerMember2";
+        dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4).
+                shardIsolatedLeaderCheckIntervalInMillis(100000);
 
         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
-        ShardIdentifier peerId1 = newShardId(peerMemberName1);
-        ShardIdentifier peerId2 = newShardId(peerMemberName2);
+        ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
+        ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
 
-        TestActorRef<EntityOwnershipShard> peer1 = actorFactory.createTestActor(newShardProps(peerId1,
-                ImmutableMap.<String, String>builder().put(leaderId.toString(), actorFactory.createTestActorPath(leaderId.toString())).
-                        put(peerId2.toString(), actorFactory.createTestActorPath(peerId2.toString())).build(), peerMemberName1,
-                EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), peerId1.toString());
+        TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+                newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
+                    peerId1.toString());
+        peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
 
-        TestActorRef<EntityOwnershipShard> peer2 = actorFactory.createTestActor(newShardProps(peerId2,
-                ImmutableMap.<String, String>builder().put(leaderId.toString(), actorFactory.createTestActorPath(leaderId.toString())).
-                        put(peerId1.toString(), peer1.path().toString()).build(), peerMemberName2,
-                EntityOwnerSelectionStrategyConfig.newBuilder().build()). withDispatcher(Dispatchers.DefaultDispatcherId()), peerId2.toString());
+        TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+                newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
+                    peerId2.toString());
+        peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
 
-        TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(newShardProps(leaderId,
-                ImmutableMap.<String, String>builder().put(peerId1.toString(), peer1.path().toString()).
-                        put(peerId2.toString(), peer2.path().toString()).build(), LOCAL_MEMBER_NAME, EntityOwnerSelectionStrategyConfig.newBuilder().build()).
-                withDispatcher(Dispatchers.DefaultDispatcherId()), leaderId.toString());
-        leader.tell(TimeoutNow.INSTANCE, leader);
+        TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
+                newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
+                    leaderId.toString());
 
-        ShardTestKit.waitUntilLeader(leader);
+        verifyRaftState(leader, state ->
+                assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
 
         // Send PeerDown and PeerUp with no entities
 
@@ -435,46 +464,54 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         kit.expectMsgClass(SuccessReply.class);
         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
 
-        commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName2), kit);
-        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
+        peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
 
-        commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName1), kit);
-        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName1);
+        peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
 
         // Add candidates for entity2 with peerMember2 as the owner
 
-        commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName2), kit);
-        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
+        peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
 
-        commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName1), kit);
-        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
-        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
+        peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
 
         // Add candidates for entity3 with peerMember2 as the owner.
 
-        commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName2), kit);
-        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
+        peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
 
         leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
         kit.expectMsgClass(SuccessReply.class);
         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
 
-        commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName1), kit);
-        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName1);
-        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
+        peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
 
         // Add only candidate peerMember2 for entity4.
 
-        commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID4, peerMemberName2), kit);
-        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
-        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
+        peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID4)), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
 
         // Add only candidate peerMember1 for entity5.
 
-        commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID5, peerMemberName1), kit);
-        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID5, peerMemberName1);
-        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID5, peerMemberName1);
+        peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID5)), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME);
 
         // Kill peerMember2 and send PeerDown - the entities (2, 3, 4) owned by peerMember2 should get a new
         // owner selected
@@ -490,85 +527,101 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         peer1.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
 
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
-        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
         // no other candidates for entity4 so peerMember2 should remain owner.
-        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
 
-        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
-        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
-        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
-        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
 
         // Reinstate peerMember2
 
-        peer2 = actorFactory.createTestActor(newShardProps(peerId2,
-                    ImmutableMap.<String, String>builder().put(leaderId.toString(), leader.path().toString()).
-                            put(peerId1.toString(), peer1.path().toString()).build(), peerMemberName2,
-                    EntityOwnerSelectionStrategyConfig.newBuilder().build()). withDispatcher(Dispatchers.DefaultDispatcherId()), peerId2.toString());
+        peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+                newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
+                    peerId2.toString());
+        peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
         leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
         // Send PeerUp again - should be noop
         leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
         peer1.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
 
         // peerMember2's candidates should be removed on startup.
-        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
-        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
-        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
-        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
+        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
+        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
+        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
+        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
 
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
-        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
 
         // Add back candidate peerMember2 for entities 1, 2, & 3.
 
-        commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName2), kit);
-        commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName2), kit);
-        commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName2), kit);
-        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
-        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
-        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
+        peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
+        verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
+        verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
+        verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
-        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
+        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
+        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
+        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
 
         // Kill peerMember1 and send PeerDown - entity 2 should get a new owner selected
 
+        kit.watch(peer1);
         peer1.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
+        kit.unwatch(peer1);
         leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
 
-        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
 
         // Verify the reinstated peerMember2 is fully synced.
 
         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
-        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
+        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
 
         // Reinstate peerMember1 and verify no owner changes
 
-        peer1 = actorFactory.createTestActor(newShardProps(peerId1,
-                ImmutableMap.<String, String>builder().put(leaderId.toString(), leader.path().toString()).
-                        put(peerId2.toString(), peer2.path().toString()).build(), peerMemberName1,
-                EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), peerId1.toString());
+        peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(newShardBuilder(
+                peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)), peerId1.toString());
+        peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
         leader.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
 
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
-        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
 
-        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName1);
-        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
-        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName1);
+        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
+        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
+        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
+
+        verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
+        verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
+        verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
 
         // Verify the reinstated peerMember1 is fully synced.
 
         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
-        verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
+        verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, "");
 
@@ -579,24 +632,226 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
         peer2.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
 
+        kit.watch(leader);
         leader.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
+        kit.unwatch(leader);
         peer2.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
         peer2.tell(TimeoutNow.INSTANCE, peer2);
 
-        ShardTestKit.waitUntilLeader(peer2);
+        verifyRaftState(peer2, state ->
+                assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
 
-        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
-        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
-        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
+        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
+        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
+        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
+
+        testLog.info("testOwnerChangesOnPeerAvailabilityChanges ending");
+    }
+
+    @Test
+    public void testLeaderIsolation() throws Exception {
+        testLog.info("testLeaderIsolation starting");
+
+        ShardTestKit kit = new ShardTestKit(getSystem());
+
+        ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
+        ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
+        ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
+
+        dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4).
+            shardIsolatedLeaderCheckIntervalInMillis(100000);
+
+        TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+                newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
+                    peerId1.toString());
+        peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
+
+        TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+                newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
+                    peerId2.toString());
+        peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
+
+        dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()).
+                shardIsolatedLeaderCheckIntervalInMillis(500);
+
+        TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+                newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME)),
+                    leaderId.toString());
+
+        ShardTestKit.waitUntilLeader(leader);
+
+        // Add entity1 candidates for all members with the leader as the owner
+
+        DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
+        leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
+
+        peer1.tell(new RegisterCandidateLocal(entity1), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
+
+        peer2.tell(new RegisterCandidateLocal(entity1), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_2_NAME);
+
+        verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
+        verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
+        verifyOwner(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
+
+        // Add entity2 candidates for all members with peer1 as the owner
+
+        DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
+        peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
+
+        peer2.tell(new RegisterCandidateLocal(entity2), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_2_NAME);
+
+        leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
+
+        verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
+        verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
+        verifyOwner(peer2, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
+
+        // Add entity3 candidates for all members with peer2 as the owner
+
+        DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
+        peer2.tell(new RegisterCandidateLocal(entity3), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
+
+        leader.tell(new RegisterCandidateLocal(entity3), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), LOCAL_MEMBER_NAME);
+
+        peer1.tell(new RegisterCandidateLocal(entity3), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_1_NAME);
+
+        verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
+        verifyOwner(peer1, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
+        verifyOwner(peer2, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
+
+        // Add listeners on all members
+
+        DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class);
+        leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, false, true, true),
+                ownershipChange(entity2, false, false, true)), ownershipChange(entity3, false, false, true)));
+        reset(leaderListener);
+
+        DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class);
+        peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verify(peer1Listener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, false, false, true),
+                ownershipChange(entity2, false, true, true)), ownershipChange(entity3, false, false, true)));
+        reset(peer1Listener);
+
+        DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class);
+        peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verify(peer2Listener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, false, false, true),
+                ownershipChange(entity2, false, false, true)), ownershipChange(entity3, false, true, true)));
+        reset(peer2Listener);
+
+        // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
+
+        leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
+        leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
+
+        peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
+                ae -> ae.getLeaderId().equals(leaderId.toString()));
+        peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
+
+        // Make peer1 start an election and become leader by enabling the ElectionTimeout message.
+
+        peer1.underlyingActor().stopDroppingMessagesOfType(ElectionTimeout.class);
+
+        // Send PeerDown to the isolated leader so it tries to re-assign ownership for the entities owned by the
+        // isolated peers.
+
+        leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
+        leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
+
+        verifyRaftState(leader, state ->
+                assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
+
+        // Expect inJeopardy notification on the isolated leader.
+
+        verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, true, true, true, true),
+                ownershipChange(entity2, false, false, true, true)), ownershipChange(entity3, false, false, true, true)));
+        reset(leaderListener);
+
+        verifyRaftState(peer1, state ->
+                assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
+
+        // Send PeerDown to the new leader peer1 so it re-assigns ownership for the entities owned by the
+        // isolated leader.
+
+        peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
+
+        verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
+
+        verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
+        reset(peer1Listener);
+
+        verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
+        reset(peer2Listener);
+
+        // Remove the isolation.
+
+        leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
+        leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
+        peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
+        peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
+
+        // Previous leader should switch to Follower and send inJeopardy cleared notifications for all entities.
+
+        verifyRaftState(leader, state ->
+                assertEquals("getRaftState", RaftState.Follower.toString(), state.getRaftState()));
+
+        verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, true, true, true),
+                ownershipChange(entity2, false, false, true)), ownershipChange(entity3, false, false, true)));
+
+        verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
+        verify(leaderListener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
+
+        Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+        verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
+        verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
+
+        verifyNoMoreInteractions(leaderListener);
+        verifyNoMoreInteractions(peer1Listener);
+        verifyNoMoreInteractions(peer2Listener);
+
+        testLog.info("testLeaderIsolation ending");
     }
 
     @Test
     public void testListenerRegistration() throws Exception {
+        testLog.info("testListenerRegistration starting");
+
         ShardTestKit kit = new ShardTestKit(getSystem());
-        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
-        ShardTestKit.waitUntilLeader(shard);
-        ShardDataTree shardDataTree = shard.underlyingActor().getDataStore();
+
+        ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
+        ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
+
+        TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+                newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
+        peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
+
+        TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
+                newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
+
+        ShardTestKit.waitUntilLeader(leader);
 
         String otherEntityType = "otherEntityType";
         DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
@@ -607,17 +862,17 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
 
         // Register listener
 
-        shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
+        leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
         kit.expectMsgClass(SuccessReply.class);
 
         // Register a couple candidates for the desired entity type and verify listener is notified.
 
-        shard.tell(new RegisterCandidateLocal(entity1), kit.getRef());
+        leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
         kit.expectMsgClass(SuccessReply.class);
 
         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
 
-        shard.tell(new RegisterCandidateLocal(entity2), kit.getRef());
+        leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
         kit.expectMsgClass(SuccessReply.class);
 
         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
@@ -625,7 +880,7 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
 
         // Register another candidate for another entity type and verify listener is not notified.
 
-        shard.tell(new RegisterCandidateLocal(entity4), kit.getRef());
+        leader.tell(new RegisterCandidateLocal(entity4), kit.getRef());
         kit.expectMsgClass(SuccessReply.class);
 
         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
@@ -633,14 +888,13 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
 
         // Register remote candidate for entity1
 
-        String remoteMemberName = "remoteMember";
-        writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, entity1.getIdentifier(), remoteMemberName),
-                shardDataTree);
-        verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entity1.getIdentifier(), remoteMemberName);
+        peer.tell(new RegisterCandidateLocal(entity1), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entity1.getIdentifier(), PEER_MEMBER_1_NAME);
 
         // Unregister the local candidate for entity1 and verify listener is notified
 
-        shard.tell(new UnregisterCandidateLocal(entity1), kit.getRef());
+        leader.tell(new UnregisterCandidateLocal(entity1), kit.getRef());
         kit.expectMsgClass(SuccessReply.class);
 
         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
@@ -648,13 +902,13 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
 
         // Unregister the listener, add a candidate for entity3 and verify listener isn't notified
 
-        shard.tell(new UnregisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
+        leader.tell(new UnregisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
         kit.expectMsgClass(SuccessReply.class);
 
-        shard.tell(new RegisterCandidateLocal(entity3), kit.getRef());
+        leader.tell(new RegisterCandidateLocal(entity3), kit.getRef());
         kit.expectMsgClass(SuccessReply.class);
 
-        verifyOwner(shard, ENTITY_TYPE, entity3.getIdentifier(), LOCAL_MEMBER_NAME);
+        verifyOwner(leader, ENTITY_TYPE, entity3.getIdentifier(), LOCAL_MEMBER_NAME);
         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
         verify(listener, never()).ownershipChanged(any(DOMEntityOwnershipChange.class));
 
@@ -662,7 +916,7 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
 
         reset(listener);
 
-        shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
+        leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
         kit.expectMsgClass(SuccessReply.class);
 
         verify(listener, timeout(5000).times(2)).ownershipChanged(or(ownershipChange(entity2, false, true, true),
@@ -670,278 +924,178 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
         verify(listener, never()).ownershipChanged(ownershipChange(entity4));
         verify(listener, times(1)).ownershipChanged(ownershipChange(entity1));
-    }
-
-    private static void commitModification(TestActorRef<EntityOwnershipShard> shard, NormalizedNode<?, ?> node,
-            JavaTestKit sender) {
-        BatchedModifications modifications = newBatchedModifications();
-        modifications.addModification(new MergeModification(ENTITY_OWNERS_PATH, node));
-
-        shard.tell(modifications, sender.getRef());
-        sender.expectMsgClass(CommitTransactionReply.class);
-    }
 
-    private static BatchedModifications newBatchedModifications() {
-        BatchedModifications modifications = new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
-        modifications.setDoCommitOnReady(true);
-        modifications.setReady(true);
-        modifications.setTotalMessagesSent(1);
-        return modifications;
+        testLog.info("testListenerRegistration ending");
     }
 
-    private void verifyEntityCandidateRemoved(final TestActorRef<EntityOwnershipShard> shard, String entityType,
-            YangInstanceIdentifier entityId, String candidateName) {
-        verifyNodeRemoved(candidatePath(entityType, entityId, candidateName),
-                path -> {
-                    try {
-                        return AbstractShardTest.readStore(shard, path);
-                    } catch(Exception e) {
-                        throw new AssertionError("Failed to read " + path, e);
-                    }
-            });
-    }
+    @Test
+    public void testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived() throws Exception {
+        testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived starting");
 
-    private void verifyCommittedEntityCandidate(final TestActorRef<EntityOwnershipShard> shard, String entityType,
-            YangInstanceIdentifier entityId, String candidateName) {
-        verifyEntityCandidate(entityType, entityId, candidateName, path -> {
-            try {
-                return AbstractShardTest.readStore(shard, path);
-            } catch(Exception e) {
-                throw new AssertionError("Failed to read " + path, e);
-            }
-        });
-    }
+        ShardTestKit kit = new ShardTestKit(getSystem());
+        EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder().
+                addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
 
-    private void verifyNoEntityCandidate(final TestActorRef<EntityOwnershipShard> shard, String entityType,
-            YangInstanceIdentifier entityId, String candidateName) {
-        verifyEntityCandidate(entityType, entityId, candidateName, path -> {
-            try {
-                return AbstractShardTest.readStore(shard, path);
-            } catch(Exception e) {
-                throw new AssertionError("Failed to read " + path, e);
-            }
-        }, false);
-    }
+        ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
+        ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
 
-    private void verifyBatchedEntityCandidate(List<Modification> mods, String entityType,
-            YangInstanceIdentifier entityId, String candidateName) throws Exception {
-        assertEquals("BatchedModifications size", 1, mods.size());
-        verifyBatchedEntityCandidate(mods.get(0), entityType, entityId, candidateName);
-    }
+        TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+                newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
+        peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
 
-    private void verifyBatchedEntityCandidate(Modification mod, String entityType,
-            YangInstanceIdentifier entityId, String candidateName) throws Exception {
-        assertEquals("Modification type", MergeModification.class, mod.getClass());
-        verifyEntityCandidate(((MergeModification)mod).getData(), entityType,
-                entityId, candidateName, true);
-    }
+        TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
+                newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME, builder.build()), leaderId.toString());
 
-    private static void verifyOwner(final TestActorRef<EntityOwnershipShard> shard, String entityType,
-            YangInstanceIdentifier entityId, String localMemberName) {
-        verifyOwner(localMemberName, entityType, entityId, path -> {
-            try {
-                return AbstractShardTest.readStore(shard, path);
-            } catch(Exception e) {
-                return null;
-            }
-        });
-    }
+        ShardTestKit.waitUntilLeader(leader);
 
-    private Props newShardProps() {
-        return newShardProps(Collections.<String,String>emptyMap());
-    }
+        DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
 
-    private Props newShardProps(EntityOwnerSelectionStrategyConfig strategyConfig, Map<String, String> peers) {
-        return newShardProps(newShardId(LOCAL_MEMBER_NAME), peers, LOCAL_MEMBER_NAME, strategyConfig);
-    }
+        // Add a remote candidate
 
-    private Props newShardProps(Map<String, String> peers) {
-        return newShardProps(newShardId(LOCAL_MEMBER_NAME), peers, LOCAL_MEMBER_NAME, EntityOwnerSelectionStrategyConfig.newBuilder().build());
-    }
+        peer.tell(new RegisterCandidateLocal(entity), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
 
-    private Props newShardProps(ShardIdentifier shardId, Map<String,String> peers, String memberName,
-                                EntityOwnerSelectionStrategyConfig config) {
-        return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers).
-                datastoreContext(dataStoreContextBuilder.build()).schemaContext(SCHEMA_CONTEXT).
-                localMemberName(MemberName.forName(memberName)).ownerSelectionStrategyConfig(config).props()
-                .withDispatcher(Dispatchers.DefaultDispatcherId());
-    }
+        // Register local
 
-    private static ShardIdentifier newShardId(String memberName) {
-        return ShardIdentifier.create("entity-ownership", MemberName.forName(memberName),
-            "operational" + NEXT_SHARD_NUM.getAndIncrement());
-    }
+        leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
 
-    private static class TestEntityOwnershipShard extends EntityOwnershipShard {
+        // Verify the local candidate becomes owner
 
-        TestEntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
-                DatastoreContext datastoreContext) {
-            super(newBuilder().id(name).peerAddresses(peerAddresses).datastoreContext(datastoreContext).
-                    schemaContext(SCHEMA_CONTEXT).localMemberName(name.getMemberName()));
-        }
+        verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
+        verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
+        verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
 
-        @Override
-        public void handleCommand(Object message) {
-            if(!(message instanceof ElectionTimeout)) {
-                super.handleCommand(message);
-            }
-        }
+        testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived ending");
     }
 
-    private static class MockFollower extends UntypedActor {
-        volatile boolean grantVote;
-        volatile boolean dropAppendEntries;
-        private final String myId;
-
-        @SuppressWarnings("unused")
-        public MockFollower(String myId) {
-            this(myId, true);
-        }
+    @Test
+    public void testDelayedEntityOwnerSelection() throws Exception {
+        testLog.info("testDelayedEntityOwnerSelection starting");
 
-        public MockFollower(String myId, boolean grantVote) {
-            this.myId = myId;
-            this.grantVote = grantVote;
-        }
+        ShardTestKit kit = new ShardTestKit(getSystem());
+        EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder().
+                addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
 
-        @Override
-        public void onReceive(Object message) {
-            if(message instanceof RequestVote) {
-                if(grantVote) {
-                    getSender().tell(new RequestVoteReply(((RequestVote)message).getTerm(), true), getSelf());
-                }
-            } else if(message instanceof AppendEntries) {
-                if(!dropAppendEntries) {
-                    AppendEntries req = (AppendEntries) message;
-                    long lastIndex = req.getLeaderCommit();
-                    if (req.getEntries().size() > 0) {
-                        for(ReplicatedLogEntry entry : req.getEntries()) {
-                            lastIndex = entry.getIndex();
-                        }
-                    }
-
-                    getSender().tell(new AppendEntriesReply(myId, req.getTerm(), true, lastIndex, req.getTerm(),
-                            DataStoreVersions.CURRENT_VERSION), getSelf());
-                }
-            }
-        }
-    }
+        dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
 
+        ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
+        ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
+        ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
 
-    @Test
-    public void testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived() throws Exception {
-        ShardTestKit kit = new ShardTestKit(getSystem());
-        EntityOwnerSelectionStrategyConfig.Builder builder
-                = EntityOwnerSelectionStrategyConfig.newBuilder().addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
+        TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+                newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
+                    peerId1.toString());
+        peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
 
-        String peerId = newShardId("follower").toString();
-        TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId, false).
-                withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
+        TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+                newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
+                    peerId2.toString());
+        peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
 
-        peer.underlyingActor().grantVote = true;
+        TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
+                newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME, builder.build()),
+                    leaderId.toString());
 
-        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(builder.build(),
-                ImmutableMap.of(peerId.toString(), peer.path().toString())));
-        ShardTestKit.waitUntilLeader(shard);
+        ShardTestKit.waitUntilLeader(leader);
 
         DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
-        ShardDataTree shardDataTree = shard.underlyingActor().getDataStore();
 
         // Add a remote candidate
 
-        String remoteMemberName1 = "follower";
-        writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
+        peer1.tell(new RegisterCandidateLocal(entity), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
 
         // Register local
 
-        shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
+        leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
         kit.expectMsgClass(SuccessReply.class);
 
         // Verify the local candidate becomes owner
 
-        verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
-        verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
-        verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
-    }
+        verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
+        verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
+        verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
 
-    @Test
-    public void testDelayedEntityOwnerSelection() throws Exception {
-        ShardTestKit kit = new ShardTestKit(getSystem());
-        EntityOwnerSelectionStrategyConfig.Builder builder
-                = EntityOwnerSelectionStrategyConfig.newBuilder().addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
+        testLog.info("testDelayedEntityOwnerSelection ending");
+    }
 
-        String follower1Id = newShardId("follower1").toString();
-        TestActorRef<MockFollower> follower1 = actorFactory.createTestActor(Props.create(MockFollower.class, follower1Id, false).
-                withDispatcher(Dispatchers.DefaultDispatcherId()), follower1Id);
+    private Props newLocalShardProps() {
+        return newShardProps(newShardId(LOCAL_MEMBER_NAME), Collections.<String,String>emptyMap(), LOCAL_MEMBER_NAME);
+    }
 
-        follower1.underlyingActor().grantVote = true;
+    private Props newShardProps(ShardIdentifier shardId, Map<String,String> peers, String memberName) {
+        return newShardProps(shardId, peers, memberName, EntityOwnerSelectionStrategyConfig.newBuilder().build());
+    }
 
-        String follower2Id = newShardId("follower").toString();
-        TestActorRef<MockFollower> follower2 = actorFactory.createTestActor(Props.create(MockFollower.class, follower2Id, false).
-                withDispatcher(Dispatchers.DefaultDispatcherId()), follower2Id);
+    private Props newShardProps(ShardIdentifier shardId, Map<String,String> peers, String memberName,
+                                EntityOwnerSelectionStrategyConfig config) {
+        return newShardBuilder(shardId, peers, memberName).ownerSelectionStrategyConfig(config).props()
+                    .withDispatcher(Dispatchers.DefaultDispatcherId());
+    }
 
-        follower2.underlyingActor().grantVote = true;
+    private EntityOwnershipShard.Builder newShardBuilder(ShardIdentifier shardId, Map<String,String> peers,
+            String memberName) {
+        return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers).datastoreContext(
+                dataStoreContextBuilder.build()).schemaContext(SCHEMA_CONTEXT).localMemberName(
+                        MemberName.forName(memberName)).ownerSelectionStrategyConfig(
+                                EntityOwnerSelectionStrategyConfig.newBuilder().build());
+    }
 
+    private Map<String, String> peerMap(String... peerIds) {
+        ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder();
+        for(String peerId: peerIds) {
+            builder.put(peerId, actorFactory.createTestActorPath(peerId)).build();
+        }
 
-        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(builder.build(),
-                ImmutableMap.of(follower1Id.toString(), follower2.path().toString(), follower2Id.toString(), follower2.path().toString())));
-        ShardTestKit.waitUntilLeader(shard);
+        return builder.build();
+    }
 
-        DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
-        ShardDataTree shardDataTree = shard.underlyingActor().getDataStore();
+    private static class TestEntityOwnershipShard extends EntityOwnershipShard {
+        private final TestActorRef<MessageCollectorActor> collectorActor;
+        private final Map<Class<?>, Predicate<?>> dropMessagesOfType = new ConcurrentHashMap<>();
 
-        // Add a remote candidate
+        TestEntityOwnershipShard(Builder builder, TestActorRef<MessageCollectorActor> collectorActor) {
+            super(builder);
+            this.collectorActor = collectorActor;
+        }
 
-        String remoteMemberName1 = "follower";
-        writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
+        @SuppressWarnings({ "unchecked", "rawtypes" })
+        @Override
+        public void handleCommand(Object message) {
+            if(collectorActor != null) {
+                collectorActor.tell(message, ActorRef.noSender());
+            }
 
-        // Register local
+            Predicate drop = dropMessagesOfType.get(message.getClass());
+            if(drop == null || !drop.test(message)) {
+                super.handleCommand(message);
+            }
+        }
 
-        shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
-        kit.expectMsgClass(SuccessReply.class);
+        void startDroppingMessagesOfType(Class<?> msgClass) {
+            dropMessagesOfType.put(msgClass, msg -> true);
+        }
 
-        // Verify the local candidate becomes owner
+        <T> void startDroppingMessagesOfType(Class<T> msgClass, Predicate<T> filter) {
+            dropMessagesOfType.put(msgClass, filter);
+        }
 
-        verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
-        verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
-        verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
-    }
+        void stopDroppingMessagesOfType(Class<?> msgClass) {
+            dropMessagesOfType.remove(msgClass);
+        }
 
-    private static class MockLeader extends UntypedActor {
-        volatile CountDownLatch modificationsReceived = new CountDownLatch(1);
-        List<Modification> receivedModifications = new ArrayList<>();
-        volatile boolean sendReply = true;
-        volatile long delay;
+        TestActorRef<MessageCollectorActor> collectorActor() {
+            return collectorActor;
+        }
 
-        @Override
-        public void onReceive(Object message) {
-            if(message instanceof BatchedModifications) {
-                if(delay > 0) {
-                    Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
-                }
-
-                if(sendReply) {
-                    BatchedModifications mods = (BatchedModifications) message;
-                    synchronized (receivedModifications) {
-                        for(int i = 0; i < mods.getModifications().size(); i++) {
-                            receivedModifications.add(mods.getModifications().get(i));
-                            modificationsReceived.countDown();
-                        }
-                    }
-
-                    getSender().tell(CommitTransactionReply.instance(DataStoreVersions.CURRENT_VERSION).
-                            toSerializable(), getSelf());
-                } else {
-                    sendReply = true;
-                }
-            }
+        static Props props(Builder builder) {
+            return props(builder, null);
         }
 
-        List<Modification> getAndClearReceivedModifications() {
-            synchronized (receivedModifications) {
-                List<Modification> ret = new ArrayList<>(receivedModifications);
-                receivedModifications.clear();
-                return ret;
-            }
+        static Props props(Builder builder, TestActorRef<MessageCollectorActor> collectorActor) {
+            return Props.create(TestEntityOwnershipShard.class, builder, collectorActor).
+                    withDispatcher(Dispatchers.DefaultDispatcherId());
         }
     }
 }