Bug 6540: EOS - Rework behavior of onPeerDown 29/45129/5
authorTom Pantelis <tpanteli@brocade.com>
Sun, 4 Sep 2016 01:08:41 +0000 (21:08 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Wed, 7 Sep 2016 13:28:39 +0000 (13:28 +0000)
https://git.opendaylight.org/gerrit/#/c/26808/ modified the behavior of
onPeerDown to remove all the down node's candidates. However this behavior is
problematic in the case when the shard leader is isolated. The majority partition
will elect a new leader which temporarily results in split-brain and 2 leaders
which independently attempt to remove the other side's candidates. When the partition
is healed, all hell breaks loose trying to reconcile their differences. This is
compounded with the singleton service because it uses 2 entities that are related
to one another.

To alleviate this, I reverted back to the behavior of selecting a new owner for
the entities owned by the down node and leaving the down node as a candidate.
In the case where the down node is the only candidate, it leaves it as the owner.
This doesn't hurt anything and avoids complications with having to re-instate the
down node as owner when it re-joins if it was actually isolated. The idea here is
to keep its candidacy to minimize disruption until proven otherwise since we don't
know if the downed node's process is actually still alive. If another node registers
a candidate it will replace the down node as the owner.

To handle the case where the down node actually restarted, after startup when it
first hears from the leader, it sends a RemoveAllCandidates message to the leader to
remove it from all entities. This cleans out stale candidates should no local client
register a candidate in the new incarnation.

The unit tests revealed an orthogonal issue with the PreLeader state. The PreLeader
switches to Leader when the commit index is up to date but before applying the entries
to the state. However the EOS may commit modifications immediately before the
ApplyState message for prior entries is received. This can result in the "Store tree X  and candidate base Y differ" exception. So I modified the PreLeader behavior to
switch to Leader when the last applied index is up to date. This makes sense b/c
the PreLeader bevavior is intended to protect the state from inconsistencies.

I also fixed a couple bugs where the downPeerMemberNames was accessed with a String
rather than a MemberName instance. This was a remnant of changing downPeerMemberNames
to store MemberName.

Change-Id: I326660c172353539146a2216cc8a70a4b842affe
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/PreLeader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/RemoveAllCandidates.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java

index dce9ee8d596dccf6bc46b0f8b98bf4e5365570db..660a6463f0ecd0ae5ff3f6e2d7af8ff8fef7d323 100644 (file)
@@ -257,6 +257,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 context.getSnapshotManager().trimLog(context.getLastApplied());
             }
 
+            // Send it to the current behavior - some behaviors like PreLeader need to be notified of ApplyState.
+            possiblyHandleBehaviorMessage(message);
+
         } else if (message instanceof ApplyJournalEntries) {
             ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
             if(LOG.isDebugEnabled()) {
@@ -284,23 +287,27 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             ((Runnable)message).run();
         } else if(message instanceof NoopPayload) {
             persistData(null, null, (NoopPayload)message);
-        } else {
-            // Processing the message may affect the state, hence we need to capture it
-            final RaftActorBehavior currentBehavior = getCurrentBehavior();
-            final BehaviorState state = behaviorStateTracker.capture(currentBehavior);
-
-            // A behavior indicates that it processed the change by returning a reference to the next behavior
-            // to be used. A null return indicates it has not processed the message and we should be passing it to
-            // the subclass for handling.
-            final RaftActorBehavior nextBehavior = currentBehavior.handleMessage(getSender(), message);
-            if (nextBehavior != null) {
-                switchBehavior(state, nextBehavior);
-            } else {
-                handleNonRaftCommand(message);
-            }
+        } else if (!possiblyHandleBehaviorMessage(message)) {
+            handleNonRaftCommand(message);
         }
     }
 
+    private boolean possiblyHandleBehaviorMessage(final Object message) {
+        final RaftActorBehavior currentBehavior = getCurrentBehavior();
+        final BehaviorState state = behaviorStateTracker.capture(currentBehavior);
+
+        // A behavior indicates that it processed the change by returning a reference to the next behavior
+        // to be used. A null return indicates it has not processed the message and we should be passing it to
+        // the subclass for handling.
+        final RaftActorBehavior nextBehavior = currentBehavior.handleMessage(getSender(), message);
+        if (nextBehavior != null) {
+            switchBehavior(state, nextBehavior);
+            return true;
+        }
+
+        return false;
+    }
+
     private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
         LOG.debug("{}: Initiating leader transfer", persistenceId());
 
@@ -594,7 +601,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      *
      * @return A reference to the leader if known, null otherwise
      */
