Verify BatchedModifications messages sent vs received 91/17891/5
authorTom Pantelis <tpanteli@brocade.com>
Wed, 8 Apr 2015 03:48:14 +0000 (23:48 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Fri, 10 Apr 2015 14:13:03 +0000 (10:13 -0400)
A previous patch removed the verification of the response
Futures for BatchedModifications messages sent in
TransactionContextImpl. This was used to verify all modifications
were successfully received and processed by the transaction actor.

In lieu of that, this patch adds a totalMessagesSent field to
BatchedModifications and the transaction actor verifies it matches
the total number of messages received and successfully processed.

Change-Id: I247d3a09794142813cdca67b63d77412261a282b
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java

index 3568911..e53e9cb 100644 (file)
@@ -1,6 +1,6 @@
 /*
- *
  *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *  Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
  *
  *  This program and the accompanying materials are made available under the
  *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -41,6 +41,8 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 public class ShardWriteTransaction extends ShardTransaction {
 
     private final MutableCompositeModification compositeModification = new MutableCompositeModification();
+    private int totalBatchedModificationsReceived;
+    private Exception lastBatchedModificationsException;
     private final DOMStoreWriteTransaction transaction;
 
     public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
@@ -88,13 +90,29 @@ public class ShardWriteTransaction extends ShardTransaction {
                 modification.apply(transaction);
             }
 
+            totalBatchedModificationsReceived++;
             if(batched.isReady()) {
+                if(lastBatchedModificationsException != null) {
+                    throw lastBatchedModificationsException;
+                }
+
+                if(totalBatchedModificationsReceived != batched.getTotalMessagesSent()) {
+                    throw new IllegalStateException(String.format(
+                            "The total number of batched messages received %d does not match the number sent %d",
+                            totalBatchedModificationsReceived, batched.getTotalMessagesSent()));
+                }
+
                 readyTransaction(transaction, false);
             } else {
                 getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
             }
         } catch (Exception e) {
+            lastBatchedModificationsException = e;
             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+
+            if(batched.isReady()) {
+                getSelf().tell(PoisonPill.getInstance(), getSelf());
+            }
         }
     }
 
index f34c5a2..c722918 100644 (file)
@@ -1,5 +1,6 @@
 /*
  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -45,6 +46,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
 
     private final OperationCompleter operationCompleter;
     private BatchedModifications batchedModifications;
+    private int totalBatchedModificationsSent;
 
     protected TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
             String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
@@ -159,6 +161,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
             }
 
             batchedModifications.setReady(ready);
+            batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent);
             sent = executeOperationAsync(batchedModifications);
 
             if(ready) {
index a9ce94b..86f96f5 100644 (file)
@@ -22,6 +22,7 @@ public class BatchedModifications extends MutableCompositeModification implement
     private static final long serialVersionUID = 1L;
 
     private boolean ready;
+    private int totalMessagesSent;
     private String transactionID;
     private String transactionChainID;
 
@@ -42,6 +43,14 @@ public class BatchedModifications extends MutableCompositeModification implement
         this.ready = ready;
     }
 
+    public int getTotalMessagesSent() {
+        return totalMessagesSent;
+    }
+
+    public void setTotalMessagesSent(int totalMessagesSent) {
+        this.totalMessagesSent = totalMessagesSent;
+    }
+
     public String getTransactionID() {
         return transactionID;
     }
@@ -56,6 +65,7 @@ public class BatchedModifications extends MutableCompositeModification implement
         transactionID = in.readUTF();
         transactionChainID = in.readUTF();
         ready = in.readBoolean();
+        totalMessagesSent = in.readInt();
     }
 
     @Override
@@ -64,6 +74,7 @@ public class BatchedModifications extends MutableCompositeModification implement
         out.writeUTF(transactionID);
         out.writeUTF(transactionChainID);
         out.writeBoolean(ready);
+        out.writeInt(totalMessagesSent);
     }
 
     @Override
@@ -74,8 +85,10 @@ public class BatchedModifications extends MutableCompositeModification implement
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
-        builder.append("BatchedModifications [transactionID=").append(transactionID).append(", ready=").append(ready)
-                .append(", modifications size=").append(getModifications().size()).append("]");
+        builder.append("BatchedModifications [transactionID=").append(transactionID).append(", transactionChainID=")
+                .append(transactionChainID).append(", ready=").append(ready).append(", totalMessagesSent=")
+                .append(totalMessagesSent).append(", modifications size=").append(getModifications().size())
+                .append("]");
         return builder.toString();
     }
 }
index 9715f66..7894ab1 100644 (file)
@@ -4,8 +4,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doThrow;
 import akka.actor.ActorRef;
 import akka.actor.Props;
+import akka.actor.Status.Failure;
 import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
@@ -52,6 +54,7 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
@@ -424,16 +427,83 @@ public class ShardTransactionTest extends AbstractActorTest {
                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
-            batched.setReady(true);
             batched.addModification(new WriteModification(writePath, writeData));
 
             transaction.tell(batched, getRef());
+            BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
+            assertEquals("getNumBatched", 1, reply.getNumBatched());
+
+            batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+            batched.setReady(true);
+            batched.setTotalMessagesSent(2);
 
+            transaction.tell(batched, getRef());
             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
         }};
     }
 
+    @Test(expected=TestException.class)
+    public void testOnReceiveBatchedModificationsFailure() throws Throwable {
+        new JavaTestKit(getSystem()) {{
+
+            DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
+            final ActorRef transaction = newTransactionActor(mockWriteTx,
+                    "testOnReceiveBatchedModificationsFailure");
+
+            JavaTestKit watcher = new JavaTestKit(getSystem());
+            watcher.watch(transaction);
+
+            YangInstanceIdentifier path = TestModel.TEST_PATH;
+            ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+            doThrow(new TestException()).when(mockWriteTx).write(path, node);
+
+            BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+            batched.addModification(new WriteModification(path, node));
+
+            transaction.tell(batched, getRef());
+            expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+
+            batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+            batched.setReady(true);
+            batched.setTotalMessagesSent(2);
+
+            transaction.tell(batched, getRef());
+            Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+            watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
+
+            if(failure != null) {
+                throw failure.cause();
+            }
+        }};
+    }
+
+    @Test(expected=IllegalStateException.class)
+    public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
+        new JavaTestKit(getSystem()) {{
+
+            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+                    "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
+
+            JavaTestKit watcher = new JavaTestKit(getSystem());
+            watcher.watch(transaction);
+
+            BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+            batched.setReady(true);
+            batched.setTotalMessagesSent(2);
+
+            transaction.tell(batched, getRef());
+
+            Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+            watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
+
+            if(failure != null) {
+                throw failure.cause();
+            }
+        }};
+    }
+
     @Test
     public void testOnReceivePreLithiumReadyTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
@@ -564,4 +634,8 @@ public class ShardTransactionTest extends AbstractActorTest {
             expectMsgClass(duration("3 seconds"), Terminated.class);
         }};
     }
+
+    public static class TestException extends RuntimeException {
+        private static final long serialVersionUID = 1L;
+    }
 }
index 6cfef19..cc9692b 100644 (file)
@@ -476,8 +476,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
 
-        verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
-                isA(BatchedModifications.class));
+        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+        assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
+
+        verifyBatchedModifications(batchedModifications.get(0), true,
+                new WriteModification(TestModel.TEST_PATH, nodeToWrite));
+
+        assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
     }
 
     @Test
@@ -1221,6 +1226,8 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         verifyBatchedModifications(batchedModifications.get(2), true, new MergeModification(mergePath3, mergeNode3),
                 new DeleteModification(deletePath2));
+
+        assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
     }
 
     @Test
index b302f52..1df8e97 100644 (file)
@@ -46,6 +46,7 @@ public class BatchedModificationsTest {
         batched.addModification(new MergeModification(mergePath, mergeData));
         batched.addModification(new DeleteModification(deletePath));
         batched.setReady(true);
+        batched.setTotalMessagesSent(5);
 
         BatchedModifications clone = (BatchedModifications) SerializationUtils.clone(
                 (Serializable) batched.toSerializable());
@@ -54,6 +55,7 @@ public class BatchedModificationsTest {
         assertEquals("getTransactionID", "tx1", clone.getTransactionID());
         assertEquals("getTransactionChainID", "txChain", clone.getTransactionChainID());
         assertEquals("isReady", true, clone.isReady());
+        assertEquals("getTotalMessagesSent", 5, clone.getTotalMessagesSent());
 
         assertEquals("getModifications size", 3, clone.getModifications().size());
 

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.