X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardTransactionTest.java;h=c9335f378a8662e7b7ceb488961fb49588d50baf;hp=c6b5cb44026f42690fffc89289a50c13fe1c59aa;hb=c31a6fcf9fb070d4419ca4c32d8b531fdcb5030d;hpb=6b9787d26aab04acf95fa30601291380f9b1d5c9 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index c6b5cb4402..c9335f378a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -4,8 +4,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doThrow; import akka.actor.ActorRef; import akka.actor.Props; +import akka.actor.Status.Failure; import akka.actor.Terminated; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; @@ -52,6 +54,7 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; @@ -79,7 +82,7 @@ public class ShardTransactionTest extends AbstractActorTest { private ActorRef createShard(){ return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, - Collections.emptyMap(), datastoreContext, TestModel.createTestContext())); + Collections.emptyMap(), datastoreContext, TestModel.createTestContext())); } private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name) { @@ -97,7 +100,7 @@ public class ShardTransactionTest extends AbstractActorTest { private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name, short version) { Props props = ShardTransaction.props(transaction, shard != null ? shard : createShard(), - testSchemaContext, datastoreContext, shardStats, "txn", version); + datastoreContext, shardStats, "txn", version); return getSystem().actorOf(props, name); } @@ -372,7 +375,7 @@ public class ShardTransactionTest extends AbstractActorTest { YangInstanceIdentifier deletePath = TestModel.TEST_PATH; - BatchedModifications batched = new BatchedModifications(DataStoreVersions.CURRENT_VERSION); + BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null); batched.addModification(new WriteModification(writePath, writeData)); batched.addModification(new MergeModification(mergePath, mergeData)); batched.addModification(new DeleteModification(deletePath)); @@ -409,34 +412,125 @@ public class ShardTransactionTest extends AbstractActorTest { } @Test - public void testOnReceiveReadyTransaction() throws Exception { + public void testOnReceiveBatchedModificationsReady() throws Exception { + new JavaTestKit(getSystem()) {{ + + final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(), + "testOnReceiveBatchedModificationsReady"); + + JavaTestKit watcher = new JavaTestKit(getSystem()); + watcher.watch(transaction); + + YangInstanceIdentifier writePath = TestModel.TEST_PATH; + NormalizedNode writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). + withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); + + BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null); + batched.addModification(new WriteModification(writePath, writeData)); + + transaction.tell(batched, getRef()); + BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class); + assertEquals("getNumBatched", 1, reply.getNumBatched()); + + batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null); + batched.setReady(true); + batched.setTotalMessagesSent(2); + + transaction.tell(batched, getRef()); + expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class); + watcher.expectMsgClass(duration("5 seconds"), Terminated.class); + }}; + } + + @Test(expected=TestException.class) + public void testOnReceiveBatchedModificationsFailure() throws Throwable { + new JavaTestKit(getSystem()) {{ + + DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class); + final ActorRef transaction = newTransactionActor(mockWriteTx, + "testOnReceiveBatchedModificationsFailure"); + + JavaTestKit watcher = new JavaTestKit(getSystem()); + watcher.watch(transaction); + + YangInstanceIdentifier path = TestModel.TEST_PATH; + ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doThrow(new TestException()).when(mockWriteTx).write(path, node); + + BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null); + batched.addModification(new WriteModification(path, node)); + + transaction.tell(batched, getRef()); + expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); + + batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null); + batched.setReady(true); + batched.setTotalMessagesSent(2); + + transaction.tell(batched, getRef()); + Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); + watcher.expectMsgClass(duration("5 seconds"), Terminated.class); + + if(failure != null) { + throw failure.cause(); + } + }}; + } + + @Test(expected=IllegalStateException.class) + public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable { + new JavaTestKit(getSystem()) {{ + + final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(), + "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount"); + + JavaTestKit watcher = new JavaTestKit(getSystem()); + watcher.watch(transaction); + + BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null); + batched.setReady(true); + batched.setTotalMessagesSent(2); + + transaction.tell(batched, getRef()); + + Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); + watcher.expectMsgClass(duration("5 seconds"), Terminated.class); + + if(failure != null) { + throw failure.cause(); + } + }}; + } + + @Test + public void testOnReceivePreLithiumReadyTransaction() throws Exception { new JavaTestKit(getSystem()) {{ final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(), - "testReadyTransaction"); + "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION); - watch(transaction); + JavaTestKit watcher = new JavaTestKit(getSystem()); + watcher.watch(transaction); transaction.tell(new ReadyTransaction().toSerializable(), getRef()); - expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS, - Terminated.class); - expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS, - Terminated.class); + expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS); + watcher.expectMsgClass(duration("5 seconds"), Terminated.class); }}; // test new JavaTestKit(getSystem()) {{ final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(), - "testReadyTransaction2"); + "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION); - watch(transaction); + JavaTestKit watcher = new JavaTestKit(getSystem()); + watcher.watch(transaction); transaction.tell(new ReadyTransaction(), getRef()); - expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class, - Terminated.class); - expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class, - Terminated.class); + expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class); + watcher.expectMsgClass(duration("5 seconds"), Terminated.class); }}; } @@ -517,8 +611,7 @@ public class ShardTransactionTest extends AbstractActorTest { public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); + datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION); final TestActorRef transaction = TestActorRef.apply(props,getSystem()); transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION). @@ -540,4 +633,8 @@ public class ShardTransactionTest extends AbstractActorTest { expectMsgClass(duration("3 seconds"), Terminated.class); }}; } + + public static class TestException extends RuntimeException { + private static final long serialVersionUID = 1L; + } }