Bump upstream SNAPSHOTS
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
index 84bc87238a70a9745d510eda4f6c17c8b488d66d..5da78530a1d31745c222827a797aa066ef9fa9ef 100644 (file)
@@ -5,28 +5,30 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.datastore;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.actor.Status.Failure;
 import akka.actor.Terminated;
 import akka.dispatch.Dispatchers;
-import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
+import akka.testkit.javadsl.TestKit;
 import com.google.common.base.Throwables;
+import java.time.Duration;
 import java.util.concurrent.TimeUnit;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.InOrder;
-import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
@@ -47,10 +49,10 @@ import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 
 public class ShardTransactionTest extends AbstractActorTest {
 
@@ -60,7 +62,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
     private static final ShardIdentifier SHARD_IDENTIFIER =
         ShardIdentifier.create("inventory", MEMBER_NAME, "config");
-    private static final SchemaContext TEST_MODEL = TestModel.createTestContext();
+    private static final EffectiveModelContext TEST_MODEL = TestModel.createTestContext();
 
     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().persistent(false).build();
 
@@ -68,6 +70,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
     private TestActorRef<Shard> shard;
     private ShardDataTree store;
+    private TestKit testKit;
 
     @Before
     public void setUp() {
@@ -76,6 +79,7 @@ public class ShardTransactionTest extends AbstractActorTest {
                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
         ShardTestKit.waitUntilLeader(shard);
         store = shard.underlyingActor().getDataStore();
+        testKit = new TestKit(getSystem());
     }
 
     private ActorRef newTransactionActor(final TransactionType type,
@@ -94,315 +98,258 @@ public class ShardTransactionTest extends AbstractActorTest {
     }
 
     @Test
-    public void testOnReceiveReadData() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), "testReadDataRO"));
-
-                testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), "testReadDataRW"));
-            }
+    public void testOnReceiveReadData() {
+        testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), "testReadDataRO"));
+        testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), "testReadDataRW"));
+    }
 
-            private void testOnReceiveReadData(final ActorRef transaction) {
-                transaction.tell(new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION),
-                        getRef());
+    private void testOnReceiveReadData(final ActorRef transaction) {
+        transaction.tell(new ReadData(YangInstanceIdentifier.empty(), DataStoreVersions.CURRENT_VERSION),
+            testKit.getRef());
 
-                ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
+        ReadDataReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ReadDataReply.class);
 
-                assertNotNull(reply.getNormalizedNode());
-            }
-        };
+        assertNotNull(reply.getNormalizedNode());
     }
 
     @Test
-    public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                testOnReceiveReadDataWhenDataNotFound(
-                        newTransactionActor(RO, readOnlyTransaction(), "testReadDataWhenDataNotFoundRO"));
-
-                testOnReceiveReadDataWhenDataNotFound(
-                        newTransactionActor(RW, readWriteTransaction(), "testReadDataWhenDataNotFoundRW"));
-            }
+    public void testOnReceiveReadDataWhenDataNotFound() {
+        testOnReceiveReadDataWhenDataNotFound(newTransactionActor(RO, readOnlyTransaction(),
+            "testReadDataWhenDataNotFoundRO"));
+        testOnReceiveReadDataWhenDataNotFound(newTransactionActor(RW, readWriteTransaction(),
+            "testReadDataWhenDataNotFoundRW"));
+    }
 
-            private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
-                transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
+    private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
+        transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
 
-                ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
+        ReadDataReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ReadDataReply.class);
 
-                assertTrue(reply.getNormalizedNode() == null);
-            }
-        };
+        assertNull(reply.getNormalizedNode());
     }
 
     @Test
-    public void testOnReceiveDataExistsPositive() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                testOnReceiveDataExistsPositive(
-                        newTransactionActor(RO, readOnlyTransaction(), "testDataExistsPositiveRO"));
-
-                testOnReceiveDataExistsPositive(
-                        newTransactionActor(RW, readWriteTransaction(), "testDataExistsPositiveRW"));
-            }
+    public void testOnReceiveDataExistsPositive() {
+        testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsPositiveRO"));
+        testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), "testDataExistsPositiveRW"));
+    }
 