-    protected ActorSelection getLeader(){
+    public ActorSelection getLeader(){
         String leaderAddress = getLeaderAddress();
 
         if(leaderAddress == null){
index 52ed26758ee26b6f9949fa1202f1a44628b82364..176704f3d377323a962d5a171cfe53c84beb494e 100644 (file)
@@ -115,8 +115,8 @@ public class Candidate extends AbstractRaftActorBehavior {
         }
 
         if (voteCount >= votesRequired) {
-            if(context.getCommitIndex() < context.getReplicatedLog().lastIndex()) {
-                LOG.debug("{}: Connmit index {} is behind last index {}", logName(), context.getCommitIndex(),
+            if(context.getLastApplied() < context.getReplicatedLog().lastIndex()) {
+                LOG.debug("{}: LastApplied index {} is behind last index {}", logName(), context.getLastApplied(),
                         context.getReplicatedLog().lastIndex());
                 return internalSwitchBehavior(RaftState.PreLeader);
             } else {
index e3ae4f427ac362c4859f22a40cda707b6145944a..eab3f7d7da1d49feb57f71b801b4cbe6b4362d2a 100644 (file)
@@ -10,7 +10,7 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 import akka.actor.ActorRef;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
 
 /**
@@ -37,14 +37,16 @@ public class PreLeader extends AbstractLeader {
     }
 
     @Override
-    protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
-        RaftActorBehavior returnBehavior = super.handleAppendEntriesReply(sender, appendEntriesReply);
-
-        if(context.getCommitIndex() >= context.getReplicatedLog().lastIndex()) {
-            // We've committed all entries - we can switch to Leader.
-            returnBehavior = internalSwitchBehavior(new Leader(context, this));
+    public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+        if (message instanceof ApplyState) {
+            if(context.getLastApplied() >= context.getReplicatedLog().lastIndex()) {
+                // We've applied all entries - we can switch to Leader.
+                return internalSwitchBehavior(new Leader(context, this));
+            } else {
+                return this;
+            }
+        } else {
+            return super.handleMessage(sender, message);
         }
-
-        return returnBehavior;
     }
 }
index c6773d83c2c94714a67fe01a84cb1e7c89eb242a..924ba57986739a1a56865b797a03007df8a7f872 100644 (file)
@@ -112,6 +112,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest<Candidate> {
     @Test
     public void testBecomeLeaderOnReceivingMajorityVotesInThreeNodeCluster(){
         MockRaftActorContext raftActorContext = createActorContext();
+        raftActorContext.setLastApplied(raftActorContext.getReplicatedLog().lastIndex());
         raftActorContext.setPeerAddresses(setupPeers(2));
         candidate = new Candidate(raftActorContext);
 
@@ -120,6 +121,18 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest<Candidate> {
         assertEquals("Behavior", RaftState.Leader, candidate.state());
     }
 
+    @Test
+    public void testBecomePreLeaderOnReceivingMajorityVotesInThreeNodeCluster(){
+        MockRaftActorContext raftActorContext = createActorContext();
+        raftActorContext.setPeerAddresses(setupPeers(2));
+        candidate = new Candidate(raftActorContext);
+
+        candidate = candidate.handleMessage(peerActors[0], new RequestVoteReply(1, true));
+
+        // LastApplied is -1 and behind the last index.
+        assertEquals("Behavior", RaftState.PreLeader, candidate.state());
+    }
+
     @Test
     public void testBecomeLeaderOnReceivingMajorityVotesInFiveNodeCluster(){
         MockRaftActorContext raftActorContext = createActorContext();
@@ -127,6 +140,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest<Candidate> {
         raftActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().
                 createEntries(0, 5, 1).build());
         raftActorContext.setCommitIndex(raftActorContext.getReplicatedLog().lastIndex());
+        raftActorContext.setLastApplied(raftActorContext.getReplicatedLog().lastIndex());
         raftActorContext.setPeerAddresses(setupPeers(4));
         candidate = new Candidate(raftActorContext);
 
index 46743482928f92bc1e44f28c0df8c27923d8f721..4a6a37aabe77e43e7b77ba6ba7e7a117caf75328 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore.entityownership;
 
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_QNAME;
@@ -17,10 +18,8 @@ import static org.opendaylight.controller.cluster.datastore.entityownership.Enti
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPES_PATH;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_QNAME;
-import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidateMapEntry;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidateNodeKey;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
-import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.createEntity;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
@@ -43,6 +42,7 @@ import org.opendaylight.controller.cluster.datastore.entityownership.messages.Ca
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.RemoveAllCandidates;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.SelectOwner;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
@@ -59,6 +59,7 @@ import org.opendaylight.controller.cluster.datastore.modification.WriteModificat
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
@@ -81,6 +82,7 @@ class EntityOwnershipShard extends Shard {
     private final EntityOwnerSelectionStrategyConfig strategyConfig;
     private final Map<YangInstanceIdentifier, Cancellable> entityToScheduledOwnershipTask = new HashMap<>();
     private final EntityOwnershipStatistics entityOwnershipStatistics;
+    private boolean removeAllInitialCandidates = true;
 
     private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) {
         return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build();
@@ -134,19 +136,25 @@ class EntityOwnershipShard extends Shard {
             onUnregisterListenerLocal((UnregisterListenerLocal) message);
         } else if(message instanceof SelectOwner) {
             onSelectOwner((SelectOwner) message);
+        } else if(message instanceof RemoveAllCandidates) {
+            onRemoveAllCandidates((RemoveAllCandidates) message);
         } else if(!commitCoordinator.handleMessage(message, this)) {
             super.handleNonRaftCommand(message);
         }
     }
 
+    private void onRemoveAllCandidates(RemoveAllCandidates message) {
+        LOG.debug("{}: onRemoveAllCandidates: {}", persistenceId(), message);
+
+        removeCandidateFromEntities(message.getMemberName());
+    }
+
     private void onSelectOwner(SelectOwner selectOwner) {
         LOG.debug("{}: onSelectOwner: {}", persistenceId(), selectOwner);
 
         String currentOwner = getCurrentOwner(selectOwner.getEntityPath());
         if(Strings.isNullOrEmpty(currentOwner)) {
-            String entityType = EntityOwnersModel.entityTypeFromEntityPath(selectOwner.getEntityPath());
             writeNewOwner(selectOwner.getEntityPath(), newOwner(currentOwner, selectOwner.getAllCandidates(),
-                    entityOwnershipStatistics.byEntityType(entityType),
                     selectOwner.getOwnerSelectionStrategy()));
 
             Cancellable cancellable = entityToScheduledOwnershipTask.get(selectOwner.getEntityPath());
@@ -235,6 +243,8 @@ class EntityOwnershipShard extends Shard {
         } else {
             final ActorSelection leader = getLeader();
             if (leader != null) {
+                possiblyRemoveAllInitialCandidates(leader);
+
                 if(LOG.isDebugEnabled()) {
                     LOG.debug("{}: Sending BatchedModifications {} to leader {}", persistenceId(),
                             modifications.getTransactionID(), leader);
@@ -248,8 +258,26 @@ class EntityOwnershipShard extends Shard {
         }
     }
 
+    void possiblyRemoveAllInitialCandidates(ActorSelection leader) {
+        // The following handles removing all candidates on startup when re-joining with a remote leader. When a
+        // follower is detected as down, the leader will re-assign new owners to entities that were owned by the
+        // down member but doesn't remove the down member as a candidate, as the down node may actually be isolated
+        // and still running. Therefore on startup we send an initial message to the remote leader to remove any
+        // potential stale candidates we had previously registered, as it's possible a candidate may not be
+        // registered by a client in the new incarnation. We have to send the RemoveAllCandidates message prior to any
+        // pending registrations.
+        if(removeAllInitialCandidates && leader != null) {
+            removeAllInitialCandidates = false;
+            if(!isLeader()) {
+                LOG.debug("{} - got new leader {} on startup - sending RemoveAllCandidates", persistenceId(), leader);
+
+                leader.tell(new RemoveAllCandidates(localMemberName), ActorRef.noSender());
+            }
+        }
+    }
+
     boolean hasLeader() {
-        return getLeader() != null && !isIsolatedLeader();
+        return getLeader() != null && (!isLeader() || isLeaderActive());
     }
 
     /**
@@ -326,11 +354,11 @@ class EntityOwnershipShard extends Shard {
             // EntityOwnershipStatistics
             strategyConfig.clearStrategies();
 
-            // Remove the candidates for all members that are known to be down. In a cluster which has greater than
+            // Re-assign owners for all members that are known to be down. In a cluster which has greater than
             // 3 nodes it is possible for a some node beside the leader being down when the leadership transitions
-            // it makes sense to use this event to remove all the candidates for those downed nodes
+            // it makes sense to use this event to re-assign owners for those downed nodes
             for (MemberName downPeerName : downPeerMemberNames) {
-                removeCandidateFromEntities(downPeerName);
+                selectNewOwnerForEntitiesOwnedBy(downPeerName);
             }
         } else {
             // The leader changed - notify the coordinator to check if pending modifications need to be sent.
@@ -345,26 +373,9 @@ class EntityOwnershipShard extends Shard {
 
         if(isLeader()) {
             String currentOwner = getCurrentOwner(message.getEntityPath());
-            if(message.getRemovedCandidate().equals(currentOwner) || message.getRemainingCandidates().size() == 0){
-                String entityType = EntityOwnersModel.entityTypeFromEntityPath(message.getEntityPath());
+            if(message.getRemovedCandidate().equals(currentOwner) || message.getRemainingCandidates().isEmpty()){
                 writeNewOwner(message.getEntityPath(),
-                        newOwner(currentOwner, message.getRemainingCandidates(), entityOwnershipStatistics.byEntityType(entityType),
-                                getEntityOwnerElectionStrategy(message.getEntityPath())));
-            }
-        } else {
-            // We're not the leader. If the removed candidate is our local member then check if we actually
-            // have a local candidate registered. If we do then we must have been partitioned from the leader
-            // and the leader removed our candidate since the leader can't tell the difference between a
-            // temporary network partition and a node's process actually restarted. So, in that case, re-add
-            // our candidate.
-            if(localMemberName.getName().equals(message.getRemovedCandidate()) &&
-                    listenerSupport.hasCandidateForEntity(createEntity(message.getEntityPath()))) {
-                LOG.debug("Local candidate member was removed but a local candidate is registered for {}" +
-                    " - adding back local candidate", message.getEntityPath());
-
-                commitCoordinator.commitModification(new MergeModification(
-                        candidatePath(message.getEntityPath(), localMemberName.getName()),
-                        candidateMapEntry(localMemberName.getName())), this);
+                        newOwner(currentOwner, message.getRemainingCandidates(), getEntityOwnerElectionStrategy(message.getEntityPath())));
             }
         }
     }
@@ -383,31 +394,28 @@ class EntityOwnershipShard extends Shard {
 
         // Since a node's candidate member is only added by the node itself, we can assume the node is up so
         // remove it from the downPeerMemberNames.
-        downPeerMemberNames.remove(message.getNewCandidate());
+        downPeerMemberNames.remove(MemberName.forName(message.getNewCandidate()));
 
         final String currentOwner = getCurrentOwner(message.getEntityPath());
         final EntityOwnerSelectionStrategy strategy = getEntityOwnerElectionStrategy(message.getEntityPath());
-        final String entityType = EntityOwnersModel.entityTypeFromEntityPath(message.getEntityPath());
 
         // Available members is all the known peers - the number of peers that are down + self
         // So if there are 2 peers and 1 is down then availableMembers will be 2
-        final int availableMembers = (peerIdToMemberNames.size() - downPeerMemberNames.size()) + 1;
+        final int availableMembers = peerIdToMemberNames.size() - downPeerMemberNames.size() + 1;
 
         LOG.debug("{}: Using strategy {} to select owner, currentOwner = {}", persistenceId(), strategy, currentOwner);
 
-        if(!message.getAllCandidates().contains(currentOwner)){
-            if(strategy.getSelectionDelayInMillis() == 0L) {
-                writeNewOwner(message.getEntityPath(), newOwner(currentOwner, message.getAllCandidates(),
-                        entityOwnershipStatistics.byEntityType(entityType), strategy));
-            } else if(message.getAllCandidates().size() == availableMembers) {
-                LOG.debug("{}: Received the maximum candidates requests : {} writing new owner",
-                        persistenceId(), availableMembers);
-                cancelOwnerSelectionTask(message.getEntityPath());
-                writeNewOwner(message.getEntityPath(), newOwner(currentOwner, message.getAllCandidates(),
-                        entityOwnershipStatistics.byEntityType(entityType), strategy));
-            } else {
-                scheduleOwnerSelection(message.getEntityPath(), message.getAllCandidates(), strategy);
-            }
+        if(strategy.getSelectionDelayInMillis() == 0L) {
+            writeNewOwner(message.getEntityPath(), newOwner(currentOwner, message.getAllCandidates(),
+                    strategy));
+        } else if(message.getAllCandidates().size() == availableMembers) {
+            LOG.debug("{}: Received the maximum candidates requests : {} writing new owner",
+                    persistenceId(), availableMembers);
+            cancelOwnerSelectionTask(message.getEntityPath());
+            writeNewOwner(message.getEntityPath(), newOwner(currentOwner, message.getAllCandidates(),
+                    strategy));
+        } else {
+            scheduleOwnerSelection(message.getEntityPath(), message.getAllCandidates(), strategy);
         }
     }
 
@@ -416,11 +424,41 @@ class EntityOwnershipShard extends Shard {
 
         MemberName downMemberName = peerDown.getMemberName();
         if(downPeerMemberNames.add(downMemberName) && isLeader()) {
-            // Remove the down peer as a candidate from all entities.
-            removeCandidateFromEntities(downMemberName);
+            // Select new owners for entities owned by the down peer and which have other candidates. For an entity for
+            // which the down peer is the only candidate, we leave it as the owner and don't clear it. This is done to
+            // handle the case where the peer member process is actually still running but the node is partitioned.
+            // When the partition is healed, the peer just remains as the owner. If the peer process actually restarted,
+            // it will first remove all its candidates on startup. If another candidate is registered during the time
+            // the peer is down, the new candidate will be selected as the new owner.
+
+            selectNewOwnerForEntitiesOwnedBy(downMemberName);
         }
     }
 
+    private void selectNewOwnerForEntitiesOwnedBy(MemberName downMemberName) {
+        final BatchedModifications modifications = commitCoordinator.newBatchedModifications();
+        searchForEntitiesOwnedBy(downMemberName.getName(), (entityTypeNode, entityNode) -> {
+            YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH).
+                    node(entityTypeNode.getIdentifier()).node(ENTITY_NODE_ID).node(entityNode.getIdentifier()).
+                    node(ENTITY_OWNER_NODE_ID).build();
+            String newOwner = newOwner(getCurrentOwner(entityPath), getCandidateNames(entityNode),
+                    getEntityOwnerElectionStrategy(entityPath));
+
+            if(!newOwner.isEmpty()) {
+                LOG.debug("{}: Found entity {}, writing new owner {}", persistenceId(), entityPath, newOwner);
+
+                modifications.addModification(new WriteModification(entityPath,
+                        ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)));
+
+            } else {
+                LOG.debug("{}: Found entity {} but no other candidates - not clearing owner", persistenceId(),
+                        entityPath, newOwner);
+            }
+        });
+
+        commitCoordinator.commitModifications(modifications, this);
+    }
+
     private void onPeerUp(PeerUp peerUp) {
         LOG.debug("{}: onPeerUp: {}", persistenceId(), peerUp);
 
@@ -428,23 +466,45 @@ class EntityOwnershipShard extends Shard {
         downPeerMemberNames.remove(peerUp.getMemberName());
 
         // Notify the coordinator to check if pending modifications need to be sent. We do this here
-        // to handle the case where the leader's peer address isn't now yet when a prior state or
+        // to handle the case where the leader's peer address isn't known yet when a prior state or
         // leader change occurred.
         commitCoordinator.onStateChanged(this, isLeader());
     }
 
-    private void removeCandidateFromEntities(final MemberName owner) {
+    private Collection<String> getCandidateNames(MapEntryNode entity) {
+        Collection<MapEntryNode> candidates = ((MapNode)entity.getChild(CANDIDATE_NODE_ID).get()).getValue();
+        Collection<String> candidateNames = new ArrayList<>(candidates.size());
+        for(MapEntryNode candidate: candidates) {
+            candidateNames.add(candidate.getChild(CANDIDATE_NAME_NODE_ID).get().getValue().toString());
+        }
+
+        return candidateNames;
+    }
+
+    private void searchForEntitiesOwnedBy(final String owner, final EntityWalker walker) {
+        LOG.debug("{}: Searching for entities owned by {}", persistenceId(), owner);
+
+        searchForEntities((entityTypeNode, entityNode) -> {
+            Optional<DataContainerChild<? extends PathArgument, ?>> possibleOwner =
+                    entityNode.getChild(ENTITY_OWNER_NODE_ID);
+            if(possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) {
+                walker.onEntity(entityTypeNode, entityNode);
+            }
+        });
+    }
+
+    private void removeCandidateFromEntities(final MemberName member) {
         final BatchedModifications modifications = commitCoordinator.newBatchedModifications();
         searchForEntities((entityTypeNode, entityNode) -> {
-            if (hasCandidate(entityNode, owner)) {
+            if (hasCandidate(entityNode, member)) {
                 YangInstanceIdentifier entityId =
                         (YangInstanceIdentifier) entityNode.getIdentifier().getKeyValues().get(ENTITY_ID_QNAME);
                 YangInstanceIdentifier candidatePath = candidatePath(
                         entityTypeNode.getIdentifier().getKeyValues().get(ENTITY_TYPE_QNAME).toString(),
-                        entityId, owner.getName());
+                        entityId, member.getName());
 
                 LOG.info("{}: Found entity {}, removing candidate {}, path {}", persistenceId(), entityId,
-                        owner, candidatePath);
+                        member, candidatePath);
 
                 modifications.addModification(new DeleteModification(candidatePath));
             }
@@ -511,9 +571,9 @@ class EntityOwnershipShard extends Shard {
         }
     }
 
-    private String newOwner(String currentOwner, Collection<String> candidates, Map<String, Long> statistics, EntityOwnerSelectionStrategy ownerSelectionStrategy) {
+    private String newOwner(String currentOwner, Collection<String> candidates, EntityOwnerSelectionStrategy ownerSelectionStrategy) {
         Collection<String> viableCandidates = getViableCandidates(candidates);
-        if(viableCandidates.size() == 0){
+        if(viableCandidates.isEmpty()){
             return "";
         }
         return ownerSelectionStrategy.newOwner(currentOwner, viableCandidates);
@@ -523,7 +583,7 @@ class EntityOwnershipShard extends Shard {
         Collection<String> viableCandidates = new ArrayList<>();
 
         for (String candidate : candidates) {
-            if (!downPeerMemberNames.contains(candidate)) {
+            if (!downPeerMemberNames.contains(MemberName.forName(candidate))) {
                 viableCandidates.add(candidate);
             }
         }
@@ -538,6 +598,7 @@ class EntityOwnershipShard extends Shard {
         return null;
     }
 
+    @FunctionalInterface
     private static interface EntityWalker {
         void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode);
     }
index 97e6c62037b2801e1ea1e6521cc8b77597be3335..56ecd52276d9638cf30d68c220c67ebd58f42095 100644 (file)
@@ -168,7 +168,7 @@ class EntityOwnershipShardCommitCoordinator {
         if(inflightCommit != null || !hasLeader) {
             if(log.isDebugEnabled()) {
                 log.debug("{} - adding modifications to pending",
-                        (inflightCommit != null ? "A commit is inflight" : "No shard leader"));
+                        inflightCommit != null ? "A commit is inflight" : "No shard leader");
             }
 
             pendingModifications.addAll(modifications.getModifications());
@@ -179,6 +179,8 @@ class EntityOwnershipShardCommitCoordinator {
     }
 
     void onStateChanged(EntityOwnershipShard shard, boolean isLeader) {
+        shard.possiblyRemoveAllInitialCandidates(shard.getLeader());
+
         if(!isLeader && inflightCommit != null) {
             // We're no longer the leader but we have an inflight local commit. This likely means we didn't get
             // consensus for the commit and switched to follower due to another node with a higher term. We
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/RemoveAllCandidates.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/RemoveAllCandidates.java
new file mode 100644 (file)
index 0000000..4ecae72
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership.messages;
+
+import java.io.Serializable;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+
+/**
+ * Message sent by an EntityOwnershipShard to its leader on startup to remove all its candidates.                                                                               .
+ *
+ * @author Thomas Pantelis
+ */
+public class RemoveAllCandidates implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final MemberName memberName;
+
+    public RemoveAllCandidates(MemberName memberName) {
+        this.memberName = memberName;
+    }
+
+    public MemberName getMemberName() {
+        return memberName;
+    }
+
+    @Override
+    public String toString() {
+        return "RemoveAllCandidates [memberName=" + memberName + "]";
+    }
+}
index 363450704d0c2809953c8b311bfcb883f4b970b9..e4ca64ffe812b2422308287945e4106a749a65a5 100644 (file)
@@ -897,7 +897,7 @@ public class ShardTest extends AbstractShardTest {
                         }
 
                         @Override
-                        protected ActorSelection getLeader() {
+                        public ActorSelection getLeader() {
                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
                                 super.getLeader();
                         }
@@ -1427,19 +1427,14 @@ public class ShardTest extends AbstractShardTest {
     @Test
     public void testAbortWithCommitPending() throws Throwable {
         new ShardTestKit(getSystem()) {{
-            final Creator<Shard> creator = new Creator<Shard>() {
+            final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
                 @Override
-                public Shard create() throws Exception {
-                    return new Shard(newShardBuilder()) {
-                        @Override
-                        void persistPayload(final TransactionIdentifier transactionId, final Payload payload) {
-                            // Simulate an AbortTransaction message occurring during replication, after
-                            // persisting and before finishing the commit to the in-memory store.
+                void persistPayload(final TransactionIdentifier transactionId, final Payload payload) {
+                    // Simulate an AbortTransaction message occurring during replication, after
+                    // persisting and before finishing the commit to the in-memory store.
 
-                            doAbortTransaction(transactionId, null);
-                            super.persistPayload(transactionId, payload);
-                        }
-                    };
+                    doAbortTransaction(transactionId, null);
+                    super.persistPayload(transactionId, payload);
                 }
             };
 
index 893ac52b05a36b418ba7ab1ee8e7688b20056278..3a481feb811722dda8e1f6ef308a517f008bf17a 100644 (file)
@@ -11,7 +11,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.AdditionalMatchers.or;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
@@ -239,22 +238,19 @@ public class DistributedEntityOwnershipIntegrationTest {
         verifyCandidates(leaderDistributedDataStore, ENTITY4, "member-3", "member-2");
         verifyOwner(leaderDistributedDataStore, ENTITY4, "member-3");
 
-        // Shutdown follower2 and verify it's owned entities (entity 2 & 4) get re-assigned
+        // Shutdown follower2 and verify it's owned entities (entity 4) get re-assigned
 
         reset(leaderMockListener, follower1MockListener);
         follower2Node.cleanup();
 
-        verify(follower1MockListener, timeout(15000).times(2)).ownershipChanged(or(ownershipChange(ENTITY4, false, true, true),
-                ownershipChange(ENTITY2, false, false, false)));
-        verify(leaderMockListener, timeout(15000).times(2)).ownershipChanged(or(ownershipChange(ENTITY4, false, false, true),
-                ownershipChange(ENTITY2, false, false, false)));
-        verifyOwner(leaderDistributedDataStore, ENTITY2, ""); // no other candidate
+        verify(follower1MockListener, timeout(15000)).ownershipChanged(ownershipChange(ENTITY4, false, true, true));
+        verify(leaderMockListener, timeout(15000)).ownershipChanged(ownershipChange(ENTITY4, false, false, true));
 
         // Register leader candidate for entity2 and verify it becomes owner
 
         DOMEntityOwnershipCandidateRegistration leaderEntity2Reg = leaderEntityOwnershipService.registerCandidate(ENTITY2);
-        verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, true, true));
         verifyOwner(leaderDistributedDataStore, ENTITY2, "member-1");
+        verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, true, true));
 
         // Unregister leader candidate for entity2 and verify the owner is cleared
 
@@ -265,7 +261,7 @@ public class DistributedEntityOwnershipIntegrationTest {
     }
 
     @Test
-    public void testLeaderCandidatesRemovedAfterShutdown() throws Exception {
+    public void testLeaderEntityOwnersReassignedAfterShutdown() throws Exception {
         followerDatastoreContextBuilder.shardElectionTimeoutFactor(5).
                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
 
@@ -331,16 +327,16 @@ public class DistributedEntityOwnershipIntegrationTest {
         MemberNode.verifyRaftState(follower1Node.configDataStore(), ENTITY_OWNERSHIP_SHARD_NAME,
             raftState -> assertEquals("Raft state", RaftState.Leader.toString(), raftState.getRaftState()));
 
-        // Verify the prior leader's candidates are removed
+        // Verify the prior leader's entity owners are re-assigned.
 
-        verifyCandidates(follower1Node.configDataStore(), ENTITY1, "member-2");
-        verifyCandidates(follower1Node.configDataStore(), ENTITY2, "member-3");
+        verifyCandidates(follower1Node.configDataStore(), ENTITY1, "member-2", "member-1");
+        verifyCandidates(follower1Node.configDataStore(), ENTITY2, "member-1", "member-3");
         verifyOwner(follower1Node.configDataStore(), ENTITY1, "member-2");
         verifyOwner(follower1Node.configDataStore(), ENTITY2, "member-3");
     }
 
     @Test
-    public void testLeaderAndFollowerCandidatesRemovedAfterShutdown() throws Exception {
+    public void testLeaderAndFollowerEntityOwnersReassignedAfterShutdown() throws Exception {
         followerDatastoreContextBuilder.shardElectionTimeoutFactor(5).
                 customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
 
@@ -375,7 +371,7 @@ public class DistributedEntityOwnershipIntegrationTest {
         DOMEntityOwnershipService follower1EntityOwnershipService = newOwnershipService(follower1Node.configDataStore());
         DOMEntityOwnershipService follower2EntityOwnershipService = newOwnershipService(follower2Node.configDataStore());
         DOMEntityOwnershipService follower3EntityOwnershipService = newOwnershipService(follower3Node.configDataStore());
-        DOMEntityOwnershipService follower4EntityOwnershipService = newOwnershipService(follower4Node.configDataStore());
+        newOwnershipService(follower4Node.configDataStore());
 
         leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
 
@@ -407,8 +403,7 @@ public class DistributedEntityOwnershipIntegrationTest {
         verifyCandidates(leaderDistributedDataStore, ENTITY2, "member-1", "member-3", "member-4");
         verifyOwner(leaderDistributedDataStore, ENTITY2, "member-1");
 
-
-        // Shutdown the leader and verify its removed from the candidate list
+        // Shutdown the leader and follower3
 
         leaderNode.cleanup();
         follower3Node.cleanup();
@@ -426,10 +421,10 @@ public class DistributedEntityOwnershipIntegrationTest {
         MemberNode.verifyRaftState(follower1Node.configDataStore(), ENTITY_OWNERSHIP_SHARD_NAME,
                 raftState -> assertEquals("Raft state", RaftState.Leader.toString(), raftState.getRaftState()));
 
-        // Verify the prior leader's and follower3 candidates are removed
+        // Verify the prior leader's and follower3 entity owners are re-assigned.
 
-        verifyCandidates(follower1Node.configDataStore(), ENTITY1, "member-2");
-        verifyCandidates(follower1Node.configDataStore(), ENTITY2, "member-3");
+        verifyCandidates(follower1Node.configDataStore(), ENTITY1, "member-2", "member-1");
+        verifyCandidates(follower1Node.configDataStore(), ENTITY2, "member-1", "member-3", "member-4");
         verifyOwner(follower1Node.configDataStore(), ENTITY1, "member-2");
         verifyOwner(follower1Node.configDataStore(), ENTITY2, "member-3");
     }
index af347903f8e9e1fabb4cfba980f682444c368987..13fb8dc0322ff04c9a19382e226e2d70b0a4a93c 100644 (file)
@@ -27,7 +27,6 @@ import akka.actor.UntypedActor;
 import akka.dispatch.Dispatchers;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
-import com.google.common.base.Function;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.ArrayList;
@@ -408,12 +407,14 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         ShardIdentifier peerId2 = newShardId(peerMemberName2);
 
         TestActorRef<EntityOwnershipShard> peer1 = actorFactory.createTestActor(newShardProps(peerId1,
-                ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId2.toString(), "").build(),
-                        peerMemberName1, EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), peerId1.toString());
+                ImmutableMap.<String, String>builder().put(leaderId.toString(), actorFactory.createTestActorPath(leaderId.toString())).
+                        put(peerId2.toString(), actorFactory.createTestActorPath(peerId2.toString())).build(), peerMemberName1,
+                EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), peerId1.toString());
 
         TestActorRef<EntityOwnershipShard> peer2 = actorFactory.createTestActor(newShardProps(peerId2,
-                ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId1.toString(), "").build(),
-                        peerMemberName2, EntityOwnerSelectionStrategyConfig.newBuilder().build()). withDispatcher(Dispatchers.DefaultDispatcherId()), peerId2.toString());
+                ImmutableMap.<String, String>builder().put(leaderId.toString(), actorFactory.createTestActorPath(leaderId.toString())).
+                        put(peerId1.toString(), peer1.path().toString()).build(), peerMemberName2,
+                EntityOwnerSelectionStrategyConfig.newBuilder().build()). withDispatcher(Dispatchers.DefaultDispatcherId()), peerId2.toString());
 
         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(newShardProps(leaderId,
                 ImmutableMap.<String, String>builder().put(peerId1.toString(), peer1.path().toString()).
@@ -488,29 +489,37 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
         peer1.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
 
-        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, ""); // no other candidates so should clear
-        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
-        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
+        // no other candidates for entity4 so peerMember2 should remain owner.
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
 
-        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
-        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
-        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
-        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
 
-        // Reinstate peerMember2 - no owners should change
+        // Reinstate peerMember2
 
         peer2 = actorFactory.createTestActor(newShardProps(peerId2,
-                ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId1.toString(), "").build(),
-                        peerMemberName2, EntityOwnerSelectionStrategyConfig.newBuilder().build()). withDispatcher(Dispatchers.DefaultDispatcherId()), peerId2.toString());
+                    ImmutableMap.<String, String>builder().put(leaderId.toString(), leader.path().toString()).
+                            put(peerId1.toString(), peer1.path().toString()).build(), peerMemberName2,
+                    EntityOwnerSelectionStrategyConfig.newBuilder().build()). withDispatcher(Dispatchers.DefaultDispatcherId()), peerId2.toString());
         leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
         // Send PeerUp again - should be noop
         leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
         peer1.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
 
-        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
-        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
+        // peerMember2's candidates should be removed on startup.
+        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
+        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
+        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
+        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
+
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
 
         // Add back candidate peerMember2 for entities 1, 2, & 3.
@@ -534,29 +543,34 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
 
         // Verify the reinstated peerMember2 is fully synced.
 
-        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
-        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
-        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
+        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
+        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
 
         // Reinstate peerMember1 and verify no owner changes
 
         peer1 = actorFactory.createTestActor(newShardProps(peerId1,
-                ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId2.toString(), "").build(),
-                        peerMemberName1, EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), peerId1.toString());
+                ImmutableMap.<String, String>builder().put(leaderId.toString(), leader.path().toString()).
+                        put(peerId2.toString(), peer2.path().toString()).build(), peerMemberName1,
+                EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), peerId1.toString());
         leader.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
 
-        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
-        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
-        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
+
+        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName1);
+        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
+        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName1);
 
         // Verify the reinstated peerMember1 is fully synced.
 
-        verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, "");
-        verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
-        verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+        verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
+        verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
+        verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, "");
 
         // Kill the local leader and elect peer2 the leader. This should cause a new owner to be selected for
         // the entities (1 and 3) previously owned by the local leader member.
