Merge "Verify BatchedModifications messages sent vs received"
authorMoiz Raja <moraja@cisco.com>
Fri, 10 Apr 2015 15:20:04 +0000 (15:20 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 10 Apr 2015 15:20:06 +0000 (15:20 +0000)
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java

index 356891164628bdfd8434b620707452ffa89bf7d5..e53e9cb05aaaa01a3800203c7bce0d4bce204630 100644 (file)
@@ -1,6 +1,6 @@
 /*
- *
  *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *  Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
  *
  *  This program and the accompanying materials are made available under the
  *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -41,6 +41,8 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 public class ShardWriteTransaction extends ShardTransaction {
 
     private final MutableCompositeModification compositeModification = new MutableCompositeModification();
+    private int totalBatchedModificationsReceived;
+    private Exception lastBatchedModificationsException;
     private final DOMStoreWriteTransaction transaction;
 
     public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
@@ -88,13 +90,29 @@ public class ShardWriteTransaction extends ShardTransaction {
                 modification.apply(transaction);
             }
 
+            totalBatchedModificationsReceived++;
             if(batched.isReady()) {
+                if(lastBatchedModificationsException != null) {
+                    throw lastBatchedModificationsException;
+                }
+
+                if(totalBatchedModificationsReceived != batched.getTotalMessagesSent()) {
+                    throw new IllegalStateException(String.format(
+                            "The total number of batched messages received %d does not match the number sent %d",
+                            totalBatchedModificationsReceived, batched.getTotalMessagesSent()));
+                }
+
                 readyTransaction(transaction, false);
             } else {
                 getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
             }
         } catch (Exception e) {
+            lastBatchedModificationsException = e;
             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+
+            if(batched.isReady()) {
+                getSelf().tell(PoisonPill.getInstance(), getSelf());
+            }
         }
     }
 
index f34c5a257125026f61f02fcbf18c945160ede2bc..c722918c5cfed8ad7062e63911ae60fa45aad7fa 100644 (file)
@@ -1,5 +1,6 @@
 /*
  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -45,6 +46,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
 
     private final OperationCompleter operationCompleter;
     private BatchedModifications batchedModifications;
+    private int totalBatchedModificationsSent;
 
     protected TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
             String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
@@ -159,6 +161,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
             }
 
             batchedModifications.setReady(ready);
+            batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent);
             sent = executeOperationAsync(batchedModifications);
 
             if(ready) {
index a9ce94b033b26690b7e49017c36a2e24abbd0f3c..86f96f57d0f3cb0c284a1a902bd820baaac9e82d 100644 (file)
@@ -22,6 +22,7 @@ public class BatchedModifications extends MutableCompositeModification implement
     private static final long serialVersionUID = 1L;
 
     private boolean ready;
+    private int totalMessagesSent;
     private String transactionID;
     private String transactionChainID;
 
@@ -42,6 +43,14 @@ public class BatchedModifications extends MutableCompositeModification implement
         this.ready = ready;
     }
 
+    public int getTotalMessagesSent() {
+        return totalMessagesSent;
+    }
+
+    public void setTotalMessagesSent(int totalMessagesSent) {
+        this.totalMessagesSent = totalMessagesSent;
+    }
+
     public String getTransactionID() {
         return transactionID;
     }
@@ -56,6 +65,7 @@ public class BatchedModifications extends MutableCompositeModification implement
         transactionID = in.readUTF();
         transactionChainID = in.readUTF();
         ready = in.readBoolean();
+        totalMessagesSent = in.readInt();
     }
 
     @Override
@@ -64,6 +74,7 @@ public class BatchedModifications extends MutableCompositeModification implement
         out.writeUTF(transactionID);
         out.writeUTF(transactionChainID);
         out.writeBoolean(ready);
+        out.writeInt(totalMessagesSent);
     }
 
     @Override
@@ -74,8 +85,10 @@ public class BatchedModifications extends MutableCompositeModification implement
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
-        builder.append("BatchedModifications [transactionID=").append(transactionID).append(", ready=").append(ready)
-                .append(", modifications size=").append(getModifications().size()).append("]");
+        builder.append("BatchedModifications [transactionID=").append(transactionID).append(", transactionChainID=")
+                .append(transactionChainID).append(", ready=").append(ready).append(", totalMessagesSent=")
+                .append(totalMessagesSent).append(", modifications size=").append(getModifications().size())
+                .append("]");
         return builder.toString();
     }
 }
index 9715f668e353fe71d527865d2ffd68a4759952b5..7894ab159bf3e539c803324edd47a911a4ef223c 100644 (file)
@@ -4,8 +4,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doThrow;
 import akka.actor.ActorRef;
 import akka.actor.Props;
+import akka.actor.Status.Failure;
 import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
@@ -52,6 +54,7 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
@@ -424,16 +427,83 @@ public class ShardTransactionTest extends AbstractActorTest {
                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
-            batched.setReady(true);
             batched.addModification(new WriteModification(writePath, writeData));
 
             transaction.tell(batched, getRef());
+            BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
+            assertEquals("getNumBatched", 1, reply.getNumBatched());
+
+            batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+            batched.setReady(true);
+            batched.setTotalMessagesSent(2);
 
+            transaction.tell(batched, getRef());
             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
         }};
     }
 
+    @Test(expected=TestException.class)
+    public void testOnReceiveBatchedModificationsFailure() throws Throwable {
+        new JavaTestKit(getSystem()) {{
+
+            DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
+            final ActorRef transaction = newTransactionActor(mockWriteTx,
+                    "testOnReceiveBatchedModificationsFailure");
+
+            JavaTestKit watcher = new JavaTestKit(getSystem());
+            watcher.watch(transaction);
+
+            YangInstanceIdentifier path = TestModel.TEST_PATH;
+            ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+            doThrow(new TestException()).when(mockWriteTx).write(path, node);
+
+            BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+            batched.addModification(new WriteModification(path, node));
+
+            transaction.tell(batched, getRef());
+            expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+
+            batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+            batched.setReady(true);
+            batched.setTotalMessagesSent(2);
+
+            transaction.tell(batched, getRef());
+            Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+            watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
+
+            if(failure != null) {
+                throw failure.cause();
+            }
+        }};
+    }
+
+    @Test(expected=IllegalStateException.class)
+    public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
+        new JavaTestKit(getSystem()) {{
+
+            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+                    "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
+
+            JavaTestKit watcher = new JavaTestKit(getSystem());
+            watcher.watch(transaction);
+
+            BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+            batched.setReady(true);
+            batched.setTotalMessagesSent(2);
+
+            transaction.tell(batched, getRef());
+
+            Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+            watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
+
+            if(failure != null) {
+                throw failure.cause();
+            }
+        }};
+    }
+
     @Test
     public void testOnReceivePreLithiumReadyTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
@@ -564,4 +634,8 @@ public class ShardTransactionTest extends AbstractActorTest {
             expectMsgClass(duration("3 seconds"), Terminated.class);
         }};
     }
+
+    public static class TestException extends RuntimeException {
+        private static final long serialVersionUID = 1L;
+    }
 }
index 6cfef194915bb81230b6b8b87c0809afcd15eb2e..cc9692bfd91b72f3245f2bb6bd160f12551720b5 100644 (file)
@@ -476,8 +476,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
 
-        verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
-                isA(BatchedModifications.class));
+        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+        assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
+
+        verifyBatchedModifications(batchedModifications.get(0), true,
+                new WriteModification(TestModel.TEST_PATH, nodeToWrite));
+
+        assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
     }
 
     @Test
@@ -1221,6 +1226,8 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         verifyBatchedModifications(batchedModifications.get(2), true, new MergeModification(mergePath3, mergeNode3),
                 new DeleteModification(deletePath2));
+
+        assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
     }
 
     @Test
index b302f527d6eb7019b68df59bac8bed656edb6127..1df8e9775b89219439f31cbafbec69e675a353d8 100644 (file)
@@ -46,6 +46,7 @@ public class BatchedModificationsTest {
         batched.addModification(new MergeModification(mergePath, mergeData));
         batched.addModification(new DeleteModification(deletePath));
         batched.setReady(true);
+        batched.setTotalMessagesSent(5);
 
         BatchedModifications clone = (BatchedModifications) SerializationUtils.clone(
                 (Serializable) batched.toSerializable());
@@ -54,6 +55,7 @@ public class BatchedModificationsTest {
         assertEquals("getTransactionID", "tx1", clone.getTransactionID());
         assertEquals("getTransactionChainID", "txChain", clone.getTransactionChainID());
         assertEquals("isReady", true, clone.isReady());
+        assertEquals("getTotalMessagesSent", 5, clone.getTotalMessagesSent());
 
         assertEquals("getModifications size", 3, clone.getModifications().size());