From 3c13136499bf20d1fee22bed16e6a86a37af13cf Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Tue, 7 Apr 2015 23:48:14 -0400 Subject: [PATCH] Verify BatchedModifications messages sent vs received A previous patch removed the verification of the response Futures for BatchedModifications messages sent in TransactionContextImpl. This was used to verify all modifications were successfully received and processed by the transaction actor. In lieu of that, this patch adds a totalMessagesSent field to BatchedModifications and the transaction actor verifies it matches the total number of messages received and successfully processed. Change-Id: I247d3a09794142813cdca67b63d77412261a282b Signed-off-by: Tom Pantelis --- .../datastore/ShardWriteTransaction.java | 20 ++++- .../datastore/TransactionContextImpl.java | 3 + .../messages/BatchedModifications.java | 17 ++++- .../datastore/ShardTransactionTest.java | 76 ++++++++++++++++++- .../datastore/TransactionProxyTest.java | 11 ++- .../messages/BatchedModificationsTest.java | 2 + 6 files changed, 123 insertions(+), 6 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java index 3568911646..e53e9cb05a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java @@ -1,6 +1,6 @@ /* - * * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -41,6 +41,8 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; public class ShardWriteTransaction extends ShardTransaction { private final MutableCompositeModification compositeModification = new MutableCompositeModification(); + private int totalBatchedModificationsReceived; + private Exception lastBatchedModificationsException; private final DOMStoreWriteTransaction transaction; public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor, @@ -88,13 +90,29 @@ public class ShardWriteTransaction extends ShardTransaction { modification.apply(transaction); } + totalBatchedModificationsReceived++; if(batched.isReady()) { + if(lastBatchedModificationsException != null) { + throw lastBatchedModificationsException; + } + + if(totalBatchedModificationsReceived != batched.getTotalMessagesSent()) { + throw new IllegalStateException(String.format( + "The total number of batched messages received %d does not match the number sent %d", + totalBatchedModificationsReceived, batched.getTotalMessagesSent())); + } + readyTransaction(transaction, false); } else { getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf()); } } catch (Exception e) { + lastBatchedModificationsException = e; getSender().tell(new akka.actor.Status.Failure(e), getSelf()); + + if(batched.isReady()) { + getSelf().tell(PoisonPill.getInstance(), getSelf()); + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java index f34c5a2571..c722918c5c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java @@ -1,5 +1,6 @@ /* * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -45,6 +46,7 @@ public class TransactionContextImpl extends AbstractTransactionContext { private final OperationCompleter operationCompleter; private BatchedModifications batchedModifications; + private int totalBatchedModificationsSent; protected TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier, String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal, @@ -159,6 +161,7 @@ public class TransactionContextImpl extends AbstractTransactionContext { } batchedModifications.setReady(ready); + batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent); sent = executeOperationAsync(batchedModifications); if(ready) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java index a9ce94b033..86f96f57d0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java @@ -22,6 +22,7 @@ public class BatchedModifications extends MutableCompositeModification implement private static final long serialVersionUID = 1L; private boolean ready; + private int totalMessagesSent; private String transactionID; private String transactionChainID; @@ -42,6 +43,14 @@ public class BatchedModifications extends MutableCompositeModification implement this.ready = ready; } + public int getTotalMessagesSent() { + return totalMessagesSent; + } + + public void setTotalMessagesSent(int totalMessagesSent) { + this.totalMessagesSent = totalMessagesSent; + } + public String getTransactionID() { return transactionID; } @@ -56,6 +65,7 @@ public class BatchedModifications extends MutableCompositeModification implement transactionID = in.readUTF(); transactionChainID = in.readUTF(); ready = in.readBoolean(); + totalMessagesSent = in.readInt(); } @Override @@ -64,6 +74,7 @@ public class BatchedModifications extends MutableCompositeModification implement out.writeUTF(transactionID); out.writeUTF(transactionChainID); out.writeBoolean(ready); + out.writeInt(totalMessagesSent); } @Override @@ -74,8 +85,10 @@ public class BatchedModifications extends MutableCompositeModification implement @Override public String toString() { StringBuilder builder = new StringBuilder(); - builder.append("BatchedModifications [transactionID=").append(transactionID).append(", ready=").append(ready) - .append(", modifications size=").append(getModifications().size()).append("]"); + builder.append("BatchedModifications [transactionID=").append(transactionID).append(", transactionChainID=") + .append(transactionChainID).append(", ready=").append(ready).append(", totalMessagesSent=") + .append(totalMessagesSent).append(", modifications size=").append(getModifications().size()) + .append("]"); return builder.toString(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index 9715f668e3..7894ab159b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -4,8 +4,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doThrow; import akka.actor.ActorRef; import akka.actor.Props; +import akka.actor.Status.Failure; import akka.actor.Terminated; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; @@ -52,6 +54,7 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; @@ -424,16 +427,83 @@ public class ShardTransactionTest extends AbstractActorTest { withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null); - batched.setReady(true); batched.addModification(new WriteModification(writePath, writeData)); transaction.tell(batched, getRef()); + BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class); + assertEquals("getNumBatched", 1, reply.getNumBatched()); + + batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null); + batched.setReady(true); + batched.setTotalMessagesSent(2); + transaction.tell(batched, getRef()); expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class); watcher.expectMsgClass(duration("5 seconds"), Terminated.class); }}; } + @Test(expected=TestException.class) + public void testOnReceiveBatchedModificationsFailure() throws Throwable { + new JavaTestKit(getSystem()) {{ + + DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class); + final ActorRef transaction = newTransactionActor(mockWriteTx, + "testOnReceiveBatchedModificationsFailure"); + + JavaTestKit watcher = new JavaTestKit(getSystem()); + watcher.watch(transaction); + + YangInstanceIdentifier path = TestModel.TEST_PATH; + ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doThrow(new TestException()).when(mockWriteTx).write(path, node); + + BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null); + batched.addModification(new WriteModification(path, node)); + + transaction.tell(batched, getRef()); + expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); + + batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null); + batched.setReady(true); + batched.setTotalMessagesSent(2); + + transaction.tell(batched, getRef()); + Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); + watcher.expectMsgClass(duration("5 seconds"), Terminated.class); + + if(failure != null) { + throw failure.cause(); + } + }}; + } + + @Test(expected=IllegalStateException.class) + public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable { + new JavaTestKit(getSystem()) {{ + + final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(), + "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount"); + + JavaTestKit watcher = new JavaTestKit(getSystem()); + watcher.watch(transaction); + + BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null); + batched.setReady(true); + batched.setTotalMessagesSent(2); + + transaction.tell(batched, getRef()); + + Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); + watcher.expectMsgClass(duration("5 seconds"), Terminated.class); + + if(failure != null) { + throw failure.cause(); + } + }}; + } + @Test public void testOnReceivePreLithiumReadyTransaction() throws Exception { new JavaTestKit(getSystem()) {{ @@ -564,4 +634,8 @@ public class ShardTransactionTest extends AbstractActorTest { expectMsgClass(duration("3 seconds"), Terminated.class); }}; } + + public static class TestException extends RuntimeException { + private static final long serialVersionUID = 1L; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 6cfef19491..cc9692bfd9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -476,8 +476,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); - verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)), - isA(BatchedModifications.class)); + List batchedModifications = captureBatchedModifications(actorRef); + assertEquals("Captured BatchedModifications count", 1, batchedModifications.size()); + + verifyBatchedModifications(batchedModifications.get(0), true, + new WriteModification(TestModel.TEST_PATH, nodeToWrite)); + + assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent()); } @Test @@ -1221,6 +1226,8 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { verifyBatchedModifications(batchedModifications.get(2), true, new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2)); + + assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent()); } @Test diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java index b302f527d6..1df8e9775b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java @@ -46,6 +46,7 @@ public class BatchedModificationsTest { batched.addModification(new MergeModification(mergePath, mergeData)); batched.addModification(new DeleteModification(deletePath)); batched.setReady(true); + batched.setTotalMessagesSent(5); BatchedModifications clone = (BatchedModifications) SerializationUtils.clone( (Serializable) batched.toSerializable()); @@ -54,6 +55,7 @@ public class BatchedModificationsTest { assertEquals("getTransactionID", "tx1", clone.getTransactionID()); assertEquals("getTransactionChainID", "txChain", clone.getTransactionChainID()); assertEquals("isReady", true, clone.isReady()); + assertEquals("getTotalMessagesSent", 5, clone.getTotalMessagesSent()); assertEquals("getModifications size", 3, clone.getModifications().size()); -- 2.36.6