@@ -571,69 +585,10 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
 
         ShardTestKit.waitUntilLeader(peer2);
 
-        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
-        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
-        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
-    }
-
-    @Test
-    public void testLocalCandidateRemovedWithCandidateRegistered() throws Exception {
-        ShardTestKit kit = new ShardTestKit(getSystem());
-
-        dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10000);
-        ShardIdentifier leaderId = newShardId("leader");
-        ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME);
-
-        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(Props.create(
-                TestEntityOwnershipShard.class, localId,
-                ImmutableMap.<String, String>builder().put(leaderId.toString(), "".toString()).build(),
-                dataStoreContextBuilder.build()).withDispatcher(Dispatchers.DefaultDispatcherId()));
-
-        TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(newShardProps(leaderId,
-                ImmutableMap.<String, String>builder().put(localId.toString(), shard.path().toString()).build(),
-                    leaderId.getMemberName().getName(), EntityOwnerSelectionStrategyConfig.newBuilder().build())
-                .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderId.toString());
-        leader.tell(TimeoutNow.INSTANCE, leader);
-
-        ShardTestKit.waitUntilLeader(leader);
-
-        shard.tell(new PeerAddressResolved(leaderId.toString(), leader.path().toString()), ActorRef.noSender());
-
-        DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
-        DOMEntityOwnershipListener listener = mock(DOMEntityOwnershipListener.class);
-
-        shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
-        kit.expectMsgClass(SuccessReply.class);
-
-        // Register local candidate
-
-        shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
-        kit.expectMsgClass(SuccessReply.class);
-        verifyCommittedEntityCandidate(shard, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
-        verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity, false, true, true));
-        reset(listener);
-
-        // Simulate a replicated commit from the leader to remove the local candidate that would occur after a
-        // network partition is healed.
-
-        leader.tell(new PeerDown(localId.getMemberName(), localId.toString()), ActorRef.noSender());
-
-        verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity, true, false, false));
-
-        // Since the the shard has a local candidate registered, it should re-add its candidate to the entity.
-
-        verifyCommittedEntityCandidate(shard, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
-        verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity, false, true, true));
-
-        // Unregister the local candidate and verify it's removed and no re-added.
-
-        shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
-        kit.expectMsgClass(SuccessReply.class);
-
-        verifyNoEntityCandidate(shard, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
-        Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
-        verifyNoEntityCandidate(shard, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
+        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
+        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
+        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
     }
 
     @Test
@@ -737,42 +692,33 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
     private void verifyEntityCandidateRemoved(final TestActorRef<EntityOwnershipShard> shard, String entityType,
             YangInstanceIdentifier entityId, String candidateName) {
         verifyNodeRemoved(candidatePath(entityType, entityId, candidateName),
-                new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
-                    @Override
-                    public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
-                        try {
-                            return AbstractShardTest.readStore(shard, path);
-                        } catch(Exception e) {
-                            throw new AssertionError("Failed to read " + path, e);
-                        }
-                }
-        });
+                path -> {
+                    try {
+                        return AbstractShardTest.readStore(shard, path);
+                    } catch(Exception e) {
+                        throw new AssertionError("Failed to read " + path, e);
+                    }
+            });
     }
 
     private void verifyCommittedEntityCandidate(final TestActorRef<EntityOwnershipShard> shard, String entityType,
             YangInstanceIdentifier entityId, String candidateName) {
-        verifyEntityCandidate(entityType, entityId, candidateName, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
-            @Override
-            public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
-                try {
-                    return AbstractShardTest.readStore(shard, path);
-                } catch(Exception e) {
-                    throw new AssertionError("Failed to read " + path, e);
-                }
+        verifyEntityCandidate(entityType, entityId, candidateName, path -> {
+            try {
+                return AbstractShardTest.readStore(shard, path);
+            } catch(Exception e) {
+                throw new AssertionError("Failed to read " + path, e);
             }
         });
     }
 
     private void verifyNoEntityCandidate(final TestActorRef<EntityOwnershipShard> shard, String entityType,
             YangInstanceIdentifier entityId, String candidateName) {
-        verifyEntityCandidate(entityType, entityId, candidateName, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
-            @Override
-            public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
-                try {
-                    return AbstractShardTest.readStore(shard, path);
-                } catch(Exception e) {
-                    throw new AssertionError("Failed to read " + path, e);
-                }
+        verifyEntityCandidate(entityType, entityId, candidateName, path -> {
+            try {
+                return AbstractShardTest.readStore(shard, path);
+            } catch(Exception e) {
+                throw new AssertionError("Failed to read " + path, e);
             }
         }, false);
     }
@@ -792,14 +738,11 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
 
     private static void verifyOwner(final TestActorRef<EntityOwnershipShard> shard, String entityType,
             YangInstanceIdentifier entityId, String localMemberName) {
-        verifyOwner(localMemberName, entityType, entityId, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
-            @Override
-            public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
-                try {
-                    return AbstractShardTest.readStore(shard, path);
-                } catch(Exception e) {
-                    return null;
-                }
+        verifyOwner(localMemberName, entityType, entityId, path -> {
+            try {
+                return AbstractShardTest.readStore(shard, path);
+            } catch(Exception e) {
+                return null;
             }
         });
     }