X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fentityownership%2FEntityOwnershipShardTest.java;h=9eb974170bebedc12e3e58fb47435ab30813b680;hb=e9fc7e7ed2b13d274518d6a872ab67749ef4507a;hp=c268e3eff59df6b81372822616dae5b2fa6cc63a;hpb=0fab6c716548e89938c1a8493dc25991c006aa10;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java index c268e3eff5..9eb974170b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java @@ -7,9 +7,6 @@ */ package org.opendaylight.controller.cluster.datastore.entityownership; -import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching; -import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching; -import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages; import static org.junit.Assert.assertEquals; import static org.mockito.AdditionalMatchers.or; import static org.mockito.Matchers.any; @@ -20,13 +17,15 @@ import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; -import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages; +import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching; +import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching; + import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Terminated; import akka.dispatch.Dispatchers; -import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; @@ -36,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; import org.junit.After; import org.junit.Test; @@ -62,6 +62,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.RequestVote; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.mdsal.eos.dom.api.DOMEntity; import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange; @@ -126,7 +127,7 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception { testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader starting"); - ShardTestKit kit = new ShardTestKit(getSystem()); + final ShardTestKit kit = new ShardTestKit(getSystem()); dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2); @@ -161,10 +162,10 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception { testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus starting"); - ShardTestKit kit = new ShardTestKit(getSystem()); + final ShardTestKit kit = new ShardTestKit(getSystem()); - dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2). - shardTransactionCommitTimeoutInSeconds(1); + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2) + .shardTransactionCommitTimeoutInSeconds(1); ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME); @@ -205,10 +206,10 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception { testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader starting"); - ShardTestKit kit = new ShardTestKit(getSystem()); + final ShardTestKit kit = new ShardTestKit(getSystem()); - dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2). - shardIsolatedLeaderCheckIntervalInMillis(50); + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2) + .shardIsolatedLeaderCheckIntervalInMillis(50); ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME); @@ -248,15 +249,15 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { ShardTestKit kit = new ShardTestKit(getSystem()); - dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2). - shardBatchedModificationCount(5); + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2) + .shardBatchedModificationCount(5); ShardIdentifier leaderId = newShardId(PEER_MEMBER_1_NAME); ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME); TestActorRef leader = actorFactory.createTestActor(TestEntityOwnershipShard.props( newShardBuilder(leaderId, peerMap(localId.toString()), PEER_MEMBER_1_NAME), - actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString()); - TestEntityOwnershipShard leaderShard = leader.underlyingActor(); + actorFactory.createActor(MessageCollectorActor.props())), leaderId.toString()); + final TestEntityOwnershipShard leaderShard = leader.underlyingActor(); TestActorRef local = actorFactory.createTestActor(TestEntityOwnershipShard.props( newShardBuilder(localId, peerMap(leaderId.toString()),LOCAL_MEMBER_NAME)), localId.toString()); @@ -285,13 +286,13 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { int max = 100; List entityIds = new ArrayList<>(); - for(int i = 1; i <= max; i++) { + for (int i = 1; i <= max; i++) { YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i)); entityIds.add(id); local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, id)), kit.getRef()); } - for(int i = 0; i < max; i++) { + for (int i = 0; i < max; i++) { verifyCommittedEntityCandidate(local, ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME); } @@ -338,7 +339,7 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { public void testOwnershipChanges() throws Exception { testLog.info("testOwnershipChanges starting"); - ShardTestKit kit = new ShardTestKit(getSystem()); + final ShardTestKit kit = new ShardTestKit(getSystem()); dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2); @@ -431,10 +432,10 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { public void testOwnerChangesOnPeerAvailabilityChanges() throws Exception { testLog.info("testOwnerChangesOnPeerAvailabilityChanges starting"); - ShardTestKit kit = new ShardTestKit(getSystem()); + final ShardTestKit kit = new ShardTestKit(getSystem()); - dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4). - shardIsolatedLeaderCheckIntervalInMillis(100000); + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4) + .shardIsolatedLeaderCheckIntervalInMillis(100000); ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME); @@ -522,7 +523,7 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { kit.watch(peer2); peer2.tell(PoisonPill.getInstance(), ActorRef.noSender()); - kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class); + kit.expectMsgClass(kit.duration("5 seconds"), Terminated.class); kit.unwatch(peer2); leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender()); @@ -589,7 +590,7 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { kit.watch(peer1); peer1.tell(PoisonPill.getInstance(), ActorRef.noSender()); - kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class); + kit.expectMsgClass(kit.duration("5 seconds"), Terminated.class); kit.unwatch(peer1); leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender()); @@ -629,6 +630,16 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME); verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, ""); + AtomicLong leaderLastApplied = new AtomicLong(); + verifyRaftState(leader, rs -> { + assertEquals("LastApplied up-to-date", rs.getLastApplied(), rs.getLastIndex()); + leaderLastApplied.set(rs.getLastApplied()); + }); + + verifyRaftState(peer2, rs -> { + assertEquals("LastApplied", leaderLastApplied.get(), rs.getLastIndex()); + }); + // Kill the local leader and elect peer2 the leader. This should cause a new owner to be selected for // the entities (1 and 3) previously owned by the local leader member. @@ -638,7 +649,7 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { kit.watch(leader); leader.tell(PoisonPill.getInstance(), ActorRef.noSender()); - kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class); + kit.expectMsgClass(kit.duration("5 seconds"), Terminated.class); kit.unwatch(leader); peer2.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender()); peer2.tell(TimeoutNow.INSTANCE, peer2); @@ -658,14 +669,14 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { public void testLeaderIsolation() throws Exception { testLog.info("testLeaderIsolation starting"); - ShardTestKit kit = new ShardTestKit(getSystem()); + final ShardTestKit kit = new ShardTestKit(getSystem()); ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME); ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME); - dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4). - shardIsolatedLeaderCheckIntervalInMillis(100000); + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4) + .shardIsolatedLeaderCheckIntervalInMillis(100000); TestActorRef peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props( newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)), @@ -677,8 +688,8 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { peerId2.toString()); peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); - dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()). - shardIsolatedLeaderCheckIntervalInMillis(500); + dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()) + .shardIsolatedLeaderCheckIntervalInMillis(500); TestActorRef leader = actorFactory.createTestActor(TestEntityOwnershipShard.props( newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME)), @@ -748,22 +759,25 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class); leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef()); kit.expectMsgClass(SuccessReply.class); - verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, false, true, true), - ownershipChange(entity2, false, false, true)), ownershipChange(entity3, false, false, true))); + verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or( + ownershipChange(entity1, false, true, true), ownershipChange(entity2, false, false, true)), + ownershipChange(entity3, false, false, true))); reset(leaderListener); DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class); peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef()); kit.expectMsgClass(SuccessReply.class); - verify(peer1Listener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, false, false, true), - ownershipChange(entity2, false, true, true)), ownershipChange(entity3, false, false, true))); + verify(peer1Listener, timeout(5000).times(3)).ownershipChanged(or(or( + ownershipChange(entity1, false, false, true), ownershipChange(entity2, false, true, true)), + ownershipChange(entity3, false, false, true))); reset(peer1Listener); DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class); peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef()); kit.expectMsgClass(SuccessReply.class); - verify(peer2Listener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, false, false, true), - ownershipChange(entity2, false, false, true)), ownershipChange(entity3, false, true, true))); + verify(peer2Listener, timeout(5000).times(3)).ownershipChanged(or(or( + ownershipChange(entity1, false, false, true), ownershipChange(entity2, false, false, true)), + ownershipChange(entity3, false, true, true))); reset(peer2Listener); // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers. @@ -772,7 +786,7 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class); peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, - ae -> ae.getLeaderId().equals(leaderId.toString())); + ae -> ae.getLeaderId().equals(leaderId.toString())); peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class); // Make peer1 start an election and become leader by enabling the ElectionTimeout message. @@ -790,8 +804,9 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { // Expect inJeopardy notification on the isolated leader. - verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, true, true, true, true), - ownershipChange(entity2, false, false, true, true)), ownershipChange(entity3, false, false, true, true))); + verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or( + ownershipChange(entity1, true, true, true, true), ownershipChange(entity2, false, false, true, true)), + ownershipChange(entity3, false, false, true, true))); reset(leaderListener); verifyRaftState(peer1, state -> @@ -822,8 +837,9 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.Follower.toString(), state.getRaftState())); - verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, true, true, true), - ownershipChange(entity2, false, false, true)), ownershipChange(entity3, false, false, true))); + verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or( + ownershipChange(entity1, true, true, true), ownershipChange(entity2, false, false, true)), + ownershipChange(entity3, false, false, true))); verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME); verify(leaderListener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true)); @@ -843,18 +859,18 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { public void testLeaderIsolationWithPendingCandidateAdded() throws Exception { testLog.info("testLeaderIsolationWithPendingCandidateAdded starting"); - ShardTestKit kit = new ShardTestKit(getSystem()); + final ShardTestKit kit = new ShardTestKit(getSystem()); ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME); ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME); - dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4). - shardIsolatedLeaderCheckIntervalInMillis(100000); + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4) + .shardIsolatedLeaderCheckIntervalInMillis(100000); TestActorRef peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props( newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME), - actorFactory.createTestActor(MessageCollectorActor.props())), peerId1.toString()); + actorFactory.createActor(MessageCollectorActor.props())), peerId1.toString()); peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); TestActorRef peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props( @@ -862,8 +878,8 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { actorFactory.createTestActor(MessageCollectorActor.props())), peerId2.toString()); peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); - dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()). - shardIsolatedLeaderCheckIntervalInMillis(500); + dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()) + .shardIsolatedLeaderCheckIntervalInMillis(500); TestActorRef leader = actorFactory.createTestActor(TestEntityOwnershipShard.props( newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME), @@ -911,7 +927,8 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { // Capture the CandidateAdded messages. - List candidateAdded = expectMatching(leader.underlyingActor().collectorActor(), CandidateAdded.class, 2); + final List candidateAdded = expectMatching(leader.underlyingActor().collectorActor(), + CandidateAdded.class, 2); // Drop AppendEntries to the followers containing a log entry, which will be for the owner writes after we // forward the CandidateAdded messages to the leader. This will leave the pending owner write tx's uncommitted. @@ -925,7 +942,8 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { leader.tell(candidateAdded.get(0), leader); leader.tell(candidateAdded.get(1), leader); - expectMatching(peer1.underlyingActor().collectorActor(), AppendEntries.class, 2, ae -> ae.getEntries().size() > 0); + expectMatching(peer1.underlyingActor().collectorActor(), AppendEntries.class, 2, + ae -> ae.getEntries().size() > 0); // Verify no owner assigned. @@ -938,7 +956,7 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class); peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, - ae -> ae.getLeaderId().equals(leaderId.toString())); + ae -> ae.getLeaderId().equals(leaderId.toString())); peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class); // Send PeerDown to the isolated leader - should be no-op since there's no owned entities. @@ -948,7 +966,8 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { // Verify the leader transitions to IsolatedLeader. - verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState())); + verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), + state.getRaftState())); // Send PeerDown to the new leader peer1. @@ -960,7 +979,8 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { // Verify the peer1 transitions to Leader. - verifyRaftState(peer1, state -> assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState())); + verifyRaftState(peer1, state -> assertEquals("getRaftState", RaftState.Leader.toString(), + state.getRaftState())); verifyNoOwnerSet(peer1, entity1.getType(), entity1.getIdentifier()); verifyNoOwnerSet(peer2, entity1.getType(), entity2.getIdentifier()); @@ -987,7 +1007,8 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { // Previous leader should switch to Follower. - verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.Follower.toString(), state.getRaftState())); + verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.Follower.toString(), + state.getRaftState())); // Send PeerUp to peer1 and peer2. @@ -1003,9 +1024,9 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { // - inJeopardy cleared for entity2 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false) // - local owner granted for entity1 (wasOwner=false, isOwner=true, hasOwner=true, inJeopardy=false) // - remote owner for entity2 (wasOwner=false, isOwner=false, hasOwner=true, inJeopardy=false) - verify(leaderListener, timeout(5000).times(4)).ownershipChanged(or(or(ownershipChange(entity1, false, false, false), - ownershipChange(entity2, false, false, false)), or(ownershipChange(entity1, false, true, true), - ownershipChange(entity2, false, false, true)))); + verify(leaderListener, timeout(5000).times(4)).ownershipChanged(or( + or(ownershipChange(entity1, false, false, false), ownershipChange(entity2, false, false, false)), + or(ownershipChange(entity1, false, true, true), ownershipChange(entity2, false, false, true)))); verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true)); verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true)); @@ -1041,10 +1062,10 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { ShardTestKit.waitUntilLeader(leader); String otherEntityType = "otherEntityType"; - DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1); - DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2); - DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3); - DOMEntity entity4 = new DOMEntity(otherEntityType, ENTITY_ID3); + final DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1); + final DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2); + final DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3); + final DOMEntity entity4 = new DOMEntity(otherEntityType, ENTITY_ID3); DOMEntityOwnershipListener listener = mock(DOMEntityOwnershipListener.class); // Register listener @@ -1120,8 +1141,8 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived starting"); ShardTestKit kit = new ShardTestKit(getSystem()); - EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder(). - addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500); + EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder() + .addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500); ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME); ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME); @@ -1131,7 +1152,8 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); TestActorRef leader = actorFactory.createTestActor( - newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME, builder.build()), leaderId.toString()); + newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME, builder.build()), + leaderId.toString()); ShardTestKit.waitUntilLeader(leader); @@ -1160,9 +1182,9 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { public void testDelayedEntityOwnerSelection() throws Exception { testLog.info("testDelayedEntityOwnerSelection starting"); - ShardTestKit kit = new ShardTestKit(getSystem()); - EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder(). - addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500); + final ShardTestKit kit = new ShardTestKit(getSystem()); + EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder() + .addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500); dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2); @@ -1181,8 +1203,8 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class); TestActorRef leader = actorFactory.createTestActor( - newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME, builder.build()), - leaderId.toString()); + newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME, + builder.build()), leaderId.toString()); ShardTestKit.waitUntilLeader(leader); @@ -1211,27 +1233,28 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { return newShardProps(newShardId(LOCAL_MEMBER_NAME), Collections.emptyMap(), LOCAL_MEMBER_NAME); } - private Props newShardProps(ShardIdentifier shardId, Map peers, String memberName) { + private Props newShardProps(final ShardIdentifier shardId, final Map peers, + final String memberName) { return newShardProps(shardId, peers, memberName, EntityOwnerSelectionStrategyConfig.newBuilder().build()); } - private Props newShardProps(ShardIdentifier shardId, Map peers, String memberName, - EntityOwnerSelectionStrategyConfig config) { + private Props newShardProps(final ShardIdentifier shardId, final Map peers, final String memberName, + final EntityOwnerSelectionStrategyConfig config) { return newShardBuilder(shardId, peers, memberName).ownerSelectionStrategyConfig(config).props() .withDispatcher(Dispatchers.DefaultDispatcherId()); } - private EntityOwnershipShard.Builder newShardBuilder(ShardIdentifier shardId, Map peers, - String memberName) { + private EntityOwnershipShard.Builder newShardBuilder(final ShardIdentifier shardId, final Map peers, + final String memberName) { return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers).datastoreContext( - dataStoreContextBuilder.build()).schemaContext(SCHEMA_CONTEXT).localMemberName( + dataStoreContextBuilder.build()).schemaContextProvider(() -> SCHEMA_CONTEXT).localMemberName( MemberName.forName(memberName)).ownerSelectionStrategyConfig( EntityOwnerSelectionStrategyConfig.newBuilder().build()); } - private Map peerMap(String... peerIds) { + private Map peerMap(final String... peerIds) { ImmutableMap.Builder builder = ImmutableMap.builder(); - for(String peerId: peerIds) { + for (String peerId: peerIds) { builder.put(peerId, actorFactory.createTestActorPath(peerId)).build(); } @@ -1239,50 +1262,50 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { } private static class TestEntityOwnershipShard extends EntityOwnershipShard { - private final TestActorRef collectorActor; + private final ActorRef collectorActor; private final Map, Predicate> dropMessagesOfType = new ConcurrentHashMap<>(); - TestEntityOwnershipShard(Builder builder, TestActorRef collectorActor) { + TestEntityOwnershipShard(final Builder builder, final ActorRef collectorActor) { super(builder); this.collectorActor = collectorActor; } @SuppressWarnings({ "unchecked", "rawtypes" }) @Override - public void handleCommand(Object message) { + public void handleCommand(final Object message) { Predicate drop = dropMessagesOfType.get(message.getClass()); - if(drop == null || !drop.test(message)) { + if (drop == null || !drop.test(message)) { super.handleCommand(message); } - if(collectorActor != null) { + if (collectorActor != null) { collectorActor.tell(message, ActorRef.noSender()); } } - void startDroppingMessagesOfType(Class msgClass) { + void startDroppingMessagesOfType(final Class msgClass) { dropMessagesOfType.put(msgClass, msg -> true); } - void startDroppingMessagesOfType(Class msgClass, Predicate filter) { + void startDroppingMessagesOfType(final Class msgClass, final Predicate filter) { dropMessagesOfType.put(msgClass, filter); } - void stopDroppingMessagesOfType(Class msgClass) { + void stopDroppingMessagesOfType(final Class msgClass) { dropMessagesOfType.remove(msgClass); } - TestActorRef collectorActor() { + ActorRef collectorActor() { return collectorActor; } - static Props props(Builder builder) { + static Props props(final Builder builder) { return props(builder, null); } - static Props props(Builder builder, TestActorRef collectorActor) { - return Props.create(TestEntityOwnershipShard.class, builder, collectorActor). - withDispatcher(Dispatchers.DefaultDispatcherId()); + static Props props(final Builder builder, final ActorRef collectorActor) { + return Props.create(TestEntityOwnershipShard.class, builder, collectorActor) + .withDispatcher(Dispatchers.DefaultDispatcherId()); } } }