-            private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
-                transaction.tell(new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION),
-                        getRef());
+    private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
+        transaction.tell(new DataExists(YangInstanceIdentifier.empty(), DataStoreVersions.CURRENT_VERSION),
+            testKit.getRef());
 
-                DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
+        DataExistsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), DataExistsReply.class);
 
-                assertTrue(reply.exists());
-            }
-        };
+        assertTrue(reply.exists());
     }
 
     @Test
-    public void testOnReceiveDataExistsNegative() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                testOnReceiveDataExistsNegative(
-                        newTransactionActor(RO, readOnlyTransaction(), "testDataExistsNegativeRO"));
-
-                testOnReceiveDataExistsNegative(
-                        newTransactionActor(RW, readWriteTransaction(), "testDataExistsNegativeRW"));
-            }
+    public void testOnReceiveDataExistsNegative() {
+        testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsNegativeRO"));
+        testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), "testDataExistsNegativeRW"));
+    }
 
-            private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
-                transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
+    private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
+        transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
 
-                DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
+        DataExistsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), DataExistsReply.class);
 
-                assertFalse(reply.exists());
-            }
-        };
+        assertFalse(reply.exists());
     }
 
     @Test
-    public void testOnReceiveBatchedModifications() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
-                DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
-                ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
-                        nextTransactionId(), mockModification);
-                final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
-
-                YangInstanceIdentifier writePath = TestModel.TEST_PATH;
-                NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
-                        .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
-                        .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
-
-                YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
-                NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create()
-                        .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME))
-                        .build();
-
-                YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
-
-                BatchedModifications batched = new BatchedModifications(nextTransactionId(),
-                        DataStoreVersions.CURRENT_VERSION);
-                batched.addModification(new WriteModification(writePath, writeData));
-                batched.addModification(new MergeModification(mergePath, mergeData));
-                batched.addModification(new DeleteModification(deletePath));
-
-                transaction.tell(batched, getRef());
-
-                BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"),
-                        BatchedModificationsReply.class);
-                assertEquals("getNumBatched", 3, reply.getNumBatched());
-
-                InOrder inOrder = Mockito.inOrder(mockModification);
-                inOrder.verify(mockModification).write(writePath, writeData);
-                inOrder.verify(mockModification).merge(mergePath, mergeData);
-                inOrder.verify(mockModification).delete(deletePath);
-            }
-        };
+    public void testOnReceiveBatchedModifications() {
+        ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class);
+        DataTreeModification mockModification = mock(DataTreeModification.class);
+        ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
+            nextTransactionId(), mockModification);
+        final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
+
+        YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+        NormalizedNode writeData = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
+                .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
+        NormalizedNode mergeData = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME))
+                .build();
+
+        YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
+
+        BatchedModifications batched = new BatchedModifications(nextTransactionId(),
+            DataStoreVersions.CURRENT_VERSION);
+        batched.addModification(new WriteModification(writePath, writeData));
+        batched.addModification(new MergeModification(mergePath, mergeData));
+        batched.addModification(new DeleteModification(deletePath));
+
+        transaction.tell(batched, testKit.getRef());
+
+        BatchedModificationsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
+            BatchedModificationsReply.class);
+        assertEquals("getNumBatched", 3, reply.getNumBatched());
+
+        InOrder inOrder = inOrder(mockModification);
+        inOrder.verify(mockModification).write(writePath, writeData);
+        inOrder.verify(mockModification).merge(mergePath, mergeData);
+        inOrder.verify(mockModification).delete(deletePath);
     }
 
     @Test
