Bug 3206: CDS - issues with direct commit
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
index ee543164de51e41e711b424da8a29a271c1d5382..49cfcbc9c257a31a5e7ad51585ebda185d8f9fd5 100644 (file)
@@ -1826,7 +1826,7 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
-        dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
+        dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
 
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
@@ -1866,9 +1866,11 @@ public class ShardTest extends AbstractShardTest {
                     cohort2, modification2, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
+            // The 3rd Tx should exceed queue capacity and fail.
+
             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
                     cohort3, modification3, true, false), getRef());
-            expectMsgClass(duration, ReadyTransactionReply.class);
+            expectMsgClass(duration, akka.actor.Status.Failure.class);
 
             // canCommit 1st Tx.
 
@@ -1888,6 +1890,125 @@ public class ShardTest extends AbstractShardTest {
         }};
     }
 
+    @Test
+    public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
+        dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
+
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testTransactionCommitWithPriorExpiredCohortEntries");
+
+            waitUntilLeader(shard);
+
+            final FiniteDuration duration = duration("5 seconds");
+
+            ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            String transactionID3 = "tx3";
+            MutableCompositeModification modification3 = new MutableCompositeModification();
+            ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+                    TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+                    cohort3, modification3, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
+            // should expire from the queue and the last one should be processed.
+
+            shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
+            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
+    @Test
+    public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
+        dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
+
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testTransactionCommitWithSubsequentExpiredCohortEntry");
+
+            waitUntilLeader(shard);
+
+            final FiniteDuration duration = duration("5 seconds");
+
+            ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            // CanCommit the first one so it's the current in-progress CohortEntry.
+
+            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+
+            // Ready the second Tx.
+
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            // Ready the third Tx.
+
+            String transactionID3 = "tx3";
+            DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification();
+            new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
+                    .apply(modification3);
+            ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
+
+            shard.tell(readyMessage, getRef());
+
+            // Commit the first Tx. After completing, the second should expire from the queue and the third
+            // Tx committed.
+
+            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+            // Expect commit reply from the third Tx.
+
+            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+            NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
+            assertNotNull(TestModel.TEST2_PATH + " not found", node);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
     @Test
     public void testCanCommitBeforeReadyFailure() throws Throwable {
         new ShardTestKit(getSystem()) {{