Check total batched messages sent in ShardCommitCoordinator on tx ready 34/22034/2
authorTom Pantelis <tpanteli@brocade.com>
Sat, 30 May 2015 09:06:46 +0000 (05:06 -0400)
committerGerrit Code Review <gerrit@opendaylight.org>
Sat, 6 Jun 2015 23:32:23 +0000 (23:32 +0000)
Added check in the ShardCommitCoordinator to ensure the total number of
batched messages sent equals the total number of messages received as
was done in the ShardWriteTransaction.

Change-Id: I86a67aec7a6a8e8994aee9a2a167972ede1808c7
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
(cherry picked from commit eac2d5f506b801fcac619ca2cb7e4dcd85489a08)

opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializer.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index 6821fa7..7d80689 100644 (file)
@@ -184,6 +184,12 @@ class ShardCommitCoordinator {
         cohortEntry.applyModifications(batched.getModifications());
 
         if(batched.isReady()) {
+            if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
+                throw new IllegalStateException(String.format(
+                        "The total number of batched messages received %d does not match the number sent %d",
+                        cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
+            }
+
             if(!queueCohortEntry(cohortEntry, sender, shard)) {
                 return;
             }
@@ -493,6 +499,7 @@ class ShardCommitCoordinator {
         private Shard shard;
         private boolean doImmediateCommit;
         private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
+        private int totalBatchedModificationsReceived;
 
         CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
             this.transaction = Preconditions.checkNotNull(transaction);
@@ -525,10 +532,16 @@ class ShardCommitCoordinator {
             return cohort;
         }
 
+        int getTotalBatchedModificationsReceived() {
+            return totalBatchedModificationsReceived;
+        }
+
         void applyModifications(Iterable<Modification> modifications) {
             for (Modification modification : modifications) {
                 modification.apply(transaction.getSnapshot());
             }
+
+            totalBatchedModificationsReceived++;
         }
 
         void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
index 237a5c6..1091aa5 100644 (file)
@@ -47,6 +47,7 @@ public final class ReadyLocalTransactionSerializer extends JSerializer {
         final BatchedModifications batched = new BatchedModifications(msg.getTransactionID(),
                 DataStoreVersions.CURRENT_VERSION, "");
         batched.setDoCommitOnReady(msg.isDoCommitOnReady());
+        batched.setTotalMessagesSent(1);
         batched.setReady(true);
 
         msg.getModification().applyToCursor(new BatchedCursor(batched));
index c8953be..70869ee 100644 (file)
@@ -15,6 +15,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.actor.Status.Failure;
 import akka.dispatch.Dispatchers;
 import akka.dispatch.OnComplete;
 import akka.japi.Creator;
@@ -816,16 +817,18 @@ public class ShardTest extends AbstractShardTest {
     }
 
     private static BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
-            NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
-        return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady);
+            NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady, int messagesSent) {
+        return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent);
     }
 
     private static BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
-            YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
+            YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady,
+            int messagesSent) {
         BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
         batched.addModification(new WriteModification(path, data));
         batched.setReady(ready);
         batched.setDoCommitOnReady(doCommitOnReady);
+        batched.setTotalMessagesSent(messagesSent);
         return batched;
     }
 
@@ -858,18 +861,18 @@ public class ShardTest extends AbstractShardTest {
             // Send a BatchedModifications to start a transaction.
 
             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
             expectMsgClass(duration, BatchedModificationsReply.class);
 
             // Send a couple more BatchedModifications.
 
             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
-                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
             expectMsgClass(duration, BatchedModificationsReply.class);
 
             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false), getRef());
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
@@ -926,18 +929,18 @@ public class ShardTest extends AbstractShardTest {
             // Send a BatchedModifications to start a transaction.
 
             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
             expectMsgClass(duration, BatchedModificationsReply.class);
 
             // Send a couple more BatchedModifications.
 
             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
-                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
             expectMsgClass(duration, BatchedModificationsReply.class);
 
             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true), getRef());
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
 
             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
 
@@ -954,6 +957,32 @@ public class ShardTest extends AbstractShardTest {
         }};
     }
 
+    @Test(expected=IllegalStateException.class)
+    public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
+
+            waitUntilLeader(shard);
+
+            String transactionID = "tx1";
+            BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
+            batched.setReady(true);
+            batched.setTotalMessagesSent(2);
+
+            shard.tell(batched, getRef());
+
+            Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+            if(failure != null) {
+                throw failure.cause();
+            }
+        }};
+    }
+
     @SuppressWarnings("unchecked")
     private static void verifyOuterListEntry(final TestActorRef<Shard> shard, Object expIDValue) throws Exception {
         NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
@@ -990,7 +1019,7 @@ public class ShardTest extends AbstractShardTest {
             ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
             YangInstanceIdentifier path = TestModel.TEST_PATH;
             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
-                    containerNode, true, false), getRef());
+                    containerNode, true, false, 1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Create a read Tx on the same chain.