X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fentityownership%2FEntityOwnershipShardTest.java;h=a6bf30c01ea3f9dfeee1cbf1b8570ce801684682;hb=refs%2Fchanges%2F98%2F26798%2F1;hp=e4aaaa188851b4db4e19d69f822f56857497a944;hpb=a8f617e6dd21f9a453e9ece1e57f4549899584e7;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java index e4aaaa1888..a6bf30c01e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java @@ -15,10 +15,13 @@ import akka.actor.Props; import akka.actor.UntypedActor; import akka.dispatch.Dispatchers; import akka.testkit.TestActorRef; +import com.google.common.base.Function; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -36,6 +39,7 @@ import org.opendaylight.controller.cluster.datastore.messages.BatchedModificatio import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.SuccessReply; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; +import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.TestActorFactory; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; @@ -92,6 +96,8 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { kit.expectMsgClass(SuccessReply.class); verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); + + verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); } @Test @@ -119,6 +125,8 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { peer.underlyingActor().grantVote = true; verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); + + verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); } @Test @@ -157,7 +165,10 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { // Resume AppendEntries - the follower should ack the commit which should then result in the candidate // write being applied to the state. follower.dropAppendEntries = false; + verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); + + verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); } @Test @@ -194,13 +205,16 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { // Resume AppendEntries - the candidate write should now be committed. follower.dropAppendEntries = false; verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); + + verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); } @Test public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception { ShardTestKit kit = new ShardTestKit(getSystem()); - dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100); + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100). + shardBatchedModificationCount(5); String peerId = actorFactory.generateActorId("leader"); TestActorRef peer = actorFactory.createTestActor(Props.create(MockLeader.class). @@ -221,9 +235,14 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { MockLeader leader = peer.underlyingActor(); assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly( leader.modificationsReceived, 5, TimeUnit.SECONDS)); - verifyBatchedEntityCandidate(leader.receivedModifications, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); + verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID1, + LOCAL_MEMBER_NAME); + + shard.tell(dataStoreContextBuilder.shardElectionTimeoutFactor(2).build(), ActorRef.noSender()); - leader.modificationsReceived = new CountDownLatch(2); + // Test with initial commit timeout and subsequent retry. + + leader.modificationsReceived = new CountDownLatch(1); leader.sendReply = false; shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender()); @@ -233,7 +252,34 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly( leader.modificationsReceived, 5, TimeUnit.SECONDS)); - verifyBatchedEntityCandidate(leader.receivedModifications, ENTITY_TYPE, ENTITY_ID2, LOCAL_MEMBER_NAME); + verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID2, + LOCAL_MEMBER_NAME); + + // Send a bunch of registration messages quickly and verify. + + int max = 100; + leader.delay = 4; + leader.modificationsReceived = new CountDownLatch(max); + List entityIds = new ArrayList<>(); + for(int i = 1; i <= max; i++) { + YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i)); + entityIds.add(id); + shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, id)), kit.getRef()); + } + + assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly( + leader.modificationsReceived, 10, TimeUnit.SECONDS)); + + // Sleep a little to ensure no additional BatchedModifications are received. + + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + + List receivedMods = leader.getAndClearReceivedModifications(); + for(int i = 0; i < max; i++) { + verifyBatchedEntityCandidate(receivedMods.get(i), ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME); + } + + assertEquals("# modifications received", max, receivedMods.size()); } private void verifyCommittedEntityCandidate(TestActorRef shard, String entityType, @@ -241,11 +287,16 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { verifyEntityCandidate(readEntityOwners(shard), entityType, entityId, candidateName); } - private void verifyBatchedEntityCandidate(BatchedModifications mods, String entityType, + private void verifyBatchedEntityCandidate(List mods, String entityType, + YangInstanceIdentifier entityId, String candidateName) throws Exception { + assertEquals("BatchedModifications size", 1, mods.size()); + verifyBatchedEntityCandidate(mods.get(0), entityType, entityId, candidateName); + } + + private void verifyBatchedEntityCandidate(Modification mod, String entityType, YangInstanceIdentifier entityId, String candidateName) throws Exception { - assertEquals("BatchedModifications size", 1, mods.getModifications().size()); - assertEquals("Modification type", MergeModification.class, mods.getModifications().get(0).getClass()); - verifyEntityCandidate(((MergeModification)mods.getModifications().get(0)).getData(), entityType, + assertEquals("Modification type", MergeModification.class, mod.getClass()); + verifyEntityCandidate(((MergeModification)mod).getData(), entityType, entityId, candidateName); } @@ -263,6 +314,20 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { return null; } + private void verifyOwner(final TestActorRef shard, String entityType, YangInstanceIdentifier entityId, + String localMemberName) { + verifyOwner(localMemberName, entityType, entityId, new Function>() { + @Override + public NormalizedNode apply(YangInstanceIdentifier path) { + try { + return AbstractShardTest.readStore(shard, path); + } catch(Exception e) { + return null; + } + } + }); + } + private Props newShardProps() { return newShardProps(Collections.emptyMap()); } @@ -306,20 +371,39 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { public static class MockLeader extends UntypedActor { volatile CountDownLatch modificationsReceived = new CountDownLatch(1); - volatile BatchedModifications receivedModifications; + List receivedModifications = new ArrayList<>(); volatile boolean sendReply = true; + volatile long delay; @Override public void onReceive(Object message) { if(message instanceof BatchedModifications) { - receivedModifications = (BatchedModifications) message; - modificationsReceived.countDown(); + if(delay > 0) { + Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS); + } + if(sendReply) { + BatchedModifications mods = (BatchedModifications) message; + synchronized (receivedModifications) { + for(int i = 0; i < mods.getModifications().size(); i++) { + receivedModifications.add(mods.getModifications().get(i)); + modificationsReceived.countDown(); + } + } + getSender().tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf()); } else { sendReply = true; } } } + + List getAndClearReceivedModifications() { + synchronized (receivedModifications) { + List ret = new ArrayList<>(receivedModifications); + receivedModifications.clear(); + return ret; + } + } } }