Bug 4105: Change commit retry mechanism in EntityOwnershipShard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShardTest.java
index e4aaaa188851b4db4e19d69f822f56857497a944..d4c59cc39657ff020d20b6485b36280194758a4c 100644 (file)
@@ -18,7 +18,9 @@ import akka.testkit.TestActorRef;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -36,6 +38,7 @@ import org.opendaylight.controller.cluster.datastore.messages.BatchedModificatio
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.TestActorFactory;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
@@ -200,7 +203,8 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
     public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
         ShardTestKit kit = new ShardTestKit(getSystem());
 
-        dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100);
+        dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100).
+                shardBatchedModificationCount(5);
 
         String peerId = actorFactory.generateActorId("leader");
         TestActorRef<MockLeader> peer = actorFactory.createTestActor(Props.create(MockLeader.class).
@@ -221,9 +225,14 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         MockLeader leader = peer.underlyingActor();
         assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
                 leader.modificationsReceived, 5, TimeUnit.SECONDS));
-        verifyBatchedEntityCandidate(leader.receivedModifications, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+        verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID1,
+                LOCAL_MEMBER_NAME);
+
+        shard.tell(dataStoreContextBuilder.shardElectionTimeoutFactor(2).build(), ActorRef.noSender());
 
-        leader.modificationsReceived = new CountDownLatch(2);
+        // Test with initial commit timeout and subsequent retry.
+
+        leader.modificationsReceived = new CountDownLatch(1);
         leader.sendReply = false;
 
         shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
@@ -233,7 +242,34 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
 
         assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
                 leader.modificationsReceived, 5, TimeUnit.SECONDS));
-        verifyBatchedEntityCandidate(leader.receivedModifications, ENTITY_TYPE, ENTITY_ID2, LOCAL_MEMBER_NAME);
+        verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID2,
+                LOCAL_MEMBER_NAME);
+
+        // Send a bunch of registration messages quickly and verify.
+
+        int max = 100;
+        leader.delay = 4;
+        leader.modificationsReceived = new CountDownLatch(max);
+        List<YangInstanceIdentifier> entityIds = new ArrayList<>();
+        for(int i = 1; i <= max; i++) {
+            YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
+            entityIds.add(id);
+            shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, id)), kit.getRef());
+        }
+
+        assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
+                leader.modificationsReceived, 10, TimeUnit.SECONDS));
+
+        // Sleep a little to ensure no additional BatchedModifications are received.
+
+        Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+
+        List<Modification> receivedMods = leader.getAndClearReceivedModifications();
+        for(int i = 0; i < max; i++) {
+            verifyBatchedEntityCandidate(receivedMods.get(i), ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
+        }
+
+        assertEquals("# modifications received", max, receivedMods.size());
     }
 
     private void verifyCommittedEntityCandidate(TestActorRef<EntityOwnershipShard> shard, String entityType,
@@ -241,11 +277,16 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         verifyEntityCandidate(readEntityOwners(shard), entityType, entityId, candidateName);
     }
 
-    private void verifyBatchedEntityCandidate(BatchedModifications mods, String entityType,
+    private void verifyBatchedEntityCandidate(List<Modification> mods, String entityType,
             YangInstanceIdentifier entityId, String candidateName) throws Exception {
-        assertEquals("BatchedModifications size", 1, mods.getModifications().size());
-        assertEquals("Modification type", MergeModification.class, mods.getModifications().get(0).getClass());
-        verifyEntityCandidate(((MergeModification)mods.getModifications().get(0)).getData(), entityType,
+        assertEquals("BatchedModifications size", 1, mods.size());
+        verifyBatchedEntityCandidate(mods.get(0), entityType, entityId, candidateName);
+    }
+
+    private void verifyBatchedEntityCandidate(Modification mod, String entityType,
+            YangInstanceIdentifier entityId, String candidateName) throws Exception {
+        assertEquals("Modification type", MergeModification.class, mod.getClass());
+        verifyEntityCandidate(((MergeModification)mod).getData(), entityType,
                 entityId, candidateName);
     }
 
@@ -306,20 +347,39 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
 
     public static class MockLeader extends UntypedActor {
         volatile CountDownLatch modificationsReceived = new CountDownLatch(1);
-        volatile BatchedModifications receivedModifications;
+        List<Modification> receivedModifications = new ArrayList<>();
         volatile boolean sendReply = true;
+        volatile long delay;
 
         @Override
         public void onReceive(Object message) {
             if(message instanceof BatchedModifications) {
-                receivedModifications = (BatchedModifications) message;
-                modificationsReceived.countDown();
+                if(delay > 0) {
+                    Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
+                }
+
                 if(sendReply) {
+                    BatchedModifications mods = (BatchedModifications) message;
+                    synchronized (receivedModifications) {
+                        for(int i = 0; i < mods.getModifications().size(); i++) {
+                            receivedModifications.add(mods.getModifications().get(i));
+                            modificationsReceived.countDown();
+                        }
+                    }
+
                     getSender().tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
                 } else {
                     sendReply = true;
                 }
             }
         }
+
+        List<Modification> getAndClearReceivedModifications() {
+            synchronized (receivedModifications) {
+                List<Modification> ret = new ArrayList<>(receivedModifications);
+                receivedModifications.clear();
+                return ret;
+            }
+        }
     }
 }