Check total batched messages sent in ShardCommitCoordinator on tx ready
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
index c8953be5c4562e422f9fe56f6155e806c3717540..70869ee68a55644cf7aa8c5103031a72a881d703 100644 (file)
@@ -15,6 +15,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.actor.Status.Failure;
 import akka.dispatch.Dispatchers;
 import akka.dispatch.OnComplete;
 import akka.japi.Creator;
@@ -816,16 +817,18 @@ public class ShardTest extends AbstractShardTest {
     }
 
     private static BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
-            NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
-        return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady);
+            NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady, int messagesSent) {
+        return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent);
     }
 
     private static BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
-            YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
+            YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady,
+            int messagesSent) {
         BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
         batched.addModification(new WriteModification(path, data));
         batched.setReady(ready);
         batched.setDoCommitOnReady(doCommitOnReady);
+        batched.setTotalMessagesSent(messagesSent);
         return batched;
     }
 
@@ -858,18 +861,18 @@ public class ShardTest extends AbstractShardTest {
             // Send a BatchedModifications to start a transaction.
 
             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
             expectMsgClass(duration, BatchedModificationsReply.class);
 
             // Send a couple more BatchedModifications.
 
             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
-                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
             expectMsgClass(duration, BatchedModificationsReply.class);
 
             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false), getRef());
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
@@ -926,18 +929,18 @@ public class ShardTest extends AbstractShardTest {
             // Send a BatchedModifications to start a transaction.
 
             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
             expectMsgClass(duration, BatchedModificationsReply.class);
 
             // Send a couple more BatchedModifications.
 
             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
-                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
             expectMsgClass(duration, BatchedModificationsReply.class);
 
             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true), getRef());
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
 
             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
 
@@ -954,6 +957,32 @@ public class ShardTest extends AbstractShardTest {
         }};
     }
 
+    @Test(expected=IllegalStateException.class)
+    public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
+
+            waitUntilLeader(shard);
+
+            String transactionID = "tx1";
+            BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
+            batched.setReady(true);
+            batched.setTotalMessagesSent(2);
+
+            shard.tell(batched, getRef());
+
+            Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+            if(failure != null) {
+                throw failure.cause();
+            }
+        }};
+    }
+
     @SuppressWarnings("unchecked")
     private static void verifyOuterListEntry(final TestActorRef<Shard> shard, Object expIDValue) throws Exception {
         NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
@@ -990,7 +1019,7 @@ public class ShardTest extends AbstractShardTest {
             ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
             YangInstanceIdentifier path = TestModel.TEST_PATH;
             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
-                    containerNode, true, false), getRef());
+                    containerNode, true, false, 1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Create a read Tx on the same chain.