X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fentityownership%2FEntityOwnershipShardTest.java;h=276f11f93605e636d692ef88382b2e139af2331d;hp=26811444d843c11220c03fdc99dcbc2b1ff960c6;hb=1b0f84c4957e464bad6f7cb7350a8171c3d1621b;hpb=dc6370feeb5fc47be3e267bf85d6354013d0409b diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java index 26811444d8..276f11f936 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java @@ -16,36 +16,35 @@ import static org.mockito.Mockito.reset; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH; -import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath; -import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages; +import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching; +import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching; + import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Terminated; -import akka.actor.UntypedActor; import akka.dispatch.Dispatchers; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; -import com.google.common.base.Function; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; import org.junit.After; import org.junit.Test; import org.opendaylight.controller.cluster.access.concepts.MemberName; -import org.opendaylight.controller.cluster.datastore.AbstractShardTest; -import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; -import org.opendaylight.controller.cluster.datastore.ShardDataTree; import org.opendaylight.controller.cluster.datastore.ShardTestKit; +import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded; import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal; import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal; import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal; @@ -54,27 +53,23 @@ import org.opendaylight.controller.cluster.datastore.entityownership.selectionst import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.LastCandidateSelectionStrategy; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; -import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.PeerDown; import org.opendaylight.controller.cluster.datastore.messages.PeerUp; import org.opendaylight.controller.cluster.datastore.messages.SuccessReply; -import org.opendaylight.controller.cluster.datastore.modification.MergeModification; -import org.opendaylight.controller.cluster.datastore.modification.Modification; -import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.TestActorFactory; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; +import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; -import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.RequestVote; -import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; -import org.opendaylight.controller.md.sal.common.api.clustering.Entity; -import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange; -import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; +import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange; +import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; /** @@ -95,10 +90,11 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { private static final YangInstanceIdentifier ENTITY_ID5 = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity5")); private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners(); - private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger(); - private static final String LOCAL_MEMBER_NAME = "member-1"; + private static final String LOCAL_MEMBER_NAME = "local-member-1"; + private static final String PEER_MEMBER_1_NAME = "peer-member-1"; + private static final String PEER_MEMBER_2_NAME = "peer-member-2"; - private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder(); + private Builder dataStoreContextBuilder = DatastoreContext.newBuilder().persistent(false); private final TestActorFactory actorFactory = new TestActorFactory(getSystem()); @After @@ -108,75 +104,90 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { @Test public void testOnRegisterCandidateLocal() throws Exception { + testLog.info("testOnRegisterCandidateLocal starting"); + ShardTestKit kit = new ShardTestKit(getSystem()); - TestActorRef shard = actorFactory.createTestActor(newShardProps()); + TestActorRef shard = actorFactory.createTestActor(newLocalShardProps()); ShardTestKit.waitUntilLeader(shard); YangInstanceIdentifier entityId = ENTITY_ID1; - Entity entity = new Entity(ENTITY_TYPE, entityId); + DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId); shard.tell(new RegisterCandidateLocal(entity), kit.getRef()); kit.expectMsgClass(SuccessReply.class); verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); + + testLog.info("testOnRegisterCandidateLocal ending"); } @Test public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception { - ShardTestKit kit = new ShardTestKit(getSystem()); + testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader starting"); + + final ShardTestKit kit = new ShardTestKit(getSystem()); dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2); - String peerId = newShardId("follower").toString(); - TestActorRef peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId, false). - withDispatcher(Dispatchers.DefaultDispatcherId()), peerId); + ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); + ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME); - TestActorRef shard = actorFactory.createTestActor(newShardProps( - ImmutableMap.builder().put(peerId, peer.path().toString()).build()). - withDispatcher(Dispatchers.DefaultDispatcherId())); + TestActorRef peer = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString()); + TestEntityOwnershipShard peerShard = peer.underlyingActor(); + peerShard.startDroppingMessagesOfType(RequestVote.class); + peerShard.startDroppingMessagesOfType(ElectionTimeout.class); + + TestActorRef shard = actorFactory.createTestActor( + newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString()); YangInstanceIdentifier entityId = ENTITY_ID1; - Entity entity = new Entity(ENTITY_TYPE, entityId); + DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId); shard.tell(new RegisterCandidateLocal(entity), kit.getRef()); kit.expectMsgClass(SuccessReply.class); - // Now grant the vote so the shard becomes the leader. This should retry the commit. - peer.underlyingActor().grantVote = true; + // Now allow RequestVotes to the peer so the shard becomes the leader. This should retry the commit. + peerShard.stopDroppingMessagesOfType(RequestVote.class); verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); + + testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader ending"); } @Test public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception { - ShardTestKit kit = new ShardTestKit(getSystem()); + testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus starting"); - dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2). - shardTransactionCommitTimeoutInSeconds(1); + final ShardTestKit kit = new ShardTestKit(getSystem()); - String peerId = newShardId("follower").toString(); - TestActorRef peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId). - withDispatcher(Dispatchers.DefaultDispatcherId()), peerId); + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2) + .shardTransactionCommitTimeoutInSeconds(1); + + ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); + ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME); - MockFollower follower = peer.underlyingActor(); + TestActorRef peer = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString()); + TestEntityOwnershipShard peerShard = peer.underlyingActor(); + peerShard.startDroppingMessagesOfType(ElectionTimeout.class); // Drop AppendEntries so consensus isn't reached. - follower.dropAppendEntries = true; + peerShard.startDroppingMessagesOfType(AppendEntries.class); - TestActorRef shard = actorFactory.createTestActor(newShardProps( - ImmutableMap.builder().put(peerId, peer.path().toString()).build()). - withDispatcher(Dispatchers.DefaultDispatcherId())); + TestActorRef leader = actorFactory.createTestActor( + newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString()); - ShardTestKit.waitUntilLeader(shard); + ShardTestKit.waitUntilLeader(leader); YangInstanceIdentifier entityId = ENTITY_ID1; - Entity entity = new Entity(ENTITY_TYPE, entityId); + DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId); - shard.tell(new RegisterCandidateLocal(entity), kit.getRef()); + leader.tell(new RegisterCandidateLocal(entity), kit.getRef()); kit.expectMsgClass(SuccessReply.class); // Wait enough time for the commit to timeout. @@ -184,124 +195,120 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { // Resume AppendEntries - the follower should ack the commit which should then result in the candidate // write being applied to the state. - follower.dropAppendEntries = false; + peerShard.stopDroppingMessagesOfType(AppendEntries.class); - verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); - verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); + verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); + + testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus ending"); } @Test public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception { - ShardTestKit kit = new ShardTestKit(getSystem()); + testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader starting"); - dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2). - shardIsolatedLeaderCheckIntervalInMillis(50); + final ShardTestKit kit = new ShardTestKit(getSystem()); - String peerId = newShardId("follower").toString(); - TestActorRef peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId). - withDispatcher(Dispatchers.DefaultDispatcherId()), peerId); + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2) + .shardIsolatedLeaderCheckIntervalInMillis(50); - MockFollower follower = peer.underlyingActor(); + ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); + ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME); - TestActorRef shard = actorFactory.createTestActor(newShardProps( - ImmutableMap.builder().put(peerId, peer.path().toString()).build()). - withDispatcher(Dispatchers.DefaultDispatcherId())); + TestActorRef peer = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString()); + TestEntityOwnershipShard peerShard = peer.underlyingActor(); + peerShard.startDroppingMessagesOfType(ElectionTimeout.class); - ShardTestKit.waitUntilLeader(shard); + TestActorRef leader = actorFactory.createTestActor( + newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME)); + + ShardTestKit.waitUntilLeader(leader); // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader. - follower.dropAppendEntries = true; - Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + peerShard.startDroppingMessagesOfType(AppendEntries.class); + verifyRaftState(leader, state -> + assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState())); YangInstanceIdentifier entityId = ENTITY_ID1; - Entity entity = new Entity(ENTITY_TYPE, entityId); + DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId); - shard.tell(new RegisterCandidateLocal(entity), kit.getRef()); + leader.tell(new RegisterCandidateLocal(entity), kit.getRef()); kit.expectMsgClass(SuccessReply.class); // Resume AppendEntries - the candidate write should now be committed. - follower.dropAppendEntries = false; - verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); - verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); + peerShard.stopDroppingMessagesOfType(AppendEntries.class); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); + verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); + + testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader ending"); } @Test public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception { - ShardTestKit kit = new ShardTestKit(getSystem()); + testLog.info("testOnRegisterCandidateLocalWithRemoteLeader starting"); - dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2). - shardBatchedModificationCount(5); + ShardTestKit kit = new ShardTestKit(getSystem()); - String peerId = newShardId("leader").toString(); - TestActorRef peer = actorFactory.createTestActor(Props.create(MockLeader.class). - withDispatcher(Dispatchers.DefaultDispatcherId()), peerId); + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2) + .shardBatchedModificationCount(5); - TestActorRef shard = actorFactory.createTestActor(Props.create( - TestEntityOwnershipShard.class, newShardId(LOCAL_MEMBER_NAME), - ImmutableMap.builder().put(peerId, peer.path().toString()).build(), - dataStoreContextBuilder.build()).withDispatcher(Dispatchers.DefaultDispatcherId())); + ShardIdentifier leaderId = newShardId(PEER_MEMBER_1_NAME); + ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME); + TestActorRef leader = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(leaderId, peerMap(localId.toString()), PEER_MEMBER_1_NAME), + actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString()); + final TestEntityOwnershipShard leaderShard = leader.underlyingActor(); - shard.tell(new AppendEntries(1L, peerId, -1L, -1L, Collections.emptyList(), -1L, -1L, - DataStoreVersions.CURRENT_VERSION), peer); + TestActorRef local = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(localId, peerMap(leaderId.toString()),LOCAL_MEMBER_NAME)), localId.toString()); + local.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); - shard.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef()); + local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef()); kit.expectMsgClass(SuccessReply.class); - MockLeader leader = peer.underlyingActor(); - assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly( - leader.modificationsReceived, 5, TimeUnit.SECONDS)); - verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID1, - LOCAL_MEMBER_NAME); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); // Test with initial commit timeout and subsequent retry. - leader.modificationsReceived = new CountDownLatch(1); - leader.sendReply = false; + local.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender()); + leaderShard.startDroppingMessagesOfType(BatchedModifications.class); - shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender()); - - shard.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef()); + local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef()); kit.expectMsgClass(SuccessReply.class); - assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly( - leader.modificationsReceived, 5, TimeUnit.SECONDS)); - verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID2, - LOCAL_MEMBER_NAME); + expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class); // Send a bunch of registration messages quickly and verify. + leaderShard.stopDroppingMessagesOfType(BatchedModifications.class); + clearMessages(leaderShard.collectorActor()); + int max = 100; - leader.delay = 4; - leader.modificationsReceived = new CountDownLatch(max); List entityIds = new ArrayList<>(); - for(int i = 1; i <= max; i++) { + for (int i = 1; i <= max; i++) { YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i)); entityIds.add(id); - shard.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, id)), kit.getRef()); + local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, id)), kit.getRef()); } - assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly( - leader.modificationsReceived, 10, TimeUnit.SECONDS)); - - // Sleep a little to ensure no additional BatchedModifications are received. - - Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); - - List receivedMods = leader.getAndClearReceivedModifications(); - for(int i = 0; i < max; i++) { - verifyBatchedEntityCandidate(receivedMods.get(i), ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME); + for (int i = 0; i < max; i++) { + verifyCommittedEntityCandidate(local, ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME); } - assertEquals("# modifications received", max, receivedMods.size()); + testLog.info("testOnRegisterCandidateLocalWithRemoteLeader ending"); } @Test public void testOnUnregisterCandidateLocal() throws Exception { + testLog.info("testOnUnregisterCandidateLocal starting"); + ShardTestKit kit = new ShardTestKit(getSystem()); - TestActorRef shard = actorFactory.createTestActor(newShardProps()); + TestActorRef shard = actorFactory.createTestActor(newLocalShardProps()); ShardTestKit.waitUntilLeader(shard); - Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1); + DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1); // Register @@ -325,102 +332,132 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); + + testLog.info("testOnUnregisterCandidateLocal ending"); } @Test public void testOwnershipChanges() throws Exception { - ShardTestKit kit = new ShardTestKit(getSystem()); - TestActorRef shard = actorFactory.createTestActor(newShardProps()); - ShardTestKit.waitUntilLeader(shard); + testLog.info("testOwnershipChanges starting"); - Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1); - ShardDataTree shardDataTree = shard.underlyingActor().getDataStore(); + final ShardTestKit kit = new ShardTestKit(getSystem()); + + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2); + + ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); + ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME); + ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME); + + TestActorRef peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)), + peerId1.toString()); + peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); + + TestActorRef peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)), + peerId2.toString()); + peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); + + TestActorRef leader = actorFactory.createTestActor( + newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME), + leaderId.toString()); + + ShardTestKit.waitUntilLeader(leader); + + DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1); // Add a remote candidate - String remoteMemberName1 = "remoteMember1"; - writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree); + peer1.tell(new RegisterCandidateLocal(entity), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME); // Register local - shard.tell(new RegisterCandidateLocal(entity), kit.getRef()); + leader.tell(new RegisterCandidateLocal(entity), kit.getRef()); kit.expectMsgClass(SuccessReply.class); // Verify the remote candidate becomes owner - verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1); - verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); - verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1); + verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME); + verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME); // Add another remote candidate and verify ownership doesn't change - String remoteMemberName2 = "remoteMember2"; - writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree); + peer2.tell(new RegisterCandidateLocal(entity), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); - verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2); + verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME); Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); - verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1); + verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME); // Remove the second remote candidate and verify ownership doesn't change - deleteNode(candidatePath(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree); + peer2.tell(new UnregisterCandidateLocal(entity), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); - verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2); + verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME); Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); - verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1); + verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME); // Remove the first remote candidate and verify the local candidate becomes owner - deleteNode(candidatePath(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree); + peer1.tell(new UnregisterCandidateLocal(entity), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); - verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1); - verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); + verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME); + verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME); // Add the second remote candidate back and verify ownership doesn't change - writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree); + peer2.tell(new RegisterCandidateLocal(entity), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); - verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2); + verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME); Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); - verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); + verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME); // Unregister the local candidate and verify the second remote candidate becomes owner - shard.tell(new UnregisterCandidateLocal(entity), kit.getRef()); + leader.tell(new UnregisterCandidateLocal(entity), kit.getRef()); kit.expectMsgClass(SuccessReply.class); - verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); - verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2); + verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME); + verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME); + + testLog.info("testOwnershipChanges ending"); } @Test public void testOwnerChangesOnPeerAvailabilityChanges() throws Exception { - ShardTestKit kit = new ShardTestKit(getSystem()); + testLog.info("testOwnerChangesOnPeerAvailabilityChanges starting"); - dataStoreContextBuilder.shardHeartbeatIntervalInMillis(500).shardElectionTimeoutFactor(10000); + final ShardTestKit kit = new ShardTestKit(getSystem()); - String peerMemberName1 = "peerMember1"; - String peerMemberName2 = "peerMember2"; + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4) + .shardIsolatedLeaderCheckIntervalInMillis(100000); ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); - ShardIdentifier peerId1 = newShardId(peerMemberName1); - ShardIdentifier peerId2 = newShardId(peerMemberName2); + ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME); + ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME); - TestActorRef peer1 = actorFactory.createTestActor(newShardProps(peerId1, - ImmutableMap.builder().put(leaderId.toString(), ""). put(peerId2.toString(), "").build(), - peerMemberName1, EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), peerId1.toString()); + TestActorRef peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)), + peerId1.toString()); + peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); - TestActorRef peer2 = actorFactory.createTestActor(newShardProps(peerId2, - ImmutableMap.builder().put(leaderId.toString(), ""). put(peerId1.toString(), "").build(), - peerMemberName2, EntityOwnerSelectionStrategyConfig.newBuilder().build()). withDispatcher(Dispatchers.DefaultDispatcherId()), peerId2.toString()); + TestActorRef peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)), + peerId2.toString()); + peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); - TestActorRef leader = actorFactory.createTestActor(newShardProps(leaderId, - ImmutableMap.builder().put(peerId1.toString(), peer1.path().toString()). - put(peerId2.toString(), peer2.path().toString()).build(), LOCAL_MEMBER_NAME, EntityOwnerSelectionStrategyConfig.newBuilder().build()). - withDispatcher(Dispatchers.DefaultDispatcherId()), leaderId.toString()); - leader.tell(ElectionTimeout.INSTANCE, leader); + TestActorRef leader = actorFactory.createTestActor( + newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME), + leaderId.toString()); - ShardTestKit.waitUntilLeader(leader); + verifyRaftState(leader, state -> + assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState())); // Send PeerDown and PeerUp with no entities @@ -429,50 +466,58 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { // Add candidates for entity1 with the local leader as the owner - leader.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef()); + leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef()); kit.expectMsgClass(SuccessReply.class); verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); - commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName2), kit); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2); + peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME); - commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName1), kit); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName1); + peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME); verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); // Add candidates for entity2 with peerMember2 as the owner - commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName2), kit); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2); + peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME); - commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName1), kit); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1); - verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2); + peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME); // Add candidates for entity3 with peerMember2 as the owner. - commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName2), kit); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2); + peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME); - leader.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef()); + leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef()); kit.expectMsgClass(SuccessReply.class); verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME); - commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName1), kit); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName1); - verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2); + peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME); // Add only candidate peerMember2 for entity4. - commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID4, peerMemberName2), kit); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2); - verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2); + peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID4)), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME); // Add only candidate peerMember1 for entity5. - commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID5, peerMemberName1), kit); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID5, peerMemberName1); - verifyOwner(leader, ENTITY_TYPE, ENTITY_ID5, peerMemberName1); + peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID5)), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME); // Kill peerMember2 and send PeerDown - the entities (2, 3, 4) owned by peerMember2 should get a new // owner selected @@ -487,75 +532,114 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender()); peer1.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender()); - verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, ""); // no other candidates so should clear - verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME); - verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1); verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME); + // no other candidates for entity4 so peerMember2 should remain owner. + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME); - verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2); - verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2); - verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2); - verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME); - // Reinstate peerMember2 - no owners should change + // Reinstate peerMember2 - peer2 = actorFactory.createTestActor(newShardProps(peerId2, - ImmutableMap.builder().put(leaderId.toString(), ""). put(peerId1.toString(), "").build(), - peerMemberName2, EntityOwnerSelectionStrategyConfig.newBuilder().build()). withDispatcher(Dispatchers.DefaultDispatcherId()), peerId2.toString()); + peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)), + peerId2.toString()); + peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender()); // Send PeerUp again - should be noop leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender()); peer1.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender()); - verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME); - verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1); + // peerMember2's candidates should be removed on startup. + verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME); + verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME); + verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME); + verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME); verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, ""); // Add back candidate peerMember2 for entities 1, 2, & 3. - commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName2), kit); - commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName2), kit); - commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName2), kit); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2); - verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2); + peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME); + verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME); + verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME); + verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME); verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); - verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME); verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME); + verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); + verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME); + verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME); + verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, ""); // Kill peerMember1 and send PeerDown - entity 2 should get a new owner selected + kit.watch(peer1); peer1.tell(PoisonPill.getInstance(), ActorRef.noSender()); + kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class); + kit.unwatch(peer1); leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender()); - verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME); // Verify the reinstated peerMember2 is fully synced. - verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, ""); - verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME); - verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, peerMemberName2); verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); + verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME); + verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME); + verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, ""); // Reinstate peerMember1 and verify no owner changes - peer1 = actorFactory.createTestActor(newShardProps(peerId1, - ImmutableMap.builder().put(leaderId.toString(), ""). put(peerId2.toString(), "").build(), - peerMemberName1, EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), peerId1.toString()); + peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(newShardBuilder( + peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)), peerId1.toString()); + peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); leader.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender()); - verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, ""); - verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME); - verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2); verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME); + verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, ""); + + verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME); + verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME); + verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME); + + verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME); + verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME); + verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME); // Verify the reinstated peerMember1 is fully synced. - verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, ""); - verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME); - verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, peerMemberName2); verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); + verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME); + verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME); + verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, ""); + + AtomicLong leaderLastApplied = new AtomicLong(); + verifyRaftState(leader, rs -> { + assertEquals("LastApplied up-to-date", rs.getLastApplied(), rs.getLastIndex()); + leaderLastApplied.set(rs.getLastApplied()); + }); + + verifyRaftState(peer2, rs -> { + assertEquals("LastApplied", leaderLastApplied.get(), rs.getLastIndex()); + }); // Kill the local leader and elect peer2 the leader. This should cause a new owner to be selected for // the entities (1 and 3) previously owned by the local leader member. @@ -564,103 +648,440 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender()); peer2.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender()); + kit.watch(leader); leader.tell(PoisonPill.getInstance(), ActorRef.noSender()); + kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class); + kit.unwatch(leader); peer2.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender()); - peer2.tell(ElectionTimeout.INSTANCE, peer2); + peer2.tell(TimeoutNow.INSTANCE, peer2); - ShardTestKit.waitUntilLeader(peer2); + verifyRaftState(peer2, state -> + assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState())); + verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME); + verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME); + verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME); verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, ""); - verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, peerMemberName2); - verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, peerMemberName2); - verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, peerMemberName2); + + testLog.info("testOwnerChangesOnPeerAvailabilityChanges ending"); } @Test - public void testLocalCandidateRemovedWithCandidateRegistered() throws Exception { - ShardTestKit kit = new ShardTestKit(getSystem()); + public void testLeaderIsolation() throws Exception { + testLog.info("testLeaderIsolation starting"); - dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10000); - ShardIdentifier leaderId = newShardId("leader"); - ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME); + final ShardTestKit kit = new ShardTestKit(getSystem()); + + ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); + ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME); + ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME); - TestActorRef shard = actorFactory.createTestActor(Props.create( - TestEntityOwnershipShard.class, localId, - ImmutableMap.builder().put(leaderId.toString(), "".toString()).build(), - dataStoreContextBuilder.build()).withDispatcher(Dispatchers.DefaultDispatcherId())); + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4) + .shardIsolatedLeaderCheckIntervalInMillis(100000); - TestActorRef leader = actorFactory.createTestActor(newShardProps(leaderId, - ImmutableMap.builder().put(localId.toString(), shard.path().toString()).build(), - LOCAL_MEMBER_NAME, EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), leaderId.toString()); - leader.tell(ElectionTimeout.INSTANCE, leader); + TestActorRef peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)), + peerId1.toString()); + peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); + + TestActorRef peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)), + peerId2.toString()); + peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); + + dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()) + .shardIsolatedLeaderCheckIntervalInMillis(500); + + TestActorRef leader = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME)), + leaderId.toString()); ShardTestKit.waitUntilLeader(leader); - shard.tell(new PeerAddressResolved(leaderId.toString(), leader.path().toString()), ActorRef.noSender()); + // Add entity1 candidates for all members with the leader as the owner - Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1); - EntityOwnershipListener listener = mock(EntityOwnershipListener.class); + DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1); + leader.tell(new RegisterCandidateLocal(entity1), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME); - shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef()); + peer1.tell(new RegisterCandidateLocal(entity1), kit.getRef()); kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME); - // Register local candidate + peer2.tell(new RegisterCandidateLocal(entity1), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_2_NAME); - shard.tell(new RegisterCandidateLocal(entity), kit.getRef()); + verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME); + verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME); + verifyOwner(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME); + + // Add entity2 candidates for all members with peer1 as the owner + + DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2); + peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef()); kit.expectMsgClass(SuccessReply.class); - verifyCommittedEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME); - verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity, false, true, true)); - reset(listener); + verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME); - // Simulate a replicated commit from the leader to remove the local candidate that would occur after a - // network partition is healed. + peer2.tell(new RegisterCandidateLocal(entity2), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_2_NAME); - leader.tell(new PeerDown(localId.getMemberName(), localId.toString()), ActorRef.noSender()); + leader.tell(new RegisterCandidateLocal(entity2), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME); - verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity, true, false, false)); + verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME); + verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME); + verifyOwner(peer2, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME); - // Since the the shard has a local candidate registered, it should re-add its candidate to the entity. + // Add entity3 candidates for all members with peer2 as the owner - verifyCommittedEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME); - verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity, false, true, true)); + DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3); + peer2.tell(new RegisterCandidateLocal(entity3), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME); - // Unregister the local candidate and verify it's removed and no re-added. + leader.tell(new RegisterCandidateLocal(entity3), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), LOCAL_MEMBER_NAME); - shard.tell(new UnregisterCandidateLocal(entity), kit.getRef()); + peer1.tell(new RegisterCandidateLocal(entity3), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_1_NAME); + + verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME); + verifyOwner(peer1, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME); + verifyOwner(peer2, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME); + + // Add listeners on all members + + DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class); + leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or( + ownershipChange(entity1, false, true, true), ownershipChange(entity2, false, false, true)), + ownershipChange(entity3, false, false, true))); + reset(leaderListener); + + DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class); + peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verify(peer1Listener, timeout(5000).times(3)).ownershipChanged(or(or( + ownershipChange(entity1, false, false, true), ownershipChange(entity2, false, true, true)), + ownershipChange(entity3, false, false, true))); + reset(peer1Listener); + + DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class); + peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verify(peer2Listener, timeout(5000).times(3)).ownershipChanged(or(or( + ownershipChange(entity1, false, false, true), ownershipChange(entity2, false, false, true)), + ownershipChange(entity3, false, true, true))); + reset(peer2Listener); + + // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers. + + leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class); + leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class); + + peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, + ae -> ae.getLeaderId().equals(leaderId.toString())); + peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class); + + // Make peer1 start an election and become leader by enabling the ElectionTimeout message. + + peer1.underlyingActor().stopDroppingMessagesOfType(ElectionTimeout.class); + + // Send PeerDown to the isolated leader so it tries to re-assign ownership for the entities owned by the + // isolated peers. + + leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender()); + leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender()); + + verifyRaftState(leader, state -> + assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState())); + + // Expect inJeopardy notification on the isolated leader. + + verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or( + ownershipChange(entity1, true, true, true, true), ownershipChange(entity2, false, false, true, true)), + ownershipChange(entity3, false, false, true, true))); + reset(leaderListener); + + verifyRaftState(peer1, state -> + assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState())); + + // Send PeerDown to the new leader peer1 so it re-assigns ownership for the entities owned by the + // isolated leader. + + peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender()); + + verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME); + + verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true)); + reset(peer1Listener); + + verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true)); + reset(peer2Listener); + + // Remove the isolation. + + leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class); + leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class); + peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class); + peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class); + + // Previous leader should switch to Follower and send inJeopardy cleared notifications for all entities. + + verifyRaftState(leader, state -> + assertEquals("getRaftState", RaftState.Follower.toString(), state.getRaftState())); + + verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or( + ownershipChange(entity1, true, true, true), ownershipChange(entity2, false, false, true)), + ownershipChange(entity3, false, false, true))); + + verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME); + verify(leaderListener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true)); + + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME); + verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME); + + verifyNoMoreInteractions(leaderListener); + verifyNoMoreInteractions(peer1Listener); + verifyNoMoreInteractions(peer2Listener); + + testLog.info("testLeaderIsolation ending"); + } + + @Test + public void testLeaderIsolationWithPendingCandidateAdded() throws Exception { + testLog.info("testLeaderIsolationWithPendingCandidateAdded starting"); + + final ShardTestKit kit = new ShardTestKit(getSystem()); + + ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); + ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME); + ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME); + + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4) + .shardIsolatedLeaderCheckIntervalInMillis(100000); + + TestActorRef peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME), + actorFactory.createTestActor(MessageCollectorActor.props())), peerId1.toString()); + peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); + + TestActorRef peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME), + actorFactory.createTestActor(MessageCollectorActor.props())), peerId2.toString()); + peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); + + dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()) + .shardIsolatedLeaderCheckIntervalInMillis(500); + + TestActorRef leader = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME), + actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString()); + + ShardTestKit.waitUntilLeader(leader); + + // Add listeners on all members + + DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class, + "DOMEntityOwnershipListener-" + LOCAL_MEMBER_NAME); + leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class, + "DOMEntityOwnershipListener-" + PEER_MEMBER_1_NAME); + peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class, + "DOMEntityOwnershipListener-" + PEER_MEMBER_2_NAME); + peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + // Drop the CandidateAdded message to the leader for now. + + leader.underlyingActor().startDroppingMessagesOfType(CandidateAdded.class); + + // Add an entity candidates for the leader. Since we've blocked the CandidateAdded message, it won't be + // assigned the owner. + + DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1); + leader.tell(new RegisterCandidateLocal(entity1), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME); + verifyCommittedEntityCandidate(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME); + verifyCommittedEntityCandidate(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME); + + DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2); + leader.tell(new RegisterCandidateLocal(entity2), kit.getRef()); kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME); + verifyCommittedEntityCandidate(peer1, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME); + verifyCommittedEntityCandidate(peer2, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME); + + // Capture the CandidateAdded messages. + + final List candidateAdded = expectMatching(leader.underlyingActor().collectorActor(), + CandidateAdded.class, 2); + + // Drop AppendEntries to the followers containing a log entry, which will be for the owner writes after we + // forward the CandidateAdded messages to the leader. This will leave the pending owner write tx's uncommitted. + + peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0); + peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0); + + // Now forward the CandidateAdded messages to the leader and wait for it to send out the AppendEntries. + + leader.underlyingActor().stopDroppingMessagesOfType(CandidateAdded.class); + leader.tell(candidateAdded.get(0), leader); + leader.tell(candidateAdded.get(1), leader); + + expectMatching(peer1.underlyingActor().collectorActor(), AppendEntries.class, 2, + ae -> ae.getEntries().size() > 0); + + // Verify no owner assigned. + + verifyNoOwnerSet(leader, entity1.getType(), entity1.getIdentifier()); + verifyNoOwnerSet(leader, entity2.getType(), entity2.getIdentifier()); + + // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers. + + leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class); + leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class); + + peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, + ae -> ae.getLeaderId().equals(leaderId.toString())); + peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class); + + // Send PeerDown to the isolated leader - should be no-op since there's no owned entities. + + leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender()); + leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender()); + + // Verify the leader transitions to IsolatedLeader. + + verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), + state.getRaftState())); + + // Send PeerDown to the new leader peer1. + + peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender()); + + // Make peer1 start an election and become leader by sending the TimeoutNow message. + + peer1.tell(TimeoutNow.INSTANCE, ActorRef.noSender()); + + // Verify the peer1 transitions to Leader. + + verifyRaftState(peer1, state -> assertEquals("getRaftState", RaftState.Leader.toString(), + state.getRaftState())); + + verifyNoOwnerSet(peer1, entity1.getType(), entity1.getIdentifier()); + verifyNoOwnerSet(peer2, entity1.getType(), entity2.getIdentifier()); + + verifyNoMoreInteractions(peer1Listener); + verifyNoMoreInteractions(peer2Listener); + + // Add candidate peer1 candidate for entity2. + + peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef()); + + verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME); + verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true)); + verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, false, true)); + + reset(leaderListener, peer1Listener, peer2Listener); + + // Remove the isolation. + + leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class); + leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class); + peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class); + peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class); + + // Previous leader should switch to Follower. + + verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.Follower.toString(), + state.getRaftState())); + + // Send PeerUp to peer1 and peer2. + + peer1.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender()); + peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender()); + + // The previous leader should become the owner of entity1. + + verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME); + + // The previous leader's DOMEntityOwnershipListener should get 4 total notifications: + // - inJeopardy cleared for entity1 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false) + // - inJeopardy cleared for entity2 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false) + // - local owner granted for entity1 (wasOwner=false, isOwner=true, hasOwner=true, inJeopardy=false) + // - remote owner for entity2 (wasOwner=false, isOwner=false, hasOwner=true, inJeopardy=false) + verify(leaderListener, timeout(5000).times(4)).ownershipChanged(or( + or(ownershipChange(entity1, false, false, false), ownershipChange(entity2, false, false, false)), + or(ownershipChange(entity1, false, true, true), ownershipChange(entity2, false, false, true)))); + + verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true)); + verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true)); + + // Verify entity2's owner doesn't change. - verifyNoEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME); Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); - verifyNoEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME); + verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME); + + verifyNoMoreInteractions(leaderListener); + verifyNoMoreInteractions(peer1Listener); + verifyNoMoreInteractions(peer2Listener); + + testLog.info("testLeaderIsolationWithPendingCandidateAdded ending"); } @Test public void testListenerRegistration() throws Exception { + testLog.info("testListenerRegistration starting"); + ShardTestKit kit = new ShardTestKit(getSystem()); - TestActorRef shard = actorFactory.createTestActor(newShardProps()); - ShardTestKit.waitUntilLeader(shard); - ShardDataTree shardDataTree = shard.underlyingActor().getDataStore(); + + ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); + ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME); + + TestActorRef peer = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString()); + peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); + + TestActorRef leader = actorFactory.createTestActor( + newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString()); + + ShardTestKit.waitUntilLeader(leader); String otherEntityType = "otherEntityType"; - Entity entity1 = new Entity(ENTITY_TYPE, ENTITY_ID1); - Entity entity2 = new Entity(ENTITY_TYPE, ENTITY_ID2); - Entity entity3 = new Entity(ENTITY_TYPE, ENTITY_ID3); - Entity entity4 = new Entity(otherEntityType, ENTITY_ID3); - EntityOwnershipListener listener = mock(EntityOwnershipListener.class); + final DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1); + final DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2); + final DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3); + final DOMEntity entity4 = new DOMEntity(otherEntityType, ENTITY_ID3); + DOMEntityOwnershipListener listener = mock(DOMEntityOwnershipListener.class); // Register listener - shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef()); + leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef()); kit.expectMsgClass(SuccessReply.class); // Register a couple candidates for the desired entity type and verify listener is notified. - shard.tell(new RegisterCandidateLocal(entity1), kit.getRef()); + leader.tell(new RegisterCandidateLocal(entity1), kit.getRef()); kit.expectMsgClass(SuccessReply.class); verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true)); - shard.tell(new RegisterCandidateLocal(entity2), kit.getRef()); + leader.tell(new RegisterCandidateLocal(entity2), kit.getRef()); kit.expectMsgClass(SuccessReply.class); verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true)); @@ -668,7 +1089,7 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { // Register another candidate for another entity type and verify listener is not notified. - shard.tell(new RegisterCandidateLocal(entity4), kit.getRef()); + leader.tell(new RegisterCandidateLocal(entity4), kit.getRef()); kit.expectMsgClass(SuccessReply.class); Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); @@ -676,14 +1097,13 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { // Register remote candidate for entity1 - String remoteMemberName = "remoteMember"; - writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, entity1.getId(), remoteMemberName), - shardDataTree); - verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entity1.getId(), remoteMemberName); + peer.tell(new RegisterCandidateLocal(entity1), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entity1.getIdentifier(), PEER_MEMBER_1_NAME); // Unregister the local candidate for entity1 and verify listener is notified - shard.tell(new UnregisterCandidateLocal(entity1), kit.getRef()); + leader.tell(new UnregisterCandidateLocal(entity1), kit.getRef()); kit.expectMsgClass(SuccessReply.class); verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true)); @@ -691,21 +1111,21 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { // Unregister the listener, add a candidate for entity3 and verify listener isn't notified - shard.tell(new UnregisterListenerLocal(listener, ENTITY_TYPE), kit.getRef()); + leader.tell(new UnregisterListenerLocal(listener, ENTITY_TYPE), kit.getRef()); kit.expectMsgClass(SuccessReply.class); - shard.tell(new RegisterCandidateLocal(entity3), kit.getRef()); + leader.tell(new RegisterCandidateLocal(entity3), kit.getRef()); kit.expectMsgClass(SuccessReply.class); - verifyOwner(shard, ENTITY_TYPE, entity3.getId(), LOCAL_MEMBER_NAME); + verifyOwner(leader, ENTITY_TYPE, entity3.getIdentifier(), LOCAL_MEMBER_NAME); Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); - verify(listener, never()).ownershipChanged(any(EntityOwnershipChange.class)); + verify(listener, never()).ownershipChanged(any(DOMEntityOwnershipChange.class)); // Re-register the listener and verify it gets notified of currently owned entities reset(listener); - shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef()); + leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef()); kit.expectMsgClass(SuccessReply.class); verify(listener, timeout(5000).times(2)).ownershipChanged(or(ownershipChange(entity2, false, true, true), @@ -713,289 +1133,180 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS); verify(listener, never()).ownershipChanged(ownershipChange(entity4)); verify(listener, times(1)).ownershipChanged(ownershipChange(entity1)); - } - - private static void commitModification(TestActorRef shard, NormalizedNode node, - JavaTestKit sender) { - BatchedModifications modifications = newBatchedModifications(); - modifications.addModification(new MergeModification(ENTITY_OWNERS_PATH, node)); - - shard.tell(modifications, sender.getRef()); - sender.expectMsgClass(CommitTransactionReply.class); - } - private static BatchedModifications newBatchedModifications() { - BatchedModifications modifications = new BatchedModifications("tnx", DataStoreVersions.CURRENT_VERSION, ""); - modifications.setDoCommitOnReady(true); - modifications.setReady(true); - modifications.setTotalMessagesSent(1); - return modifications; + testLog.info("testListenerRegistration ending"); } - private void verifyEntityCandidateRemoved(final TestActorRef shard, String entityType, - YangInstanceIdentifier entityId, String candidateName) { - verifyNodeRemoved(candidatePath(entityType, entityId, candidateName), - new Function>() { - @Override - public NormalizedNode apply(YangInstanceIdentifier path) { - try { - return AbstractShardTest.readStore(shard, path); - } catch(Exception e) { - throw new AssertionError("Failed to read " + path, e); - } - } - }); - } + @Test + public void testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived() throws Exception { + testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived starting"); - private void verifyCommittedEntityCandidate(final TestActorRef shard, String entityType, - YangInstanceIdentifier entityId, String candidateName) { - verifyEntityCandidate(entityType, entityId, candidateName, new Function>() { - @Override - public NormalizedNode apply(YangInstanceIdentifier path) { - try { - return AbstractShardTest.readStore(shard, path); - } catch(Exception e) { - throw new AssertionError("Failed to read " + path, e); - } - } - }); - } + ShardTestKit kit = new ShardTestKit(getSystem()); + EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder() + .addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500); - private void verifyNoEntityCandidate(final TestActorRef shard, String entityType, - YangInstanceIdentifier entityId, String candidateName) { - verifyEntityCandidate(entityType, entityId, candidateName, new Function>() { - @Override - public NormalizedNode apply(YangInstanceIdentifier path) { - try { - return AbstractShardTest.readStore(shard, path); - } catch(Exception e) { - throw new AssertionError("Failed to read " + path, e); - } - } - }, false); - } + ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); + ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME); - private void verifyBatchedEntityCandidate(List mods, String entityType, - YangInstanceIdentifier entityId, String candidateName) throws Exception { - assertEquals("BatchedModifications size", 1, mods.size()); - verifyBatchedEntityCandidate(mods.get(0), entityType, entityId, candidateName); - } + TestActorRef peer = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString()); + peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); - private void verifyBatchedEntityCandidate(Modification mod, String entityType, - YangInstanceIdentifier entityId, String candidateName) throws Exception { - assertEquals("Modification type", MergeModification.class, mod.getClass()); - verifyEntityCandidate(((MergeModification)mod).getData(), entityType, - entityId, candidateName, true); - } + TestActorRef leader = actorFactory.createTestActor( + newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME, builder.build()), + leaderId.toString()); - private static void verifyOwner(final TestActorRef shard, String entityType, - YangInstanceIdentifier entityId, String localMemberName) { - verifyOwner(localMemberName, entityType, entityId, new Function>() { - @Override - public NormalizedNode apply(YangInstanceIdentifier path) { - try { - return AbstractShardTest.readStore(shard, path); - } catch(Exception e) { - return null; - } - } - }); - } + ShardTestKit.waitUntilLeader(leader); - private Props newShardProps() { - return newShardProps(Collections.emptyMap()); - } + DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1); - private Props newShardProps(EntityOwnerSelectionStrategyConfig strategyConfig, Map peers) { - return newShardProps(newShardId(LOCAL_MEMBER_NAME), peers, LOCAL_MEMBER_NAME, strategyConfig); - } + // Add a remote candidate - private Props newShardProps(Map peers) { - return newShardProps(newShardId(LOCAL_MEMBER_NAME), peers, LOCAL_MEMBER_NAME, EntityOwnerSelectionStrategyConfig.newBuilder().build()); - } + peer.tell(new RegisterCandidateLocal(entity), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); - private Props newShardProps(ShardIdentifier shardId, Map peers, String memberName, - EntityOwnerSelectionStrategyConfig config) { - return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers). - datastoreContext(dataStoreContextBuilder.build()).schemaContext(SCHEMA_CONTEXT). - localMemberName(MemberName.forName(memberName)).ownerSelectionStrategyConfig(config).props() - .withDispatcher(Dispatchers.DefaultDispatcherId()); - } + // Register local - private static ShardIdentifier newShardId(String memberName) { - return ShardIdentifier.create("entity-ownership", MemberName.forName(memberName), - "operational" + NEXT_SHARD_NUM.getAndIncrement()); - } + leader.tell(new RegisterCandidateLocal(entity), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); - private static class TestEntityOwnershipShard extends EntityOwnershipShard { + // Verify the local candidate becomes owner - TestEntityOwnershipShard(ShardIdentifier name, Map peerAddresses, - DatastoreContext datastoreContext) { - super(newBuilder().id(name).peerAddresses(peerAddresses).datastoreContext(datastoreContext). - schemaContext(SCHEMA_CONTEXT).localMemberName(MemberName.forName(LOCAL_MEMBER_NAME))); - } + verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME); + verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME); + verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME); - @Override - public void handleCommand(Object message) { - if(!(message instanceof ElectionTimeout)) { - super.handleCommand(message); - } - } + testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived ending"); } - private static class MockFollower extends UntypedActor { - volatile boolean grantVote; - volatile boolean dropAppendEntries; - private final String myId; + @Test + public void testDelayedEntityOwnerSelection() throws Exception { + testLog.info("testDelayedEntityOwnerSelection starting"); - public MockFollower(String myId) { - this(myId, true); - } + final ShardTestKit kit = new ShardTestKit(getSystem()); + EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder() + .addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500); - public MockFollower(String myId, boolean grantVote) { - this.myId = myId; - this.grantVote = grantVote; - } + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2); - @Override - public void onReceive(Object message) { - if(message instanceof RequestVote) { - if(grantVote) { - getSender().tell(new RequestVoteReply(((RequestVote)message).getTerm(), true), getSelf()); - } - } else if(message instanceof AppendEntries) { - if(!dropAppendEntries) { - AppendEntries req = (AppendEntries) message; - long lastIndex = req.getLeaderCommit(); - if (req.getEntries().size() > 0) { - for(ReplicatedLogEntry entry : req.getEntries()) { - lastIndex = entry.getIndex(); - } - } - - getSender().tell(new AppendEntriesReply(myId, req.getTerm(), true, lastIndex, req.getTerm(), - DataStoreVersions.CURRENT_VERSION), getSelf()); - } - } - } - } + ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); + ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME); + ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME); + TestActorRef peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)), + peerId1.toString()); + peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); - @Test - public void testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived() throws Exception { - ShardTestKit kit = new ShardTestKit(getSystem()); - EntityOwnerSelectionStrategyConfig.Builder builder - = EntityOwnerSelectionStrategyConfig.newBuilder().addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500); + TestActorRef peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)), + peerId2.toString()); + peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); - String peerId = newShardId("follower").toString(); - TestActorRef peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId, false). - withDispatcher(Dispatchers.DefaultDispatcherId()), peerId); + TestActorRef leader = actorFactory.createTestActor( + newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME, + builder.build()), leaderId.toString()); - peer.underlyingActor().grantVote = true; - - TestActorRef shard = actorFactory.createTestActor(newShardProps(builder.build(), - ImmutableMap.of(peerId.toString(), peer.path().toString()))); - ShardTestKit.waitUntilLeader(shard); + ShardTestKit.waitUntilLeader(leader); - Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1); - ShardDataTree shardDataTree = shard.underlyingActor().getDataStore(); + DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1); // Add a remote candidate - String remoteMemberName1 = "follower"; - writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree); + peer1.tell(new RegisterCandidateLocal(entity), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); // Register local - shard.tell(new RegisterCandidateLocal(entity), kit.getRef()); + leader.tell(new RegisterCandidateLocal(entity), kit.getRef()); kit.expectMsgClass(SuccessReply.class); // Verify the local candidate becomes owner - verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1); - verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); - verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); - } + verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME); + verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME); + verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME); - @Test - public void testDelayedEntityOwnerSelection() throws Exception { - ShardTestKit kit = new ShardTestKit(getSystem()); - EntityOwnerSelectionStrategyConfig.Builder builder - = EntityOwnerSelectionStrategyConfig.newBuilder().addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500); + testLog.info("testDelayedEntityOwnerSelection ending"); + } - String follower1Id = newShardId("follower1").toString(); - TestActorRef follower1 = actorFactory.createTestActor(Props.create(MockFollower.class, follower1Id, false). - withDispatcher(Dispatchers.DefaultDispatcherId()), follower1Id); + private Props newLocalShardProps() { + return newShardProps(newShardId(LOCAL_MEMBER_NAME), Collections.emptyMap(), LOCAL_MEMBER_NAME); + } - follower1.underlyingActor().grantVote = true; + private Props newShardProps(final ShardIdentifier shardId, final Map peers, + final String memberName) { + return newShardProps(shardId, peers, memberName, EntityOwnerSelectionStrategyConfig.newBuilder().build()); + } - String follower2Id = newShardId("follower").toString(); - TestActorRef follower2 = actorFactory.createTestActor(Props.create(MockFollower.class, follower2Id, false). - withDispatcher(Dispatchers.DefaultDispatcherId()), follower2Id); + private Props newShardProps(final ShardIdentifier shardId, final Map peers, final String memberName, + final EntityOwnerSelectionStrategyConfig config) { + return newShardBuilder(shardId, peers, memberName).ownerSelectionStrategyConfig(config).props() + .withDispatcher(Dispatchers.DefaultDispatcherId()); + } - follower2.underlyingActor().grantVote = true; + private EntityOwnershipShard.Builder newShardBuilder(final ShardIdentifier shardId, final Map peers, + final String memberName) { + return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers).datastoreContext( + dataStoreContextBuilder.build()).schemaContextProvider(() -> SCHEMA_CONTEXT).localMemberName( + MemberName.forName(memberName)).ownerSelectionStrategyConfig( + EntityOwnerSelectionStrategyConfig.newBuilder().build()); + } + private Map peerMap(final String... peerIds) { + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (String peerId: peerIds) { + builder.put(peerId, actorFactory.createTestActorPath(peerId)).build(); + } - TestActorRef shard = actorFactory.createTestActor(newShardProps(builder.build(), - ImmutableMap.of(follower1Id.toString(), follower2.path().toString(), follower2Id.toString(), follower2.path().toString()))); - ShardTestKit.waitUntilLeader(shard); + return builder.build(); + } - Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1); - ShardDataTree shardDataTree = shard.underlyingActor().getDataStore(); + private static class TestEntityOwnershipShard extends EntityOwnershipShard { + private final TestActorRef collectorActor; + private final Map, Predicate> dropMessagesOfType = new ConcurrentHashMap<>(); - // Add a remote candidate + TestEntityOwnershipShard(final Builder builder, final TestActorRef collectorActor) { + super(builder); + this.collectorActor = collectorActor; + } - String remoteMemberName1 = "follower"; - writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree); + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public void handleCommand(final Object message) { + Predicate drop = dropMessagesOfType.get(message.getClass()); + if (drop == null || !drop.test(message)) { + super.handleCommand(message); + } - // Register local + if (collectorActor != null) { + collectorActor.tell(message, ActorRef.noSender()); + } + } - shard.tell(new RegisterCandidateLocal(entity), kit.getRef()); - kit.expectMsgClass(SuccessReply.class); + void startDroppingMessagesOfType(final Class msgClass) { + dropMessagesOfType.put(msgClass, msg -> true); + } - // Verify the local candidate becomes owner + void startDroppingMessagesOfType(final Class msgClass, final Predicate filter) { + dropMessagesOfType.put(msgClass, filter); + } - verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1); - verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); - verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); - } + void stopDroppingMessagesOfType(final Class msgClass) { + dropMessagesOfType.remove(msgClass); + } - private static class MockLeader extends UntypedActor { - volatile CountDownLatch modificationsReceived = new CountDownLatch(1); - List receivedModifications = new ArrayList<>(); - volatile boolean sendReply = true; - volatile long delay; + TestActorRef collectorActor() { + return collectorActor; + } - @Override - public void onReceive(Object message) { - if(message instanceof BatchedModifications) { - if(delay > 0) { - Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS); - } - - if(sendReply) { - BatchedModifications mods = (BatchedModifications) message; - synchronized (receivedModifications) { - for(int i = 0; i < mods.getModifications().size(); i++) { - receivedModifications.add(mods.getModifications().get(i)); - modificationsReceived.countDown(); - } - } - - getSender().tell(CommitTransactionReply.instance(DataStoreVersions.CURRENT_VERSION). - toSerializable(), getSelf()); - } else { - sendReply = true; - } - } + static Props props(final Builder builder) { + return props(builder, null); } - List getAndClearReceivedModifications() { - synchronized (receivedModifications) { - List ret = new ArrayList<>(receivedModifications); - receivedModifications.clear(); - return ret; - } + static Props props(final Builder builder, final TestActorRef collectorActor) { + return Props.create(TestEntityOwnershipShard.class, builder, collectorActor) + .withDispatcher(Dispatchers.DefaultDispatcherId()); } } }