-    public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
-                        "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
-
-                JavaTestKit watcher = new JavaTestKit(getSystem());
-                watcher.watch(transaction);
-
-                YangInstanceIdentifier writePath = TestModel.TEST_PATH;
-                NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
-                        .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
-                        .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
-
-                final TransactionIdentifier tx1 = nextTransactionId();
-                BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
-                batched.addModification(new WriteModification(writePath, writeData));
-
-                transaction.tell(batched, getRef());
-                BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"),
-                        BatchedModificationsReply.class);
-                assertEquals("getNumBatched", 1, reply.getNumBatched());
-
-                batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
-                batched.setReady(true);
-                batched.setTotalMessagesSent(2);
-
-                transaction.tell(batched, getRef());
-                expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
-                watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
-            }
-        };
+    public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() {
+        final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
+                "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
+
+        TestKit watcher = new TestKit(getSystem());
+        watcher.watch(transaction);
+
+        YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+        NormalizedNode writeData = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
+                .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        final TransactionIdentifier tx1 = nextTransactionId();
+        BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
+        batched.addModification(new WriteModification(writePath, writeData));
+
+        transaction.tell(batched, testKit.getRef());
+        BatchedModificationsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
+            BatchedModificationsReply.class);
+        assertEquals("getNumBatched", 1, reply.getNumBatched());
+
+        batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
+        batched.setReady();
+        batched.setTotalMessagesSent(2);
+
+        transaction.tell(batched, testKit.getRef());
+        testKit.expectMsgClass(Duration.ofSeconds(5), ReadyTransactionReply.class);
+        watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
     }
 
     @Test
-    public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
-                        "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
-
-                JavaTestKit watcher = new JavaTestKit(getSystem());
-                watcher.watch(transaction);
-
-                YangInstanceIdentifier writePath = TestModel.TEST_PATH;
-                NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
-                        .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
-                        .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
-
-                BatchedModifications batched = new BatchedModifications(nextTransactionId(),
-                        DataStoreVersions.CURRENT_VERSION);
-                batched.addModification(new WriteModification(writePath, writeData));
-                batched.setReady(true);
-                batched.setDoCommitOnReady(true);
-                batched.setTotalMessagesSent(1);
-
-                transaction.tell(batched, getRef());
-                expectMsgClass(duration("5 seconds"), CommitTransactionReply.class);
-                watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
-            }
-        };
+    public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() {
+        final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
+                "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
+
+        TestKit watcher = new TestKit(getSystem());
+        watcher.watch(transaction);
+
+        YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+        NormalizedNode writeData = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
+                .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        BatchedModifications batched = new BatchedModifications(nextTransactionId(),
+            DataStoreVersions.CURRENT_VERSION);
+        batched.addModification(new WriteModification(writePath, writeData));
+        batched.setReady();
+        batched.setDoCommitOnReady(true);
+        batched.setTotalMessagesSent(1);
+
+        transaction.tell(batched, testKit.getRef());
+        testKit.expectMsgClass(Duration.ofSeconds(5), CommitTransactionReply.class);
+        watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
     }
 
     @Test(expected = TestException.class)
     public void testOnReceiveBatchedModificationsFailure() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
+        ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class);
+        DataTreeModification mockModification = mock(DataTreeModification.class);
+        ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
+            nextTransactionId(), mockModification);
+        final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
+            "testOnReceiveBatchedModificationsFailure");
 
-                ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
-                DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
-                ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
-                        nextTransactionId(), mockModification);
-                final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
-                        "testOnReceiveBatchedModificationsFailure");
+        TestKit watcher = new TestKit(getSystem());
+        watcher.watch(transaction);
 
-                JavaTestKit watcher = new JavaTestKit(getSystem());
-                watcher.watch(transaction);
+        YangInstanceIdentifier path = TestModel.TEST_PATH;
+        ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-                YangInstanceIdentifier path = TestModel.TEST_PATH;
-                ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        doThrow(new TestException()).when(mockModification).write(path, node);
 
-                doThrow(new TestException()).when(mockModification).write(path, node);
+        final TransactionIdentifier tx1 = nextTransactionId();
+        BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
+        batched.addModification(new WriteModification(path, node));
 
-                final TransactionIdentifier tx1 = nextTransactionId();
-                BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
-                batched.addModification(new WriteModification(path, node));
+        transaction.tell(batched, testKit.getRef());
+        testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
 
-                transaction.tell(batched, getRef());
-                expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+        batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
+        batched.setReady();
+        batched.setTotalMessagesSent(2);
 
-                batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
-                batched.setReady(true);
-                batched.setTotalMessagesSent(2);
+        transaction.tell(batched, testKit.getRef());
+        Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
+        watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
 
