From 07c96b0fa318b7bf559df4954f705d06a44f1354 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Thu, 8 Sep 2016 10:16:53 -0400 Subject: [PATCH] Bug 6540: EOS - Prune pending owner change commits on leader change When the shard leader is isolated, it attempts to re-assign ownership for down peers. However, since it's isolated, it can't commit the modifications. If the majority partition elects a new leader, when the partition is healed, the old leader tries to forward the pending owner change commits to the new leader. However, this is problematic as the criteria used to determine the new owner is stale and owner changes should only be committed by a valid leader. Since the old leader is no longer the leader, it should not forward pending owner change commits. However, it should still forward local candidate change commits. So I modified EntityOwnershipShardCommitCoordinator#onStateChanged to iterate the pending Modifications and remove WRITE modifications for the owner leaf when the shard has transitioned to having a remote leader. I also fixed an issue in EntityOwnershipShard#onCandidateRemoved that was intermittently revealed by unit tests. Say candidate1 and candidate2 are removed quickly for an entity and candidate1 is the current owner. onCandidateRemoved is called for candidate1 and commits an update to write candidate2 as the owner. If the write commit is still pending when onCandidateRemoved is called for candidate2, the current owner will still be candidate1 and the "message.getRemovedCandidate().equals(currentOwner)" check will fail and thus the owner isn't cleared and candidate2 will remain as owner. This results in a node being the owner without being in the candidate list. (This patch may fix Bug 6672 as well) A new testLeaderIsolation case was added to EntityOwnershipShardTest. I also reworked the tests and removed the use of the MockFollower and MockLeader actors for consistency and also so the tests use the real EOS shard.
Change-Id: I5039b07d02f8571ee2d1affb0f364ea278641e91 Signed-off-by: Tom Pantelis --- .../controller/cluster/datastore/Shard.java | 12 +- .../entityownership/EntityOwnershipShard.java | 14 +- ...EntityOwnershipShardCommitCoordinator.java | 66 ++ .../AbstractEntityOwnershipTest.java | 102 +- .../EntityOwnershipShardTest.java | 1012 ++++++++++------- 5 files changed, 765 insertions(+), 441 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index c3f76530bc..a1e506f6ec 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -610,8 +610,7 @@ public class Shard extends RaftActor { // them to transaction messages and send to the new leader. ActorSelection leader = getLeader(); if (leader != null) { - Collection messagesToForward = commitCoordinator.convertPendingTransactionsToMessages( - datastoreContext.getShardBatchedModificationCount()); + Collection messagesToForward = convertPendingTransactionsToMessages(); if (!messagesToForward.isEmpty()) { LOG.debug("{}: Forwarding {} pending transaction messages to leader {}", persistenceId(), @@ -633,6 +632,15 @@ public class Shard extends RaftActor { } } + /** + * Clears all pending transactions and converts them to messages to be forwarded to a new leader. 
+ * + * @return the converted messages + */ + public Collection convertPendingTransactionsToMessages() { + return commitCoordinator.convertPendingTransactionsToMessages(datastoreContext.getShardBatchedModificationCount()); + } + @Override protected void pauseLeader(final Runnable operation) { LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java index f1e898ae23..99ef94ef50 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java @@ -322,8 +322,6 @@ class EntityOwnershipShard extends Shard { @Override protected void onStateChanged() { - super.onStateChanged(); - boolean isLeader = isLeader(); LOG.debug("{}: onStateChanged: isLeader: {}, hasLeader: {}", persistenceId(), isLeader, hasLeader()); @@ -337,12 +335,12 @@ class EntityOwnershipShard extends Shard { } commitCoordinator.onStateChanged(this, isLeader); + + super.onStateChanged(); } @Override protected void onLeaderChanged(String oldLeader, String newLeader) { - super.onLeaderChanged(oldLeader, newLeader); - boolean isLeader = isLeader(); LOG.debug("{}: onLeaderChanged: oldLeader: {}, newLeader: {}, isLeader: {}", persistenceId(), oldLeader, newLeader, isLeader); @@ -366,6 +364,8 @@ class EntityOwnershipShard extends Shard { // leader and stays in the follower state. In that case no behavior state change occurs. 
commitCoordinator.onStateChanged(this, isLeader); } + + super.onLeaderChanged(oldLeader, newLeader); } private void onCandidateRemoved(CandidateRemoved message) { @@ -373,10 +373,8 @@ class EntityOwnershipShard extends Shard { if(isLeader()) { String currentOwner = getCurrentOwner(message.getEntityPath()); - if(message.getRemovedCandidate().equals(currentOwner) || message.getRemainingCandidates().isEmpty()){ - writeNewOwner(message.getEntityPath(), - newOwner(currentOwner, message.getRemainingCandidates(), getEntityOwnerElectionStrategy(message.getEntityPath()))); - } + writeNewOwner(message.getEntityPath(), + newOwner(currentOwner, message.getRemainingCandidates(), getEntityOwnerElectionStrategy(message.getEntityPath()))); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java index 56ecd52276..c792cf1dda 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java @@ -7,6 +7,8 @@ */ package org.opendaylight.controller.cluster.datastore.entityownership; +import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME; + import akka.actor.ActorRef; import akka.actor.Cancellable; import akka.actor.Status.Failure; @@ -14,6 +16,7 @@ import com.google.common.base.Preconditions; import java.util.Iterator; import java.util.LinkedList; import java.util.Queue; +import javax.annotation.Nullable; import 
org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; import org.opendaylight.controller.cluster.access.concepts.FrontendType; @@ -25,6 +28,7 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderExc import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.Modification; +import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.slf4j.Logger; import scala.concurrent.duration.FiniteDuration; @@ -181,6 +185,8 @@ class EntityOwnershipShardCommitCoordinator { void onStateChanged(EntityOwnershipShard shard, boolean isLeader) { shard.possiblyRemoveAllInitialCandidates(shard.getLeader()); + possiblyPrunePendingCommits(shard, isLeader); + if(!isLeader && inflightCommit != null) { // We're no longer the leader but we have an inflight local commit. This likely means we didn't get // consensus for the commit and switched to follower due to another node with a higher term. We @@ -197,6 +203,66 @@ class EntityOwnershipShardCommitCoordinator { } } + private void possiblyPrunePendingCommits(EntityOwnershipShard shard, boolean isLeader) { + // If we were the leader and transitioned to follower, we'll try to forward pending commits to the new leader. + // However certain commits, e.g. entity owner changes, should only be committed by a valid leader as the + // criteria used to determine the commit may be stale. Since we're no longer a valid leader, we should not + // forward such commits thus we prune the pending modifications. We still should forward local candidate change + // commits. + if (shard.hasLeader() && !isLeader) { + // We may have already submitted a transaction for replication and commit. 
We don't need the base Shard to + // forward it since we also have it stored in the inflightCommit and handle retries. So we just clear + // pending transactions and drop them. + shard.convertPendingTransactionsToMessages(); + + // Prune the inflightCommit. + if(inflightCommit != null) { + inflightCommit = pruneModifications(inflightCommit); + } + + // Prune the subsequent pending modifications. + Iterator iter = pendingModifications.iterator(); + while(iter.hasNext()) { + Modification mod = iter.next(); + if(!canForwardModificationToNewLeader(mod)) { + iter.remove(); + } + } + } + } + + @Nullable + private BatchedModifications pruneModifications(BatchedModifications toPrune) { + BatchedModifications prunedModifications = new BatchedModifications(toPrune.getTransactionID(), toPrune.getVersion()); + prunedModifications.setDoCommitOnReady(toPrune.isDoCommitOnReady()); + prunedModifications.setReady(toPrune.isReady()); + prunedModifications.setTotalMessagesSent(toPrune.getTotalMessagesSent()); + for(Modification mod: toPrune.getModifications()) { + if(canForwardModificationToNewLeader(mod)) { + prunedModifications.addModification(mod); + } + } + + return !prunedModifications.getModifications().isEmpty() ? prunedModifications : null; + } + + private boolean canForwardModificationToNewLeader(Modification mod) { + // If this is a WRITE of entity owner we don't want to forward it to a new leader since the criteria used + // to determine the new owner might be stale. 
+ if (mod instanceof WriteModification) { + WriteModification writeMod = (WriteModification)mod; + boolean canForward = !writeMod.getPath().getLastPathArgument().getNodeType().equals(ENTITY_OWNER_QNAME); + + if (!canForward) { + log.debug("Not forwarding WRITE modification for {} to new leader", writeMod.getPath()); + } + + return canForward; + } + + return true; + } + private void newInflightCommitWithDifferentTransactionID() { BatchedModifications newBatchedModifications = newBatchedModifications(); newBatchedModifications.getModifications().addAll(inflightCommit.getModifications()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java index f93ac28eeb..23b5563a32 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java @@ -17,18 +17,30 @@ import static org.opendaylight.controller.cluster.datastore.entityownership.Enti import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_QNAME; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_QNAME; +import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityPath; + +import akka.pattern.Patterns; +import akka.testkit.TestActorRef; 
+import akka.util.Timeout; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.Uninterruptibles; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import org.hamcrest.Description; import org.junit.Assert; import org.mockito.ArgumentMatcher; import org.mockito.Matchers; +import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.AbstractActorTest; +import org.opendaylight.controller.cluster.datastore.AbstractShardTest; import org.opendaylight.controller.cluster.datastore.ShardDataTree; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState; import org.opendaylight.mdsal.eos.dom.api.DOMEntity; import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange; @@ -48,6 +60,12 @@ import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; /** * Abstract base class providing utility methods. 
@@ -55,6 +73,10 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailed * @author Thomas Pantelis */ public class AbstractEntityOwnershipTest extends AbstractActorTest { + protected final Logger testLog = LoggerFactory.getLogger(getClass()); + + private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger(); + protected void verifyEntityCandidate(NormalizedNode node, String entityType, YangInstanceIdentifier entityId, String candidateName, boolean expectPresent) { try { @@ -175,18 +197,24 @@ public class AbstractEntityOwnershipTest extends AbstractActorTest { static DOMEntityOwnershipChange ownershipChange(final DOMEntity expEntity, final boolean expWasOwner, final boolean expIsOwner, final boolean expHasOwner) { + return ownershipChange(expEntity, expWasOwner, expIsOwner, expHasOwner, false); + } + + static DOMEntityOwnershipChange ownershipChange(final DOMEntity expEntity, final boolean expWasOwner, + final boolean expIsOwner, final boolean expHasOwner, final boolean expInJeopardy) { return Matchers.argThat(new ArgumentMatcher() { @Override public boolean matches(Object argument) { DOMEntityOwnershipChange change = (DOMEntityOwnershipChange) argument; return expEntity.equals(change.getEntity()) && expWasOwner == change.getState().wasOwner() && - expIsOwner == change.getState().isOwner() && expHasOwner == change.getState().hasOwner(); + expIsOwner == change.getState().isOwner() && expHasOwner == change.getState().hasOwner() && + expInJeopardy == change.inJeopardy(); } @Override public void describeTo(Description description) { description.appendValue(new DOMEntityOwnershipChange(expEntity, EntityOwnershipChangeState.from( - expWasOwner, expIsOwner, expHasOwner))); + expWasOwner, expIsOwner, expHasOwner), expInJeopardy)); } }); } @@ -206,4 +234,74 @@ public class AbstractEntityOwnershipTest extends AbstractActorTest { } }); } + + static void verifyOwner(final TestActorRef shard, String entityType, + YangInstanceIdentifier 
entityId, String localMemberName) { + verifyOwner(localMemberName, entityType, entityId, path -> { + try { + return AbstractShardTest.readStore(shard, path); + } catch(Exception e) { + return null; + } + }); + } + + static void verifyRaftState(final TestActorRef shard, Consumer verifier) + throws Exception { + AssertionError lastError = null; + Stopwatch sw = Stopwatch.createStarted(); + while(sw.elapsed(TimeUnit.SECONDS) <= 5) { + FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS); + Future future = Patterns.ask(shard, GetOnDemandRaftState.INSTANCE, new Timeout(operationDuration)); + OnDemandRaftState raftState = (OnDemandRaftState)Await.result(future, operationDuration); + try { + verifier.accept(raftState); + return; + } catch (AssertionError e) { + lastError = e; + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + } + + throw lastError; + } + + static ShardIdentifier newShardId(String memberName) { + return ShardIdentifier.create("entity-ownership", MemberName.forName(memberName), + "operational" + NEXT_SHARD_NUM.getAndIncrement()); + } + + void verifyEntityCandidateRemoved(final TestActorRef shard, String entityType, + YangInstanceIdentifier entityId, String candidateName) { + verifyNodeRemoved(candidatePath(entityType, entityId, candidateName), + path -> { + try { + return AbstractShardTest.readStore(shard, path); + } catch(Exception e) { + throw new AssertionError("Failed to read " + path, e); + } + }); + } + + void verifyCommittedEntityCandidate(final TestActorRef shard, String entityType, + YangInstanceIdentifier entityId, String candidateName) { + verifyEntityCandidate(entityType, entityId, candidateName, path -> { + try { + return AbstractShardTest.readStore(shard, path); + } catch(Exception e) { + throw new AssertionError("Failed to read " + path, e); + } + }); + } + + void verifyNoEntityCandidate(final TestActorRef shard, String entityType, + YangInstanceIdentifier entityId, String candidateName) { + 
verifyEntityCandidate(entityType, entityId, candidateName, path -> { + try { + return AbstractShardTest.readStore(shard, path); + } catch(Exception e) { + throw new AssertionError("Failed to read " + path, e); + } + }, false); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java index 13fb8dc032..9660a2a392 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java @@ -16,14 +16,11 @@ import static org.mockito.Mockito.reset; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH; -import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath; -import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate; +import static org.mockito.Mockito.verifyNoMoreInteractions; import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Terminated; -import akka.actor.UntypedActor; import akka.dispatch.Dispatchers; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; @@ -33,17 +30,14 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; 
-import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; import org.junit.After; import org.junit.Test; import org.opendaylight.controller.cluster.access.concepts.MemberName; -import org.opendaylight.controller.cluster.datastore.AbstractShardTest; -import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; -import org.opendaylight.controller.cluster.datastore.ShardDataTree; import org.opendaylight.controller.cluster.datastore.ShardTestKit; import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal; import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal; @@ -53,28 +47,23 @@ import org.opendaylight.controller.cluster.datastore.entityownership.selectionst import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.LastCandidateSelectionStrategy; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; -import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.PeerDown; import org.opendaylight.controller.cluster.datastore.messages.PeerUp; import org.opendaylight.controller.cluster.datastore.messages.SuccessReply; -import org.opendaylight.controller.cluster.datastore.modification.MergeModification; -import org.opendaylight.controller.cluster.datastore.modification.Modification; -import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.TestActorFactory; import 
org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; -import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.RequestVote; -import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.mdsal.eos.dom.api.DOMEntity; import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange; import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; /** @@ -95,10 +84,11 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { private static final YangInstanceIdentifier ENTITY_ID5 = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity5")); private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners(); - private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger(); - private static final String LOCAL_MEMBER_NAME = "member-1"; + private static final String LOCAL_MEMBER_NAME = "local-member-1"; + private static final String PEER_MEMBER_1_NAME = "peer-member-1"; + private static final String PEER_MEMBER_2_NAME = "peer-member-2"; - private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder(); + private Builder dataStoreContextBuilder = DatastoreContext.newBuilder().persistent(false); private final TestActorFactory actorFactory = new TestActorFactory(getSystem()); @After @@ -108,9 +98,11 @@ 
public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { @Test public void testOnRegisterCandidateLocal() throws Exception { + testLog.info("testOnRegisterCandidateLocal starting"); + ShardTestKit kit = new ShardTestKit(getSystem()); - TestActorRef shard = actorFactory.createTestActor(newShardProps()); + TestActorRef shard = actorFactory.createTestActor(newLocalShardProps()); ShardTestKit.waitUntilLeader(shard); @@ -122,21 +114,29 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); + + testLog.info("testOnRegisterCandidateLocal ending"); } @Test public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception { + testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader starting"); + ShardTestKit kit = new ShardTestKit(getSystem()); dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2); - String peerId = newShardId("follower").toString(); - TestActorRef peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId, false). - withDispatcher(Dispatchers.DefaultDispatcherId()), peerId); + ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); + ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME); + + TestActorRef peer = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString()); + TestEntityOwnershipShard peerShard = peer.underlyingActor(); + peerShard.startDroppingMessagesOfType(RequestVote.class); + peerShard.startDroppingMessagesOfType(ElectionTimeout.class); - TestActorRef shard = actorFactory.createTestActor(newShardProps( - ImmutableMap.builder().put(peerId, peer.path().toString()).build()). 
- withDispatcher(Dispatchers.DefaultDispatcherId())); + TestActorRef shard = actorFactory.createTestActor( + newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString()); YangInstanceIdentifier entityId = ENTITY_ID1; DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId); @@ -144,39 +144,44 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { shard.tell(new RegisterCandidateLocal(entity), kit.getRef()); kit.expectMsgClass(SuccessReply.class); - // Now grant the vote so the shard becomes the leader. This should retry the commit. - peer.underlyingActor().grantVote = true; + // Now allow RequestVotes to the peer so the shard becomes the leader. This should retry the commit. + peerShard.stopDroppingMessagesOfType(RequestVote.class); verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); + + testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader ending"); } @Test public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception { + testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus starting"); + ShardTestKit kit = new ShardTestKit(getSystem()); dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2). shardTransactionCommitTimeoutInSeconds(1); - String peerId = newShardId("follower").toString(); - TestActorRef peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId). 
- withDispatcher(Dispatchers.DefaultDispatcherId()), peerId); + ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); + ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME); - MockFollower follower = peer.underlyingActor(); + TestActorRef peer = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString()); + TestEntityOwnershipShard peerShard = peer.underlyingActor(); + peerShard.startDroppingMessagesOfType(ElectionTimeout.class); // Drop AppendEntries so consensus isn't reached. - follower.dropAppendEntries = true; + peerShard.startDroppingMessagesOfType(AppendEntries.class); - TestActorRef shard = actorFactory.createTestActor(newShardProps( - ImmutableMap.builder().put(peerId, peer.path().toString()).build()). - withDispatcher(Dispatchers.DefaultDispatcherId())); + TestActorRef leader = actorFactory.createTestActor( + newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString()); - ShardTestKit.waitUntilLeader(shard); + ShardTestKit.waitUntilLeader(leader); YangInstanceIdentifier entityId = ENTITY_ID1; DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId); - shard.tell(new RegisterCandidateLocal(entity), kit.getRef()); + leader.tell(new RegisterCandidateLocal(entity), kit.getRef()); kit.expectMsgClass(SuccessReply.class); // Wait enough time for the commit to timeout. @@ -184,121 +189,117 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { // Resume AppendEntries - the follower should ack the commit which should then result in the candidate // write being applied to the state. 
- follower.dropAppendEntries = false; + peerShard.stopDroppingMessagesOfType(AppendEntries.class); - verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); - verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); + verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); + + testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus ending"); } @Test public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception { + testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader starting"); + ShardTestKit kit = new ShardTestKit(getSystem()); dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2). shardIsolatedLeaderCheckIntervalInMillis(50); - String peerId = newShardId("follower").toString(); - TestActorRef peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId). - withDispatcher(Dispatchers.DefaultDispatcherId()), peerId); + ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); + ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME); - MockFollower follower = peer.underlyingActor(); + TestActorRef peer = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString()); + TestEntityOwnershipShard peerShard = peer.underlyingActor(); + peerShard.startDroppingMessagesOfType(ElectionTimeout.class); - TestActorRef shard = actorFactory.createTestActor(newShardProps( - ImmutableMap.builder().put(peerId, peer.path().toString()).build()). - withDispatcher(Dispatchers.DefaultDispatcherId())); + TestActorRef leader = actorFactory.createTestActor( + newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME)); - ShardTestKit.waitUntilLeader(shard); + ShardTestKit.waitUntilLeader(leader); // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader. 
- follower.dropAppendEntries = true; - Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + peerShard.startDroppingMessagesOfType(AppendEntries.class); + verifyRaftState(leader, state -> + assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState())); YangInstanceIdentifier entityId = ENTITY_ID1; DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId); - shard.tell(new RegisterCandidateLocal(entity), kit.getRef()); + leader.tell(new RegisterCandidateLocal(entity), kit.getRef()); kit.expectMsgClass(SuccessReply.class); // Resume AppendEntries - the candidate write should now be committed. - follower.dropAppendEntries = false; - verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); - verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); + peerShard.stopDroppingMessagesOfType(AppendEntries.class); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); + verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); + + testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader ending"); } @Test public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception { + testLog.info("testOnRegisterCandidateLocalWithRemoteLeader starting"); + ShardTestKit kit = new ShardTestKit(getSystem()); dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2). shardBatchedModificationCount(5); - String peerId = newShardId("leader").toString(); - TestActorRef peer = actorFactory.createTestActor(Props.create(MockLeader.class). 
- withDispatcher(Dispatchers.DefaultDispatcherId()), peerId); - - TestActorRef shard = actorFactory.createTestActor(Props.create( - TestEntityOwnershipShard.class, newShardId(LOCAL_MEMBER_NAME), - ImmutableMap.builder().put(peerId, peer.path().toString()).build(), - dataStoreContextBuilder.build()).withDispatcher(Dispatchers.DefaultDispatcherId())); + ShardIdentifier leaderId = newShardId(PEER_MEMBER_1_NAME); + ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME); + TestActorRef leader = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(leaderId, peerMap(localId.toString()), PEER_MEMBER_1_NAME), + actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString()); + TestEntityOwnershipShard leaderShard = leader.underlyingActor(); - shard.tell(new AppendEntries(1L, peerId, -1L, -1L, Collections.emptyList(), -1L, -1L, - DataStoreVersions.CURRENT_VERSION), peer); + TestActorRef local = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(localId, peerMap(leaderId.toString()),LOCAL_MEMBER_NAME)), localId.toString()); + local.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); - shard.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef()); + local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef()); kit.expectMsgClass(SuccessReply.class); - MockLeader leader = peer.underlyingActor(); - assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly( - leader.modificationsReceived, 5, TimeUnit.SECONDS)); - verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID1, - LOCAL_MEMBER_NAME); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); // Test with initial commit timeout and subsequent retry. 
- leader.modificationsReceived = new CountDownLatch(1); - leader.sendReply = false; + local.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender()); + leaderShard.startDroppingMessagesOfType(BatchedModifications.class); - shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender()); - - shard.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef()); + local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef()); kit.expectMsgClass(SuccessReply.class); - assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly( - leader.modificationsReceived, 5, TimeUnit.SECONDS)); - verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID2, - LOCAL_MEMBER_NAME); + MessageCollectorActor.expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class); // Send a bunch of registration messages quickly and verify. + leaderShard.stopDroppingMessagesOfType(BatchedModifications.class); + MessageCollectorActor.clearMessages(leaderShard.collectorActor()); + int max = 100; - leader.delay = 4; - leader.modificationsReceived = new CountDownLatch(max); List entityIds = new ArrayList<>(); for(int i = 1; i <= max; i++) { YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i)); entityIds.add(id); - shard.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, id)), kit.getRef()); + local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, id)), kit.getRef()); } - assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly( - leader.modificationsReceived, 10, TimeUnit.SECONDS)); - - // Sleep a little to ensure no additional BatchedModifications are received. 
- - Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); - - List receivedMods = leader.getAndClearReceivedModifications(); for(int i = 0; i < max; i++) { - verifyBatchedEntityCandidate(receivedMods.get(i), ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME); + verifyCommittedEntityCandidate(local, ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME); } - assertEquals("# modifications received", max, receivedMods.size()); + testLog.info("testOnRegisterCandidateLocalWithRemoteLeader ending"); } @Test public void testOnUnregisterCandidateLocal() throws Exception { + testLog.info("testOnUnregisterCandidateLocal starting"); + ShardTestKit kit = new ShardTestKit(getSystem()); - TestActorRef shard = actorFactory.createTestActor(newShardProps()); + TestActorRef shard = actorFactory.createTestActor(newLocalShardProps()); ShardTestKit.waitUntilLeader(shard); DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1); @@ -325,104 +326,132 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); + + testLog.info("testOnUnregisterCandidateLocal ending"); } @Test public void testOwnershipChanges() throws Exception { + testLog.info("testOwnershipChanges starting"); + ShardTestKit kit = new ShardTestKit(getSystem()); - TestActorRef shard = actorFactory.createTestActor(newShardProps()); - ShardTestKit.waitUntilLeader(shard); + + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2); + + ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); + ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME); + ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME); + + TestActorRef peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)), + peerId1.toString()); + 
peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); + + TestActorRef peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)), + peerId2.toString()); + peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); + + TestActorRef leader = actorFactory.createTestActor( + newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME), + leaderId.toString()); + + ShardTestKit.waitUntilLeader(leader); DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1); - ShardDataTree shardDataTree = shard.underlyingActor().getDataStore(); // Add a remote candidate - String remoteMemberName1 = "remoteMember1"; - writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree); + peer1.tell(new RegisterCandidateLocal(entity), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME); // Register local - shard.tell(new RegisterCandidateLocal(entity), kit.getRef()); + leader.tell(new RegisterCandidateLocal(entity), kit.getRef()); kit.expectMsgClass(SuccessReply.class); // Verify the remote candidate becomes owner - verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1); - verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); - verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1); + verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME); + verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME); // Add another remote candidate and verify ownership doesn't change - String remoteMemberName2 = "remoteMember2"; - writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), 
shardDataTree); + peer2.tell(new RegisterCandidateLocal(entity), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); - verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2); + verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME); Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); - verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1); + verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME); // Remove the second remote candidate and verify ownership doesn't change - deleteNode(candidatePath(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree); + peer2.tell(new UnregisterCandidateLocal(entity), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); - verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2); + verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME); Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); - verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1); + verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME); // Remove the first remote candidate and verify the local candidate becomes owner - deleteNode(candidatePath(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree); + peer1.tell(new UnregisterCandidateLocal(entity), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); - verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1); - verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); + verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME); + verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME); // Add the second remote candidate back and verify ownership doesn't change - writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), 
shardDataTree); + peer2.tell(new RegisterCandidateLocal(entity), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); - verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2); + verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME); Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); - verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); + verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME); // Unregister the local candidate and verify the second remote candidate becomes owner - shard.tell(new UnregisterCandidateLocal(entity), kit.getRef()); + leader.tell(new UnregisterCandidateLocal(entity), kit.getRef()); kit.expectMsgClass(SuccessReply.class); - verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); - verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2); + verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME); + verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME); + + testLog.info("testOwnershipChanges ending"); } @Test public void testOwnerChangesOnPeerAvailabilityChanges() throws Exception { - ShardTestKit kit = new ShardTestKit(getSystem()); + testLog.info("testOwnerChangesOnPeerAvailabilityChanges starting"); - dataStoreContextBuilder.shardHeartbeatIntervalInMillis(500).shardElectionTimeoutFactor(10000); + ShardTestKit kit = new ShardTestKit(getSystem()); - String peerMemberName1 = "peerMember1"; - String peerMemberName2 = "peerMember2"; + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4). 
+ shardIsolatedLeaderCheckIntervalInMillis(100000); ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); - ShardIdentifier peerId1 = newShardId(peerMemberName1); - ShardIdentifier peerId2 = newShardId(peerMemberName2); + ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME); + ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME); - TestActorRef peer1 = actorFactory.createTestActor(newShardProps(peerId1, - ImmutableMap.builder().put(leaderId.toString(), actorFactory.createTestActorPath(leaderId.toString())). - put(peerId2.toString(), actorFactory.createTestActorPath(peerId2.toString())).build(), peerMemberName1, - EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), peerId1.toString()); + TestActorRef peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)), + peerId1.toString()); + peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); - TestActorRef peer2 = actorFactory.createTestActor(newShardProps(peerId2, - ImmutableMap.builder().put(leaderId.toString(), actorFactory.createTestActorPath(leaderId.toString())). - put(peerId1.toString(), peer1.path().toString()).build(), peerMemberName2, - EntityOwnerSelectionStrategyConfig.newBuilder().build()). withDispatcher(Dispatchers.DefaultDispatcherId()), peerId2.toString()); + TestActorRef peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)), + peerId2.toString()); + peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); - TestActorRef leader = actorFactory.createTestActor(newShardProps(leaderId, - ImmutableMap.builder().put(peerId1.toString(), peer1.path().toString()). 
- put(peerId2.toString(), peer2.path().toString()).build(), LOCAL_MEMBER_NAME, EntityOwnerSelectionStrategyConfig.newBuilder().build()). - withDispatcher(Dispatchers.DefaultDispatcherId()), leaderId.toString()); - leader.tell(TimeoutNow.INSTANCE, leader); + TestActorRef leader = actorFactory.createTestActor( + newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME), + leaderId.toString()); - ShardTestKit.waitUntilLeader(leader); + verifyRaftState(leader, state -> + assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState())); // Send PeerDown and PeerUp with no entities @@ -435,46 +464,54 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { kit.expectMsgClass(SuccessReply.class); verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); - commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName2), kit); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2); + peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME); - commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName1), kit); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName1); + peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME); verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); // Add candidates for entity2 with peerMember2 as the owner - commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName2), kit); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2); + 
peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME); - commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName1), kit); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1); - verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2); + peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME); // Add candidates for entity3 with peerMember2 as the owner. - commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName2), kit); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2); + peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME); leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef()); kit.expectMsgClass(SuccessReply.class); verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME); - commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName1), kit); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName1); - verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2); + peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, 
PEER_MEMBER_2_NAME); // Add only candidate peerMember2 for entity4. - commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID4, peerMemberName2), kit); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2); - verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2); + peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID4)), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME); // Add only candidate peerMember1 for entity5. - commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID5, peerMemberName1), kit); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID5, peerMemberName1); - verifyOwner(leader, ENTITY_TYPE, ENTITY_ID5, peerMemberName1); + peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID5)), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME); // Kill peerMember2 and send PeerDown - the entities (2, 3, 4) owned by peerMember2 should get a new // owner selected @@ -490,85 +527,101 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { peer1.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender()); verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); - verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME); verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME); // no other candidates for entity4 so peerMember2 should remain owner. 
- verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME); // Reinstate peerMember2 - peer2 = actorFactory.createTestActor(newShardProps(peerId2, - ImmutableMap.builder().put(leaderId.toString(), leader.path().toString()). - put(peerId1.toString(), peer1.path().toString()).build(), peerMemberName2, - EntityOwnerSelectionStrategyConfig.newBuilder().build()). withDispatcher(Dispatchers.DefaultDispatcherId()), peerId2.toString()); + peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)), + peerId2.toString()); + peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender()); // Send PeerUp again - should be noop leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender()); peer1.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender()); // peerMember2's candidates should be removed on startup. 
- verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2); - verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2); - verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2); - verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2); + verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME); + verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME); + verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME); + verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME); verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); - verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME); verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME); verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, ""); // Add back candidate peerMember2 for entities 1, 2, & 3. - commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName2), kit); - commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName2), kit); - commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName2), kit); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2); + peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + 
verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME); + verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME); + verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME); + verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME); verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); - verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME); verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME); + verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); + verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME); + verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME); + verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, ""); // Kill peerMember1 and send PeerDown - entity 2 should get a new owner selected + kit.watch(peer1); peer1.tell(PoisonPill.getInstance(), ActorRef.noSender()); + kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class); + kit.unwatch(peer1); leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender()); - verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME); // Verify the reinstated peerMember2 is fully synced. 
verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); - verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, peerMemberName2); + verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME); verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME); verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, ""); // Reinstate peerMember1 and verify no owner changes - peer1 = actorFactory.createTestActor(newShardProps(peerId1, - ImmutableMap.builder().put(leaderId.toString(), leader.path().toString()). - put(peerId2.toString(), peer2.path().toString()).build(), peerMemberName1, - EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), peerId1.toString()); + peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(newShardBuilder( + peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)), peerId1.toString()); + peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); leader.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender()); verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); - verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME); verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME); verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, ""); - verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName1); - verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1); - verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName1); + verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME); + verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME); + verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME); + + verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME); + verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, 
PEER_MEMBER_1_NAME); + verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME); // Verify the reinstated peerMember1 is fully synced. verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); - verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, peerMemberName2); + verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME); verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME); verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, ""); @@ -579,24 +632,226 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender()); peer2.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender()); + kit.watch(leader); leader.tell(PoisonPill.getInstance(), ActorRef.noSender()); + kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class); + kit.unwatch(leader); peer2.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender()); peer2.tell(TimeoutNow.INSTANCE, peer2); - ShardTestKit.waitUntilLeader(peer2); + verifyRaftState(peer2, state -> + assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState())); - verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, peerMemberName2); - verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, peerMemberName2); - verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, peerMemberName2); + verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME); + verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME); + verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME); verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, ""); + + testLog.info("testOwnerChangesOnPeerAvailabilityChanges ending"); + } + + @Test + public void testLeaderIsolation() throws Exception { + testLog.info("testLeaderIsolation starting"); + + ShardTestKit kit = new ShardTestKit(getSystem()); + + ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); + ShardIdentifier peerId1 = 
newShardId(PEER_MEMBER_1_NAME); + ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME); + + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4). + shardIsolatedLeaderCheckIntervalInMillis(100000); + + TestActorRef peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)), + peerId1.toString()); + peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); + + TestActorRef peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)), + peerId2.toString()); + peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); + + dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()). + shardIsolatedLeaderCheckIntervalInMillis(500); + + TestActorRef leader = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME)), + leaderId.toString()); + + ShardTestKit.waitUntilLeader(leader); + + // Add entity1 candidates for all members with the leader as the owner + + DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1); + leader.tell(new RegisterCandidateLocal(entity1), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME); + + peer1.tell(new RegisterCandidateLocal(entity1), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME); + + peer2.tell(new RegisterCandidateLocal(entity1), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_2_NAME); + + 
verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME); + verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME); + verifyOwner(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME); + + // Add entity2 candidates for all members with peer1 as the owner + + DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2); + peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME); + + peer2.tell(new RegisterCandidateLocal(entity2), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_2_NAME); + + leader.tell(new RegisterCandidateLocal(entity2), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME); + + verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME); + verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME); + verifyOwner(peer2, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME); + + // Add entity3 candidates for all members with peer2 as the owner + + DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3); + peer2.tell(new RegisterCandidateLocal(entity3), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME); + + leader.tell(new RegisterCandidateLocal(entity3), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), LOCAL_MEMBER_NAME); + + peer1.tell(new RegisterCandidateLocal(entity3), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + 
verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_1_NAME); + + verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME); + verifyOwner(peer1, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME); + verifyOwner(peer2, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME); + + // Add listeners on all members + + DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class); + leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, false, true, true), + ownershipChange(entity2, false, false, true)), ownershipChange(entity3, false, false, true))); + reset(leaderListener); + + DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class); + peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verify(peer1Listener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, false, false, true), + ownershipChange(entity2, false, true, true)), ownershipChange(entity3, false, false, true))); + reset(peer1Listener); + + DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class); + peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verify(peer2Listener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, false, false, true), + ownershipChange(entity2, false, false, true)), ownershipChange(entity3, false, true, true))); + reset(peer2Listener); + + // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers. 
+ + leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class); + leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class); + + peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, + ae -> ae.getLeaderId().equals(leaderId.toString())); + peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class); + + // Make peer1 start an election and become leader by enabling the ElectionTimeout message. + + peer1.underlyingActor().stopDroppingMessagesOfType(ElectionTimeout.class); + + // Send PeerDown to the isolated leader so it tries to re-assign ownership for the entities owned by the + // isolated peers. + + leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender()); + leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender()); + + verifyRaftState(leader, state -> + assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState())); + + // Expect inJeopardy notification on the isolated leader. + + verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, true, true, true, true), + ownershipChange(entity2, false, false, true, true)), ownershipChange(entity3, false, false, true, true))); + reset(leaderListener); + + verifyRaftState(peer1, state -> + assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState())); + + // Send PeerDown to the new leader peer1 so it re-assigns ownership for the entities owned by the + // isolated leader. 
+ + peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender()); + + verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME); + + verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true)); + reset(peer1Listener); + + verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true)); + reset(peer2Listener); + + // Remove the isolation. + + leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class); + leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class); + peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class); + peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class); + + // Previous leader should switch to Follower and send inJeopardy cleared notifications for all entities. + + verifyRaftState(leader, state -> + assertEquals("getRaftState", RaftState.Follower.toString(), state.getRaftState())); + + verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, true, true, true), + ownershipChange(entity2, false, false, true)), ownershipChange(entity3, false, false, true))); + + verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME); + verify(leaderListener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true)); + + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME); + verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME); + + verifyNoMoreInteractions(leaderListener); + verifyNoMoreInteractions(peer1Listener); + verifyNoMoreInteractions(peer2Listener); + + testLog.info("testLeaderIsolation ending"); } @Test public void testListenerRegistration() throws Exception { + testLog.info("testListenerRegistration starting"); + ShardTestKit kit = new 
ShardTestKit(getSystem()); - TestActorRef shard = actorFactory.createTestActor(newShardProps()); - ShardTestKit.waitUntilLeader(shard); - ShardDataTree shardDataTree = shard.underlyingActor().getDataStore(); + + ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); + ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME); + + TestActorRef peer = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString()); + peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); + + TestActorRef leader = actorFactory.createTestActor( + newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString()); + + ShardTestKit.waitUntilLeader(leader); String otherEntityType = "otherEntityType"; DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1); @@ -607,17 +862,17 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { // Register listener - shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef()); + leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef()); kit.expectMsgClass(SuccessReply.class); // Register a couple candidates for the desired entity type and verify listener is notified. 
- shard.tell(new RegisterCandidateLocal(entity1), kit.getRef()); + leader.tell(new RegisterCandidateLocal(entity1), kit.getRef()); kit.expectMsgClass(SuccessReply.class); verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true)); - shard.tell(new RegisterCandidateLocal(entity2), kit.getRef()); + leader.tell(new RegisterCandidateLocal(entity2), kit.getRef()); kit.expectMsgClass(SuccessReply.class); verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true)); @@ -625,7 +880,7 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { // Register another candidate for another entity type and verify listener is not notified. - shard.tell(new RegisterCandidateLocal(entity4), kit.getRef()); + leader.tell(new RegisterCandidateLocal(entity4), kit.getRef()); kit.expectMsgClass(SuccessReply.class); Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); @@ -633,14 +888,13 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { // Register remote candidate for entity1 - String remoteMemberName = "remoteMember"; - writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, entity1.getIdentifier(), remoteMemberName), - shardDataTree); - verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entity1.getIdentifier(), remoteMemberName); + peer.tell(new RegisterCandidateLocal(entity1), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entity1.getIdentifier(), PEER_MEMBER_1_NAME); // Unregister the local candidate for entity1 and verify listener is notified - shard.tell(new UnregisterCandidateLocal(entity1), kit.getRef()); + leader.tell(new UnregisterCandidateLocal(entity1), kit.getRef()); kit.expectMsgClass(SuccessReply.class); verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true)); @@ -648,13 +902,13 @@ public class EntityOwnershipShardTest extends 
AbstractEntityOwnershipTest { // Unregister the listener, add a candidate for entity3 and verify listener isn't notified - shard.tell(new UnregisterListenerLocal(listener, ENTITY_TYPE), kit.getRef()); + leader.tell(new UnregisterListenerLocal(listener, ENTITY_TYPE), kit.getRef()); kit.expectMsgClass(SuccessReply.class); - shard.tell(new RegisterCandidateLocal(entity3), kit.getRef()); + leader.tell(new RegisterCandidateLocal(entity3), kit.getRef()); kit.expectMsgClass(SuccessReply.class); - verifyOwner(shard, ENTITY_TYPE, entity3.getIdentifier(), LOCAL_MEMBER_NAME); + verifyOwner(leader, ENTITY_TYPE, entity3.getIdentifier(), LOCAL_MEMBER_NAME); Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); verify(listener, never()).ownershipChanged(any(DOMEntityOwnershipChange.class)); @@ -662,7 +916,7 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { reset(listener); - shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef()); + leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef()); kit.expectMsgClass(SuccessReply.class); verify(listener, timeout(5000).times(2)).ownershipChanged(or(ownershipChange(entity2, false, true, true), @@ -670,278 +924,178 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS); verify(listener, never()).ownershipChanged(ownershipChange(entity4)); verify(listener, times(1)).ownershipChanged(ownershipChange(entity1)); - } - - private static void commitModification(TestActorRef shard, NormalizedNode node, - JavaTestKit sender) { - BatchedModifications modifications = newBatchedModifications(); - modifications.addModification(new MergeModification(ENTITY_OWNERS_PATH, node)); - - shard.tell(modifications, sender.getRef()); - sender.expectMsgClass(CommitTransactionReply.class); - } - private static BatchedModifications newBatchedModifications() { - BatchedModifications modifications = new 
BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION); - modifications.setDoCommitOnReady(true); - modifications.setReady(true); - modifications.setTotalMessagesSent(1); - return modifications; + testLog.info("testListenerRegistration ending"); } - private void verifyEntityCandidateRemoved(final TestActorRef shard, String entityType, - YangInstanceIdentifier entityId, String candidateName) { - verifyNodeRemoved(candidatePath(entityType, entityId, candidateName), - path -> { - try { - return AbstractShardTest.readStore(shard, path); - } catch(Exception e) { - throw new AssertionError("Failed to read " + path, e); - } - }); - } + @Test + public void testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived() throws Exception { + testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived starting"); - private void verifyCommittedEntityCandidate(final TestActorRef shard, String entityType, - YangInstanceIdentifier entityId, String candidateName) { - verifyEntityCandidate(entityType, entityId, candidateName, path -> { - try { - return AbstractShardTest.readStore(shard, path); - } catch(Exception e) { - throw new AssertionError("Failed to read " + path, e); - } - }); - } + ShardTestKit kit = new ShardTestKit(getSystem()); + EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder(). 
+ addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500); - private void verifyNoEntityCandidate(final TestActorRef shard, String entityType, - YangInstanceIdentifier entityId, String candidateName) { - verifyEntityCandidate(entityType, entityId, candidateName, path -> { - try { - return AbstractShardTest.readStore(shard, path); - } catch(Exception e) { - throw new AssertionError("Failed to read " + path, e); - } - }, false); - } + ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); + ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME); - private void verifyBatchedEntityCandidate(List mods, String entityType, - YangInstanceIdentifier entityId, String candidateName) throws Exception { - assertEquals("BatchedModifications size", 1, mods.size()); - verifyBatchedEntityCandidate(mods.get(0), entityType, entityId, candidateName); - } + TestActorRef peer = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString()); + peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); - private void verifyBatchedEntityCandidate(Modification mod, String entityType, - YangInstanceIdentifier entityId, String candidateName) throws Exception { - assertEquals("Modification type", MergeModification.class, mod.getClass()); - verifyEntityCandidate(((MergeModification)mod).getData(), entityType, - entityId, candidateName, true); - } + TestActorRef leader = actorFactory.createTestActor( + newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME, builder.build()), leaderId.toString()); - private static void verifyOwner(final TestActorRef shard, String entityType, - YangInstanceIdentifier entityId, String localMemberName) { - verifyOwner(localMemberName, entityType, entityId, path -> { - try { - return AbstractShardTest.readStore(shard, path); - } catch(Exception e) { - return null; - } - }); - } + ShardTestKit.waitUntilLeader(leader); - private 
Props newShardProps() { - return newShardProps(Collections.emptyMap()); - } + DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1); - private Props newShardProps(EntityOwnerSelectionStrategyConfig strategyConfig, Map peers) { - return newShardProps(newShardId(LOCAL_MEMBER_NAME), peers, LOCAL_MEMBER_NAME, strategyConfig); - } + // Add a remote candidate - private Props newShardProps(Map peers) { - return newShardProps(newShardId(LOCAL_MEMBER_NAME), peers, LOCAL_MEMBER_NAME, EntityOwnerSelectionStrategyConfig.newBuilder().build()); - } + peer.tell(new RegisterCandidateLocal(entity), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); - private Props newShardProps(ShardIdentifier shardId, Map peers, String memberName, - EntityOwnerSelectionStrategyConfig config) { - return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers). - datastoreContext(dataStoreContextBuilder.build()).schemaContext(SCHEMA_CONTEXT). - localMemberName(MemberName.forName(memberName)).ownerSelectionStrategyConfig(config).props() - .withDispatcher(Dispatchers.DefaultDispatcherId()); - } + // Register local - private static ShardIdentifier newShardId(String memberName) { - return ShardIdentifier.create("entity-ownership", MemberName.forName(memberName), - "operational" + NEXT_SHARD_NUM.getAndIncrement()); - } + leader.tell(new RegisterCandidateLocal(entity), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); - private static class TestEntityOwnershipShard extends EntityOwnershipShard { + // Verify the local candidate becomes owner - TestEntityOwnershipShard(ShardIdentifier name, Map peerAddresses, - DatastoreContext datastoreContext) { - super(newBuilder().id(name).peerAddresses(peerAddresses).datastoreContext(datastoreContext). 
- schemaContext(SCHEMA_CONTEXT).localMemberName(name.getMemberName())); - } + verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME); + verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME); + verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME); - @Override - public void handleCommand(Object message) { - if(!(message instanceof ElectionTimeout)) { - super.handleCommand(message); - } - } + testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived ending"); } - private static class MockFollower extends UntypedActor { - volatile boolean grantVote; - volatile boolean dropAppendEntries; - private final String myId; - - @SuppressWarnings("unused") - public MockFollower(String myId) { - this(myId, true); - } + @Test + public void testDelayedEntityOwnerSelection() throws Exception { + testLog.info("testDelayedEntityOwnerSelection starting"); - public MockFollower(String myId, boolean grantVote) { - this.myId = myId; - this.grantVote = grantVote; - } + ShardTestKit kit = new ShardTestKit(getSystem()); + EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder(). 
+ addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500); - @Override - public void onReceive(Object message) { - if(message instanceof RequestVote) { - if(grantVote) { - getSender().tell(new RequestVoteReply(((RequestVote)message).getTerm(), true), getSelf()); - } - } else if(message instanceof AppendEntries) { - if(!dropAppendEntries) { - AppendEntries req = (AppendEntries) message; - long lastIndex = req.getLeaderCommit(); - if (req.getEntries().size() > 0) { - for(ReplicatedLogEntry entry : req.getEntries()) { - lastIndex = entry.getIndex(); - } - } - - getSender().tell(new AppendEntriesReply(myId, req.getTerm(), true, lastIndex, req.getTerm(), - DataStoreVersions.CURRENT_VERSION), getSelf()); - } - } - } - } + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2); + ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); + ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME); + ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME); - @Test - public void testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived() throws Exception { - ShardTestKit kit = new ShardTestKit(getSystem()); - EntityOwnerSelectionStrategyConfig.Builder builder - = EntityOwnerSelectionStrategyConfig.newBuilder().addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500); + TestActorRef peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)), + peerId1.toString()); + peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); - String peerId = newShardId("follower").toString(); - TestActorRef peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId, false). 
- withDispatcher(Dispatchers.DefaultDispatcherId()), peerId); + TestActorRef peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)), + peerId2.toString()); + peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); - peer.underlyingActor().grantVote = true; + TestActorRef leader = actorFactory.createTestActor( + newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME, builder.build()), + leaderId.toString()); - TestActorRef shard = actorFactory.createTestActor(newShardProps(builder.build(), - ImmutableMap.of(peerId.toString(), peer.path().toString()))); - ShardTestKit.waitUntilLeader(shard); + ShardTestKit.waitUntilLeader(leader); DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1); - ShardDataTree shardDataTree = shard.underlyingActor().getDataStore(); // Add a remote candidate - String remoteMemberName1 = "follower"; - writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree); + peer1.tell(new RegisterCandidateLocal(entity), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); // Register local - shard.tell(new RegisterCandidateLocal(entity), kit.getRef()); + leader.tell(new RegisterCandidateLocal(entity), kit.getRef()); kit.expectMsgClass(SuccessReply.class); // Verify the local candidate becomes owner - verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1); - verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); - verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); - } + verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME); + verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME); + verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME); - @Test - 
public void testDelayedEntityOwnerSelection() throws Exception { - ShardTestKit kit = new ShardTestKit(getSystem()); - EntityOwnerSelectionStrategyConfig.Builder builder - = EntityOwnerSelectionStrategyConfig.newBuilder().addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500); + testLog.info("testDelayedEntityOwnerSelection ending"); + } - String follower1Id = newShardId("follower1").toString(); - TestActorRef follower1 = actorFactory.createTestActor(Props.create(MockFollower.class, follower1Id, false). - withDispatcher(Dispatchers.DefaultDispatcherId()), follower1Id); + private Props newLocalShardProps() { + return newShardProps(newShardId(LOCAL_MEMBER_NAME), Collections.emptyMap(), LOCAL_MEMBER_NAME); + } - follower1.underlyingActor().grantVote = true; + private Props newShardProps(ShardIdentifier shardId, Map peers, String memberName) { + return newShardProps(shardId, peers, memberName, EntityOwnerSelectionStrategyConfig.newBuilder().build()); + } - String follower2Id = newShardId("follower").toString(); - TestActorRef follower2 = actorFactory.createTestActor(Props.create(MockFollower.class, follower2Id, false). - withDispatcher(Dispatchers.DefaultDispatcherId()), follower2Id); + private Props newShardProps(ShardIdentifier shardId, Map peers, String memberName, + EntityOwnerSelectionStrategyConfig config) { + return newShardBuilder(shardId, peers, memberName).ownerSelectionStrategyConfig(config).props() + .withDispatcher(Dispatchers.DefaultDispatcherId()); + } - follower2.underlyingActor().grantVote = true; + private EntityOwnershipShard.Builder newShardBuilder(ShardIdentifier shardId, Map peers, + String memberName) { + return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers).datastoreContext( + dataStoreContextBuilder.build()).schemaContext(SCHEMA_CONTEXT).localMemberName( + MemberName.forName(memberName)).ownerSelectionStrategyConfig( + EntityOwnerSelectionStrategyConfig.newBuilder().build()); + } + private Map peerMap(String... 
peerIds) { + ImmutableMap.Builder builder = ImmutableMap.builder(); + for(String peerId: peerIds) { + builder.put(peerId, actorFactory.createTestActorPath(peerId)).build(); + } - TestActorRef shard = actorFactory.createTestActor(newShardProps(builder.build(), - ImmutableMap.of(follower1Id.toString(), follower2.path().toString(), follower2Id.toString(), follower2.path().toString()))); - ShardTestKit.waitUntilLeader(shard); + return builder.build(); + } - DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1); - ShardDataTree shardDataTree = shard.underlyingActor().getDataStore(); + private static class TestEntityOwnershipShard extends EntityOwnershipShard { + private final TestActorRef collectorActor; + private final Map, Predicate> dropMessagesOfType = new ConcurrentHashMap<>(); - // Add a remote candidate + TestEntityOwnershipShard(Builder builder, TestActorRef collectorActor) { + super(builder); + this.collectorActor = collectorActor; + } - String remoteMemberName1 = "follower"; - writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree); + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public void handleCommand(Object message) { + if(collectorActor != null) { + collectorActor.tell(message, ActorRef.noSender()); + } - // Register local + Predicate drop = dropMessagesOfType.get(message.getClass()); + if(drop == null || !drop.test(message)) { + super.handleCommand(message); + } + } - shard.tell(new RegisterCandidateLocal(entity), kit.getRef()); - kit.expectMsgClass(SuccessReply.class); + void startDroppingMessagesOfType(Class msgClass) { + dropMessagesOfType.put(msgClass, msg -> true); + } - // Verify the local candidate becomes owner + void startDroppingMessagesOfType(Class msgClass, Predicate filter) { + dropMessagesOfType.put(msgClass, filter); + } - verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1); - verifyCommittedEntityCandidate(shard, ENTITY_TYPE, 
ENTITY_ID1, LOCAL_MEMBER_NAME); - verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); - } + void stopDroppingMessagesOfType(Class msgClass) { + dropMessagesOfType.remove(msgClass); + } - private static class MockLeader extends UntypedActor { - volatile CountDownLatch modificationsReceived = new CountDownLatch(1); - List receivedModifications = new ArrayList<>(); - volatile boolean sendReply = true; - volatile long delay; + TestActorRef collectorActor() { + return collectorActor; + } - @Override - public void onReceive(Object message) { - if(message instanceof BatchedModifications) { - if(delay > 0) { - Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS); - } - - if(sendReply) { - BatchedModifications mods = (BatchedModifications) message; - synchronized (receivedModifications) { - for(int i = 0; i < mods.getModifications().size(); i++) { - receivedModifications.add(mods.getModifications().get(i)); - modificationsReceived.countDown(); - } - } - - getSender().tell(CommitTransactionReply.instance(DataStoreVersions.CURRENT_VERSION). - toSerializable(), getSelf()); - } else { - sendReply = true; - } - } + static Props props(Builder builder) { + return props(builder, null); } - List getAndClearReceivedModifications() { - synchronized (receivedModifications) { - List ret = new ArrayList<>(receivedModifications); - receivedModifications.clear(); - return ret; - } + static Props props(Builder builder, TestActorRef collectorActor) { + return Props.create(TestEntityOwnershipShard.class, builder, collectorActor). + withDispatcher(Dispatchers.DefaultDispatcherId()); } } } -- 2.36.6