From: Tom Pantelis Date: Sat, 30 May 2015 09:06:46 +0000 (-0400) Subject: Check total batched messages sent in ShardCommitCoordinator on tx ready X-Git-Tag: release/beryllium~513 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=376df7be9839100e31e6916d6e685dda9f8bd030 Check total batched messages sent in ShardCommitCoordinator on tx ready Added check in the ShardCommitCoordinator to ensure the total number of batched messages sent equals the total number of messages received as was done in the ShardWriteTransaction. Change-Id: I86a67aec7a6a8e8994aee9a2a167972ede1808c7 Signed-off-by: Tom Pantelis (cherry picked from commit eac2d5f506b801fcac619ca2cb7e4dcd85489a08) --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 6821fa721f..7d80689044 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -184,6 +184,12 @@ class ShardCommitCoordinator { cohortEntry.applyModifications(batched.getModifications()); if(batched.isReady()) { + if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) { + throw new IllegalStateException(String.format( + "The total number of batched messages received %d does not match the number sent %d", + cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent())); + } + if(!queueCohortEntry(cohortEntry, sender, shard)) { return; } @@ -493,6 +499,7 @@ class ShardCommitCoordinator { private Shard shard; private boolean doImmediateCommit; private final Stopwatch lastAccessTimer = Stopwatch.createStarted(); + private int totalBatchedModificationsReceived; CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) { this.transaction = Preconditions.checkNotNull(transaction); @@ -525,10 +532,16 @@ class ShardCommitCoordinator { return cohort; } + int getTotalBatchedModificationsReceived() { + return totalBatchedModificationsReceived; + } + void applyModifications(Iterable modifications) { for (Modification modification : modifications) { modification.apply(transaction.getSnapshot()); } + + totalBatchedModificationsReceived++; } void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializer.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializer.java index 237a5c6227..1091aa5070 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializer.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializer.java @@ -47,6 +47,7 @@ public final class ReadyLocalTransactionSerializer extends JSerializer { final BatchedModifications batched = new BatchedModifications(msg.getTransactionID(), DataStoreVersions.CURRENT_VERSION, ""); batched.setDoCommitOnReady(msg.isDoCommitOnReady()); + batched.setTotalMessagesSent(1); batched.setReady(true); msg.getModification().applyToCursor(new BatchedCursor(batched)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index c8953be5c4..70869ee68a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -15,6 +15,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.PoisonPill; import akka.actor.Props; +import akka.actor.Status.Failure; import akka.dispatch.Dispatchers; import akka.dispatch.OnComplete; import akka.japi.Creator; @@ -816,16 +817,18 @@ public class ShardTest extends AbstractShardTest { } private static BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path, - NormalizedNode data, boolean ready, boolean doCommitOnReady) { - return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady); + NormalizedNode data, boolean ready, boolean doCommitOnReady, int messagesSent) { + return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent); } private static BatchedModifications newBatchedModifications(String transactionID, String transactionChainID, - YangInstanceIdentifier path, NormalizedNode data, boolean ready, boolean doCommitOnReady) { + YangInstanceIdentifier path, NormalizedNode data, boolean ready, boolean doCommitOnReady, + int messagesSent) { BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID); batched.addModification(new WriteModification(path, data)); batched.setReady(ready); batched.setDoCommitOnReady(doCommitOnReady); + batched.setTotalMessagesSent(messagesSent); return batched; } @@ -858,18 +861,18 @@ public class ShardTest extends AbstractShardTest { // Send a BatchedModifications to start a transaction. shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef()); + ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef()); expectMsgClass(duration, BatchedModificationsReply.class); // Send a couple more BatchedModifications. shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef()); + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef()); expectMsgClass(duration, BatchedModificationsReply.class); shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder( TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false), getRef()); + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -926,18 +929,18 @@ public class ShardTest extends AbstractShardTest { // Send a BatchedModifications to start a transaction. shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef()); + ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef()); expectMsgClass(duration, BatchedModificationsReply.class); // Send a couple more BatchedModifications. shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef()); + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef()); expectMsgClass(duration, BatchedModificationsReply.class); shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder( TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true), getRef()); + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef()); expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); @@ -954,6 +957,32 @@ public class ShardTest extends AbstractShardTest { }}; } + @Test(expected=IllegalStateException.class) + public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable { + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testBatchedModificationsReadyWithIncorrectTotalMessageCount"); + + waitUntilLeader(shard); + + String transactionID = "tx1"; + BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null); + batched.setReady(true); + batched.setTotalMessagesSent(2); + + shard.tell(batched, getRef()); + + Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + + if(failure != null) { + throw failure.cause(); + } + }}; + } + @SuppressWarnings("unchecked") private static void verifyOuterListEntry(final TestActorRef shard, Object expIDValue) throws Exception { NormalizedNode outerList = readStore(shard, TestModel.OUTER_LIST_PATH); @@ -990,7 +1019,7 @@ public class ShardTest extends AbstractShardTest { ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); YangInstanceIdentifier path = TestModel.TEST_PATH; shard.tell(newBatchedModifications(transactionID1, transactionChainID, path, - containerNode, true, false), getRef()); + containerNode, true, false, 1), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Create a read Tx on the same chain.