-                transaction.tell(batched, getRef());
-                Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
-                watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
-
-                if (failure != null) {
-                    Throwables.propagateIfInstanceOf(failure.cause(), Exception.class);
-                    Throwables.propagate(failure.cause());
-                }
-            }
-        };
+        if (failure != null) {
+            Throwables.propagateIfPossible(failure.cause(), Exception.class);
+            throw new RuntimeException(failure.cause());
+        }
     }
 
     @Test(expected = IllegalStateException.class)
     public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-
-                final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
-                        "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
+        final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
+                "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
 
-                JavaTestKit watcher = new JavaTestKit(getSystem());
-                watcher.watch(transaction);
+        TestKit watcher = new TestKit(getSystem());
+        watcher.watch(transaction);
 
-                BatchedModifications batched = new BatchedModifications(nextTransactionId(),
-                        DataStoreVersions.CURRENT_VERSION);
-                batched.setReady(true);
-                batched.setTotalMessagesSent(2);
+        BatchedModifications batched = new BatchedModifications(nextTransactionId(),
+            DataStoreVersions.CURRENT_VERSION);
+        batched.setReady();
+        batched.setTotalMessagesSent(2);
 
-                transaction.tell(batched, getRef());
+        transaction.tell(batched, testKit.getRef());
 
-                Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
-                watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
+        Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
+        watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
 
-                if (failure != null) {
-                    Throwables.propagateIfInstanceOf(failure.cause(), Exception.class);
-                    Throwables.propagate(failure.cause());
-                }
-            }
-        };
+        if (failure != null) {
+            Throwables.throwIfInstanceOf(failure.cause(), Exception.class);
+            Throwables.throwIfUnchecked(failure.cause());
+            throw new RuntimeException(failure.cause());
+        }
     }
 
     @Test
-    public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
-                        "testReadWriteTxOnReceiveCloseTransaction");
+    public void testReadWriteTxOnReceiveCloseTransaction() {
+        final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
+                "testReadWriteTxOnReceiveCloseTransaction");
 
-                watch(transaction);
+        testKit.watch(transaction);
 
-                transaction.tell(new CloseTransaction().toSerializable(), getRef());
+        transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
 
-                expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
-                expectTerminated(duration("3 seconds"), transaction);
-            }
-        };
+        testKit.expectMsgClass(Duration.ofSeconds(3), CloseTransactionReply.class);
+        testKit.expectTerminated(Duration.ofSeconds(3), transaction);
     }
 
     @Test
-    public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
-                        "testWriteTxOnReceiveCloseTransaction");
+    public void testWriteOnlyTxOnReceiveCloseTransaction() {
+        final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
+                "testWriteTxOnReceiveCloseTransaction");
 
-                watch(transaction);
+        testKit.watch(transaction);
 
-                transaction.tell(new CloseTransaction().toSerializable(), getRef());
+        transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
 
-                expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
-                expectTerminated(duration("3 seconds"), transaction);
-            }
-        };
+        testKit.expectMsgClass(Duration.ofSeconds(3), CloseTransactionReply.class);
+        testKit.expectTerminated(Duration.ofSeconds(3), transaction);
     }
 
     @Test
-    public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
-                        "testReadOnlyTxOnReceiveCloseTransaction");
+    public void testReadOnlyTxOnReceiveCloseTransaction() {
+        final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
+                "testReadOnlyTxOnReceiveCloseTransaction");
 
-                watch(transaction);
+        testKit.watch(transaction);
 
-                transaction.tell(new CloseTransaction().toSerializable(), getRef());
+        transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
 
-                expectMsgClass(duration("3 seconds"), Terminated.class);
-            }
-        };
+        testKit.expectMsgClass(Duration.ofSeconds(3), Terminated.class);
     }
 
     @Test
@@ -410,16 +357,12 @@ public class ShardTransactionTest extends AbstractActorTest {
         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
                 500, TimeUnit.MILLISECONDS).build();
 
-        new JavaTestKit(getSystem()) {
-            {
-                final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
-                        "testShardTransactionInactivity");
+        final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
+            "testShardTransactionInactivity");
 
-                watch(transaction);
+        testKit.watch(transaction);
 
-                expectMsgClass(duration("3 seconds"), Terminated.class);
-            }
-        };
+        testKit.expectMsgClass(Duration.ofSeconds(3), Terminated.class);
     }
 
     public static class TestException extends RuntimeException {