X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fentityownership%2FEntityOwnershipShardTest.java;h=cfef02c509a106cc1e4ea6f25d4c058440f8f80b;hp=9660a2a39289890a74361f97b3d44a5cb9e2794d;hb=7426d405093265655b05c6a3eb197362266edf2e;hpb=07c96b0fa318b7bf559df4954f705d06a44f1354 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java index 9660a2a392..cfef02c509 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java @@ -17,6 +17,10 @@ import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages; +import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching; +import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching; + import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; @@ -39,6 +43,7 @@ import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.ShardTestKit; +import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded; import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal; import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal; import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal; @@ -122,7 +127,7 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception { testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader starting"); - ShardTestKit kit = new ShardTestKit(getSystem()); + final ShardTestKit kit = new ShardTestKit(getSystem()); dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2); @@ -157,10 +162,10 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception { testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus starting"); - ShardTestKit kit = new ShardTestKit(getSystem()); + final ShardTestKit kit = new ShardTestKit(getSystem()); - dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2). - shardTransactionCommitTimeoutInSeconds(1); + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2) + .shardTransactionCommitTimeoutInSeconds(1); ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME); @@ -201,10 +206,10 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception { testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader starting"); - ShardTestKit kit = new ShardTestKit(getSystem()); + final ShardTestKit kit = new ShardTestKit(getSystem()); - dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2). - shardIsolatedLeaderCheckIntervalInMillis(50); + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2) + .shardIsolatedLeaderCheckIntervalInMillis(50); ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME); @@ -244,15 +249,15 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { ShardTestKit kit = new ShardTestKit(getSystem()); - dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2). - shardBatchedModificationCount(5); + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2) + .shardBatchedModificationCount(5); ShardIdentifier leaderId = newShardId(PEER_MEMBER_1_NAME); ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME); TestActorRef leader = actorFactory.createTestActor(TestEntityOwnershipShard.props( newShardBuilder(leaderId, peerMap(localId.toString()), PEER_MEMBER_1_NAME), actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString()); - TestEntityOwnershipShard leaderShard = leader.underlyingActor(); + final TestEntityOwnershipShard leaderShard = leader.underlyingActor(); TestActorRef local = actorFactory.createTestActor(TestEntityOwnershipShard.props( newShardBuilder(localId, peerMap(leaderId.toString()),LOCAL_MEMBER_NAME)), localId.toString()); @@ -272,22 +277,22 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef()); kit.expectMsgClass(SuccessReply.class); - MessageCollectorActor.expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class); + expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class); // Send a bunch of registration messages quickly and verify. leaderShard.stopDroppingMessagesOfType(BatchedModifications.class); - MessageCollectorActor.clearMessages(leaderShard.collectorActor()); + clearMessages(leaderShard.collectorActor()); int max = 100; List entityIds = new ArrayList<>(); - for(int i = 1; i <= max; i++) { + for (int i = 1; i <= max; i++) { YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i)); entityIds.add(id); local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, id)), kit.getRef()); } - for(int i = 0; i < max; i++) { + for (int i = 0; i < max; i++) { verifyCommittedEntityCandidate(local, ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME); } @@ -334,7 +339,7 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { public void testOwnershipChanges() throws Exception { testLog.info("testOwnershipChanges starting"); - ShardTestKit kit = new ShardTestKit(getSystem()); + final ShardTestKit kit = new ShardTestKit(getSystem()); dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2); @@ -427,10 +432,10 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { public void testOwnerChangesOnPeerAvailabilityChanges() throws Exception { testLog.info("testOwnerChangesOnPeerAvailabilityChanges starting"); - ShardTestKit kit = new ShardTestKit(getSystem()); + final ShardTestKit kit = new ShardTestKit(getSystem()); - dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4). - shardIsolatedLeaderCheckIntervalInMillis(100000); + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4) + .shardIsolatedLeaderCheckIntervalInMillis(100000); ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME); @@ -654,14 +659,14 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { public void testLeaderIsolation() throws Exception { testLog.info("testLeaderIsolation starting"); - ShardTestKit kit = new ShardTestKit(getSystem()); + final ShardTestKit kit = new ShardTestKit(getSystem()); ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME); ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME); - dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4). - shardIsolatedLeaderCheckIntervalInMillis(100000); + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4) + .shardIsolatedLeaderCheckIntervalInMillis(100000); TestActorRef peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props( newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)), @@ -673,8 +678,8 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { peerId2.toString()); peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); - dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()). - shardIsolatedLeaderCheckIntervalInMillis(500); + dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()) + .shardIsolatedLeaderCheckIntervalInMillis(500); TestActorRef leader = actorFactory.createTestActor(TestEntityOwnershipShard.props( newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME)), @@ -744,22 +749,25 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class); leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef()); kit.expectMsgClass(SuccessReply.class); - verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, false, true, true), - ownershipChange(entity2, false, false, true)), ownershipChange(entity3, false, false, true))); + verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or( + ownershipChange(entity1, false, true, true), ownershipChange(entity2, false, false, true)), + ownershipChange(entity3, false, false, true))); reset(leaderListener); DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class); peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef()); kit.expectMsgClass(SuccessReply.class); - verify(peer1Listener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, false, false, true), - ownershipChange(entity2, false, true, true)), ownershipChange(entity3, false, false, true))); + verify(peer1Listener, timeout(5000).times(3)).ownershipChanged(or(or( + ownershipChange(entity1, false, false, true), ownershipChange(entity2, false, true, true)), + ownershipChange(entity3, false, false, true))); reset(peer1Listener); DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class); peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef()); kit.expectMsgClass(SuccessReply.class); - verify(peer2Listener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, false, false, true), - ownershipChange(entity2, false, false, true)), ownershipChange(entity3, false, true, true))); + verify(peer2Listener, timeout(5000).times(3)).ownershipChanged(or(or( + ownershipChange(entity1, false, false, true), ownershipChange(entity2, false, false, true)), + ownershipChange(entity3, false, true, true))); reset(peer2Listener); // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers. @@ -768,7 +776,7 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class); peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, - ae -> ae.getLeaderId().equals(leaderId.toString())); + ae -> ae.getLeaderId().equals(leaderId.toString())); peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class); // Make peer1 start an election and become leader by enabling the ElectionTimeout message. @@ -786,8 +794,9 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { // Expect inJeopardy notification on the isolated leader. - verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, true, true, true, true), - ownershipChange(entity2, false, false, true, true)), ownershipChange(entity3, false, false, true, true))); + verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or( + ownershipChange(entity1, true, true, true, true), ownershipChange(entity2, false, false, true, true)), + ownershipChange(entity3, false, false, true, true))); reset(leaderListener); verifyRaftState(peer1, state -> @@ -818,8 +827,9 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.Follower.toString(), state.getRaftState())); - verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, true, true, true), - ownershipChange(entity2, false, false, true)), ownershipChange(entity3, false, false, true))); + verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or( + ownershipChange(entity1, true, true, true), ownershipChange(entity2, false, false, true)), + ownershipChange(entity3, false, false, true))); verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME); verify(leaderListener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true)); @@ -835,6 +845,194 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { testLog.info("testLeaderIsolation ending"); } + @Test + public void testLeaderIsolationWithPendingCandidateAdded() throws Exception { + testLog.info("testLeaderIsolationWithPendingCandidateAdded starting"); + + final ShardTestKit kit = new ShardTestKit(getSystem()); + + ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); + ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME); + ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME); + + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4) + .shardIsolatedLeaderCheckIntervalInMillis(100000); + + TestActorRef peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME), + actorFactory.createTestActor(MessageCollectorActor.props())), peerId1.toString()); + peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); + + TestActorRef peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME), + actorFactory.createTestActor(MessageCollectorActor.props())), peerId2.toString()); + peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); + + dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()) + .shardIsolatedLeaderCheckIntervalInMillis(500); + + TestActorRef leader = actorFactory.createTestActor(TestEntityOwnershipShard.props( + newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME), + actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString()); + + ShardTestKit.waitUntilLeader(leader); + + // Add listeners on all members + + DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class, + "DOMEntityOwnershipListener-" + LOCAL_MEMBER_NAME); + leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class, + "DOMEntityOwnershipListener-" + PEER_MEMBER_1_NAME); + peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class, + "DOMEntityOwnershipListener-" + PEER_MEMBER_2_NAME); + peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + // Drop the CandidateAdded message to the leader for now. + + leader.underlyingActor().startDroppingMessagesOfType(CandidateAdded.class); + + // Add an entity candidates for the leader. Since we've blocked the CandidateAdded message, it won't be + // assigned the owner. + + DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1); + leader.tell(new RegisterCandidateLocal(entity1), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME); + verifyCommittedEntityCandidate(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME); + verifyCommittedEntityCandidate(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME); + + DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2); + leader.tell(new RegisterCandidateLocal(entity2), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME); + verifyCommittedEntityCandidate(peer1, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME); + verifyCommittedEntityCandidate(peer2, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME); + + // Capture the CandidateAdded messages. + + final List candidateAdded = expectMatching(leader.underlyingActor().collectorActor(), + CandidateAdded.class, 2); + + // Drop AppendEntries to the followers containing a log entry, which will be for the owner writes after we + // forward the CandidateAdded messages to the leader. This will leave the pending owner write tx's uncommitted. + + peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0); + peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0); + + // Now forward the CandidateAdded messages to the leader and wait for it to send out the AppendEntries. + + leader.underlyingActor().stopDroppingMessagesOfType(CandidateAdded.class); + leader.tell(candidateAdded.get(0), leader); + leader.tell(candidateAdded.get(1), leader); + + expectMatching(peer1.underlyingActor().collectorActor(), AppendEntries.class, 2, + ae -> ae.getEntries().size() > 0); + + // Verify no owner assigned. + + verifyNoOwnerSet(leader, entity1.getType(), entity1.getIdentifier()); + verifyNoOwnerSet(leader, entity2.getType(), entity2.getIdentifier()); + + // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers. + + leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class); + leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class); + + peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, + ae -> ae.getLeaderId().equals(leaderId.toString())); + peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class); + + // Send PeerDown to the isolated leader - should be no-op since there's no owned entities. + + leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender()); + leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender()); + + // Verify the leader transitions to IsolatedLeader. + + verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), + state.getRaftState())); + + // Send PeerDown to the new leader peer1. + + peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender()); + + // Make peer1 start an election and become leader by sending the TimeoutNow message. + + peer1.tell(TimeoutNow.INSTANCE, ActorRef.noSender()); + + // Verify the peer1 transitions to Leader. + + verifyRaftState(peer1, state -> assertEquals("getRaftState", RaftState.Leader.toString(), + state.getRaftState())); + + verifyNoOwnerSet(peer1, entity1.getType(), entity1.getIdentifier()); + verifyNoOwnerSet(peer2, entity1.getType(), entity2.getIdentifier()); + + verifyNoMoreInteractions(peer1Listener); + verifyNoMoreInteractions(peer2Listener); + + // Add candidate peer1 candidate for entity2. + + peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef()); + + verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME); + verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true)); + verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, false, true)); + + reset(leaderListener, peer1Listener, peer2Listener); + + // Remove the isolation. + + leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class); + leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class); + peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class); + peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class); + + // Previous leader should switch to Follower. + + verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.Follower.toString(), + state.getRaftState())); + + // Send PeerUp to peer1 and peer2. + + peer1.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender()); + peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender()); + + // The previous leader should become the owner of entity1. + + verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME); + + // The previous leader's DOMEntityOwnershipListener should get 4 total notifications: + // - inJeopardy cleared for entity1 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false) + // - inJeopardy cleared for entity2 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false) + // - local owner granted for entity1 (wasOwner=false, isOwner=true, hasOwner=true, inJeopardy=false) + // - remote owner for entity2 (wasOwner=false, isOwner=false, hasOwner=true, inJeopardy=false) + verify(leaderListener, timeout(5000).times(4)).ownershipChanged(or( + or(ownershipChange(entity1, false, false, false), ownershipChange(entity2, false, false, false)), + or(ownershipChange(entity1, false, true, true), ownershipChange(entity2, false, false, true)))); + + verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true)); + verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true)); + + // Verify entity2's owner doesn't change. + + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME); + + verifyNoMoreInteractions(leaderListener); + verifyNoMoreInteractions(peer1Listener); + verifyNoMoreInteractions(peer2Listener); + + testLog.info("testLeaderIsolationWithPendingCandidateAdded ending"); + } + @Test public void testListenerRegistration() throws Exception { testLog.info("testListenerRegistration starting"); @@ -854,10 +1052,10 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { ShardTestKit.waitUntilLeader(leader); String otherEntityType = "otherEntityType"; - DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1); - DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2); - DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3); - DOMEntity entity4 = new DOMEntity(otherEntityType, ENTITY_ID3); + final DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1); + final DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2); + final DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3); + final DOMEntity entity4 = new DOMEntity(otherEntityType, ENTITY_ID3); DOMEntityOwnershipListener listener = mock(DOMEntityOwnershipListener.class); // Register listener @@ -933,8 +1131,8 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived starting"); ShardTestKit kit = new ShardTestKit(getSystem()); - EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder(). - addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500); + EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder() + .addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500); ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME); @@ -944,7 +1142,8 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); TestActorRef leader = actorFactory.createTestActor( - newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME, builder.build()), leaderId.toString()); + newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME, builder.build()), + leaderId.toString()); ShardTestKit.waitUntilLeader(leader); @@ -973,9 +1172,9 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { public void testDelayedEntityOwnerSelection() throws Exception { testLog.info("testDelayedEntityOwnerSelection starting"); - ShardTestKit kit = new ShardTestKit(getSystem()); - EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder(). - addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500); + final ShardTestKit kit = new ShardTestKit(getSystem()); + EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder() + .addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500); dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2); @@ -994,8 +1193,8 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); TestActorRef leader = actorFactory.createTestActor( - newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME, builder.build()), - leaderId.toString()); + newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME, + builder.build()), leaderId.toString()); ShardTestKit.waitUntilLeader(leader); @@ -1024,27 +1223,28 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { return newShardProps(newShardId(LOCAL_MEMBER_NAME), Collections.emptyMap(), LOCAL_MEMBER_NAME); } - private Props newShardProps(ShardIdentifier shardId, Map peers, String memberName) { + private Props newShardProps(final ShardIdentifier shardId, final Map peers, + final String memberName) { return newShardProps(shardId, peers, memberName, EntityOwnerSelectionStrategyConfig.newBuilder().build()); } - private Props newShardProps(ShardIdentifier shardId, Map peers, String memberName, - EntityOwnerSelectionStrategyConfig config) { + private Props newShardProps(final ShardIdentifier shardId, final Map peers, final String memberName, + final EntityOwnerSelectionStrategyConfig config) { return newShardBuilder(shardId, peers, memberName).ownerSelectionStrategyConfig(config).props() .withDispatcher(Dispatchers.DefaultDispatcherId()); } - private EntityOwnershipShard.Builder newShardBuilder(ShardIdentifier shardId, Map peers, - String memberName) { + private EntityOwnershipShard.Builder newShardBuilder(final ShardIdentifier shardId, final Map peers, + final String memberName) { return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers).datastoreContext( - dataStoreContextBuilder.build()).schemaContext(SCHEMA_CONTEXT).localMemberName( + dataStoreContextBuilder.build()).schemaContextProvider(() -> SCHEMA_CONTEXT).localMemberName( MemberName.forName(memberName)).ownerSelectionStrategyConfig( EntityOwnerSelectionStrategyConfig.newBuilder().build()); } - private Map peerMap(String... peerIds) { + private Map peerMap(final String... peerIds) { ImmutableMap.Builder builder = ImmutableMap.builder(); - for(String peerId: peerIds) { + for (String peerId: peerIds) { builder.put(peerId, actorFactory.createTestActorPath(peerId)).build(); } @@ -1055,33 +1255,33 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { private final TestActorRef collectorActor; private final Map, Predicate> dropMessagesOfType = new ConcurrentHashMap<>(); - TestEntityOwnershipShard(Builder builder, TestActorRef collectorActor) { + TestEntityOwnershipShard(final Builder builder, final TestActorRef collectorActor) { super(builder); this.collectorActor = collectorActor; } @SuppressWarnings({ "unchecked", "rawtypes" }) @Override - public void handleCommand(Object message) { - if(collectorActor != null) { - collectorActor.tell(message, ActorRef.noSender()); - } - + public void handleCommand(final Object message) { Predicate drop = dropMessagesOfType.get(message.getClass()); - if(drop == null || !drop.test(message)) { + if (drop == null || !drop.test(message)) { super.handleCommand(message); } + + if (collectorActor != null) { + collectorActor.tell(message, ActorRef.noSender()); + } } - void startDroppingMessagesOfType(Class msgClass) { + void startDroppingMessagesOfType(final Class msgClass) { dropMessagesOfType.put(msgClass, msg -> true); } - void startDroppingMessagesOfType(Class msgClass, Predicate filter) { + void startDroppingMessagesOfType(final Class msgClass, final Predicate filter) { dropMessagesOfType.put(msgClass, filter); } - void stopDroppingMessagesOfType(Class msgClass) { + void stopDroppingMessagesOfType(final Class msgClass) { dropMessagesOfType.remove(msgClass); } @@ -1089,13 +1289,13 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { return collectorActor; } - static Props props(Builder builder) { + static Props props(final Builder builder) { return props(builder, null); } - static Props props(Builder builder, TestActorRef collectorActor) { - return Props.create(TestEntityOwnershipShard.class, builder, collectorActor). - withDispatcher(Dispatchers.DefaultDispatcherId()); + static Props props(final Builder builder, final TestActorRef collectorActor) { + return Props.create(TestEntityOwnershipShard.class, builder, collectorActor) + .withDispatcher(Dispatchers.DefaultDispatcherId()); } } }