Cleanup TestKit use
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
index d1f47852c621303d556de3e50a8d133a97470da1..8f952acdd5a9e6a53c568e33ddd71db8fd044983 100644 (file)
@@ -5,14 +5,16 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.datastore;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
@@ -26,7 +28,6 @@ import java.util.concurrent.TimeUnit;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.InOrder;
-import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
@@ -68,6 +69,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
     private TestActorRef<Shard> shard;
     private ShardDataTree store;
+    private TestKit testKit;
 
     @Before
     public void setUp() {
@@ -76,6 +78,7 @@ public class ShardTransactionTest extends AbstractActorTest {
                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
         ShardTestKit.waitUntilLeader(shard);
         store = shard.underlyingActor().getDataStore();
+        testKit = new TestKit(getSystem());
     }
 
     private ActorRef newTransactionActor(final TransactionType type,
@@ -95,315 +98,257 @@ public class ShardTransactionTest extends AbstractActorTest {
 
     @Test
     public void testOnReceiveReadData() {
-        new TestKit(getSystem()) {
-            {
-                testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), "testReadDataRO"));
-
-                testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), "testReadDataRW"));
-            }
+        testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), "testReadDataRO"));
+        testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), "testReadDataRW"));
+    }
 
-            private void testOnReceiveReadData(final ActorRef transaction) {
-                transaction.tell(new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION),
-                        getRef());
+    private void testOnReceiveReadData(final ActorRef transaction) {
+        transaction.tell(new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION),
+            testKit.getRef());
 
-                ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
+        ReadDataReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), ReadDataReply.class);
 
-                assertNotNull(reply.getNormalizedNode());
-            }
-        };
+        assertNotNull(reply.getNormalizedNode());
     }
 
     @Test
     public void testOnReceiveReadDataWhenDataNotFound() {
-        new TestKit(getSystem()) {
-            {
-                testOnReceiveReadDataWhenDataNotFound(
-                        newTransactionActor(RO, readOnlyTransaction(), "testReadDataWhenDataNotFoundRO"));
-
-                testOnReceiveReadDataWhenDataNotFound(
-                        newTransactionActor(RW, readWriteTransaction(), "testReadDataWhenDataNotFoundRW"));
-            }
+        testOnReceiveReadDataWhenDataNotFound(newTransactionActor(RO, readOnlyTransaction(),
+            "testReadDataWhenDataNotFoundRO"));
+        testOnReceiveReadDataWhenDataNotFound(newTransactionActor(RW, readWriteTransaction(),
+            "testReadDataWhenDataNotFoundRW"));
+    }
 
-            private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
-                transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
+    private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
+        transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
 
-                ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
+        ReadDataReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), ReadDataReply.class);
 
-                assertTrue(reply.getNormalizedNode() == null);
-            }
-        };
+        assertNull(reply.getNormalizedNode());
     }
 
     @Test
     public void testOnReceiveDataExistsPositive() {
-        new TestKit(getSystem()) {
-            {
-                testOnReceiveDataExistsPositive(
-                        newTransactionActor(RO, readOnlyTransaction(), "testDataExistsPositiveRO"));
-
-                testOnReceiveDataExistsPositive(
-                        newTransactionActor(RW, readWriteTransaction(), "testDataExistsPositiveRW"));
-            }
+        testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsPositiveRO"));
+        testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), "testDataExistsPositiveRW"));
+    }
 
-            private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
-                transaction.tell(new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION),
-                        getRef());
+    private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
+        transaction.tell(new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION),
+            testKit.getRef());
 
-                DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
+        DataExistsReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), DataExistsReply.class);
 
-                assertTrue(reply.exists());
-            }
-        };
+        assertTrue(reply.exists());
     }
 
     @Test
     public void testOnReceiveDataExistsNegative() {
-        new TestKit(getSystem()) {
-            {
-                testOnReceiveDataExistsNegative(
-                        newTransactionActor(RO, readOnlyTransaction(), "testDataExistsNegativeRO"));
-
-                testOnReceiveDataExistsNegative(
-                        newTransactionActor(RW, readWriteTransaction(), "testDataExistsNegativeRW"));
-            }
+        testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsNegativeRO"));
+        testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), "testDataExistsNegativeRW"));
+    }
 
-            private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
-                transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
+    private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
+        transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
 
-                DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
+        DataExistsReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), DataExistsReply.class);
 
-                assertFalse(reply.exists());
-            }
-        };
+        assertFalse(reply.exists());
     }
 
     @Test
     public void testOnReceiveBatchedModifications() {
-        new TestKit(getSystem()) {
-            {
-                ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
-                DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
-                ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
-                        nextTransactionId(), mockModification);
-                final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
-
-                YangInstanceIdentifier writePath = TestModel.TEST_PATH;
-                NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
-                        .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
-                        .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
-
-                YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
-                NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create()
-                        .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME))
-                        .build();
-
-                YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
-
-                BatchedModifications batched = new BatchedModifications(nextTransactionId(),
-                        DataStoreVersions.CURRENT_VERSION);
-                batched.addModification(new WriteModification(writePath, writeData));
-                batched.addModification(new MergeModification(mergePath, mergeData));
-                batched.addModification(new DeleteModification(deletePath));
-
-                transaction.tell(batched, getRef());
-
-                BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"),
-                        BatchedModificationsReply.class);
-                assertEquals("getNumBatched", 3, reply.getNumBatched());
-
-                InOrder inOrder = Mockito.inOrder(mockModification);
-                inOrder.verify(mockModification).write(writePath, writeData);
-                inOrder.verify(mockModification).merge(mergePath, mergeData);
-                inOrder.verify(mockModification).delete(deletePath);
-            }
-        };
+        ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class);
+        DataTreeModification mockModification = mock(DataTreeModification.class);
+        ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
+            nextTransactionId(), mockModification);
+        final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
+
+        YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
+                .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
+        NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME))
+                .build();
+
+        YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
+
+        BatchedModifications batched = new BatchedModifications(nextTransactionId(),
+            DataStoreVersions.CURRENT_VERSION);
+        batched.addModification(new WriteModification(writePath, writeData));
+        batched.addModification(new MergeModification(mergePath, mergeData));
+        batched.addModification(new DeleteModification(deletePath));
+
+        transaction.tell(batched, testKit.getRef());
+
+        BatchedModificationsReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"),
+            BatchedModificationsReply.class);
+        assertEquals("getNumBatched", 3, reply.getNumBatched());
+
+        InOrder inOrder = inOrder(mockModification);
+        inOrder.verify(mockModification).write(writePath, writeData);
+        inOrder.verify(mockModification).merge(mergePath, mergeData);
+        inOrder.verify(mockModification).delete(deletePath);
     }
 
     @Test
     public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() {
-        new TestKit(getSystem()) {
-            {
-                final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
-                        "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
-
-                TestKit watcher = new TestKit(getSystem());
-                watcher.watch(transaction);
-
-                YangInstanceIdentifier writePath = TestModel.TEST_PATH;
-                NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
-                        .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
-                        .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
-
-                final TransactionIdentifier tx1 = nextTransactionId();
-                BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
-                batched.addModification(new WriteModification(writePath, writeData));
-
-                transaction.tell(batched, getRef());
-                BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"),
-                        BatchedModificationsReply.class);
-                assertEquals("getNumBatched", 1, reply.getNumBatched());
-
-                batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
-                batched.setReady();
-                batched.setTotalMessagesSent(2);
-
-                transaction.tell(batched, getRef());
-                expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
-                watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
-            }
-        };
+        final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
+                "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
+
+        TestKit watcher = new TestKit(getSystem());
+        watcher.watch(transaction);
+
+        YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
+                .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        final TransactionIdentifier tx1 = nextTransactionId();
+        BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
+        batched.addModification(new WriteModification(writePath, writeData));
+
+        transaction.tell(batched, testKit.getRef());
+        BatchedModificationsReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"),
+            BatchedModificationsReply.class);
+        assertEquals("getNumBatched", 1, reply.getNumBatched());
+
+        batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
+        batched.setReady();
+        batched.setTotalMessagesSent(2);
+
+        transaction.tell(batched, testKit.getRef());
+        testKit.expectMsgClass(testKit.duration("5 seconds"), ReadyTransactionReply.class);
+        watcher.expectMsgClass(watcher.duration("5 seconds"), Terminated.class);
     }
 
     @Test
     public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() {
-        new TestKit(getSystem()) {
-            {
-                final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
-                        "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
-
-                TestKit watcher = new TestKit(getSystem());
-                watcher.watch(transaction);
-
-                YangInstanceIdentifier writePath = TestModel.TEST_PATH;
-                NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
-                        .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
-                        .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
-
-                BatchedModifications batched = new BatchedModifications(nextTransactionId(),
-                        DataStoreVersions.CURRENT_VERSION);
-                batched.addModification(new WriteModification(writePath, writeData));
-                batched.setReady();
-                batched.setDoCommitOnReady(true);
-                batched.setTotalMessagesSent(1);
-
-                transaction.tell(batched, getRef());
-                expectMsgClass(duration("5 seconds"), CommitTransactionReply.class);
-                watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
-            }
-        };
+        final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
+                "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
+
+        TestKit watcher = new TestKit(getSystem());
+        watcher.watch(transaction);
+
+        YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
+                .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        BatchedModifications batched = new BatchedModifications(nextTransactionId(),
+            DataStoreVersions.CURRENT_VERSION);
+        batched.addModification(new WriteModification(writePath, writeData));
+        batched.setReady();
+        batched.setDoCommitOnReady(true);
+        batched.setTotalMessagesSent(1);
+
+        transaction.tell(batched, testKit.getRef());
+        testKit.expectMsgClass(testKit.duration("5 seconds"), CommitTransactionReply.class);
+        watcher.expectMsgClass(testKit.duration("5 seconds"), Terminated.class);
     }
 
     @Test(expected = TestException.class)
     public void testOnReceiveBatchedModificationsFailure() throws Exception {
-        new TestKit(getSystem()) {
-            {
+        ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class);
+        DataTreeModification mockModification = mock(DataTreeModification.class);
+        ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
+            nextTransactionId(), mockModification);
+        final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
+            "testOnReceiveBatchedModificationsFailure");
 
-                ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
-                DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
-                ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
-                        nextTransactionId(), mockModification);
-                final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
-                        "testOnReceiveBatchedModificationsFailure");
+        TestKit watcher = new TestKit(getSystem());
+        watcher.watch(transaction);
 
-                TestKit watcher = new TestKit(getSystem());
-                watcher.watch(transaction);
+        YangInstanceIdentifier path = TestModel.TEST_PATH;
+        ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-                YangInstanceIdentifier path = TestModel.TEST_PATH;
-                ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        doThrow(new TestException()).when(mockModification).write(path, node);
 
-                doThrow(new TestException()).when(mockModification).write(path, node);
+        final TransactionIdentifier tx1 = nextTransactionId();
+        BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
+        batched.addModification(new WriteModification(path, node));
 
-                final TransactionIdentifier tx1 = nextTransactionId();
-                BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
-                batched.addModification(new WriteModification(path, node));
+        transaction.tell(batched, testKit.getRef());
+        testKit.expectMsgClass(testKit.duration("5 seconds"), akka.actor.Status.Failure.class);
 
-                transaction.tell(batched, getRef());
-                expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+        batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
+        batched.setReady();
+        batched.setTotalMessagesSent(2);
 
-                batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
-                batched.setReady();
-                batched.setTotalMessagesSent(2);
+        transaction.tell(batched, testKit.getRef());
+        Failure failure = testKit.expectMsgClass(testKit.duration("5 seconds"), akka.actor.Status.Failure.class);
+        watcher.expectMsgClass(testKit.duration("5 seconds"), Terminated.class);
 
-                transaction.tell(batched, getRef());
-                Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
-                watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
-
-                if (failure != null) {
-                    Throwables.propagateIfPossible(failure.cause(), Exception.class);
-                    throw new RuntimeException(failure.cause());
-                }
-            }
-        };
+        if (failure != null) {
+            Throwables.propagateIfPossible(failure.cause(), Exception.class);
+            throw new RuntimeException(failure.cause());
+        }
     }
 
     @Test(expected = IllegalStateException.class)
     public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
-        new TestKit(getSystem()) {
-            {
-
-                final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
-                        "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
+        final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
+                "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
 
-                TestKit watcher = new TestKit(getSystem());
-                watcher.watch(transaction);
+        TestKit watcher = new TestKit(getSystem());
+        watcher.watch(transaction);
 
-                BatchedModifications batched = new BatchedModifications(nextTransactionId(),
-                        DataStoreVersions.CURRENT_VERSION);
-                batched.setReady();
-                batched.setTotalMessagesSent(2);
+        BatchedModifications batched = new BatchedModifications(nextTransactionId(),
+            DataStoreVersions.CURRENT_VERSION);
+        batched.setReady();
+        batched.setTotalMessagesSent(2);
 
-                transaction.tell(batched, getRef());
+        transaction.tell(batched, testKit.getRef());
 
-                Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
-                watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
+        Failure failure = testKit.expectMsgClass(testKit.duration("5 seconds"), akka.actor.Status.Failure.class);
+        watcher.expectMsgClass(watcher.duration("5 seconds"), Terminated.class);
 
-                if (failure != null) {
-                    Throwables.throwIfInstanceOf(failure.cause(), Exception.class);
-                    Throwables.throwIfUnchecked(failure.cause());
-                    throw new RuntimeException(failure.cause());
-                }
-            }
-        };
+        if (failure != null) {
+            Throwables.throwIfInstanceOf(failure.cause(), Exception.class);
+            Throwables.throwIfUnchecked(failure.cause());
+            throw new RuntimeException(failure.cause());
+        }
     }
 
     @Test
     public void testReadWriteTxOnReceiveCloseTransaction() {
-        new TestKit(getSystem()) {
-            {
-                final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
-                        "testReadWriteTxOnReceiveCloseTransaction");
+        final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
+                "testReadWriteTxOnReceiveCloseTransaction");
 
-                watch(transaction);
+        testKit.watch(transaction);
 
-                transaction.tell(new CloseTransaction().toSerializable(), getRef());
+        transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
 
-                expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
-                expectTerminated(duration("3 seconds"), transaction);
-            }
-        };
+        testKit.expectMsgClass(testKit.duration("3 seconds"), CloseTransactionReply.class);
+        testKit.expectTerminated(testKit.duration("3 seconds"), transaction);
     }
 
     @Test
     public void testWriteOnlyTxOnReceiveCloseTransaction() {
-        new TestKit(getSystem()) {
-            {
-                final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
-                        "testWriteTxOnReceiveCloseTransaction");
+        final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
+                "testWriteTxOnReceiveCloseTransaction");
 
-                watch(transaction);
+        testKit.watch(transaction);
 
-                transaction.tell(new CloseTransaction().toSerializable(), getRef());
+        transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
 
-                expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
-                expectTerminated(duration("3 seconds"), transaction);
-            }
-        };
+        testKit.expectMsgClass(testKit.duration("3 seconds"), CloseTransactionReply.class);
+        testKit.expectTerminated(testKit.duration("3 seconds"), transaction);
     }
 
     @Test
     public void testReadOnlyTxOnReceiveCloseTransaction() {
-        new TestKit(getSystem()) {
-            {
-                final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
-                        "testReadOnlyTxOnReceiveCloseTransaction");
+        final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
+                "testReadOnlyTxOnReceiveCloseTransaction");
 
-                watch(transaction);
+        testKit.watch(transaction);
 
-                transaction.tell(new CloseTransaction().toSerializable(), getRef());
+        transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
 
-                expectMsgClass(duration("3 seconds"), Terminated.class);
-            }
-        };
+        testKit.expectMsgClass(testKit.duration("3 seconds"), Terminated.class);
     }
 
     @Test
@@ -411,16 +356,12 @@ public class ShardTransactionTest extends AbstractActorTest {
         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
                 500, TimeUnit.MILLISECONDS).build();
 
-        new TestKit(getSystem()) {
-            {
-                final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
-                        "testShardTransactionInactivity");
+        final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
+            "testShardTransactionInactivity");
 
-                watch(transaction);
+        testKit.watch(transaction);
 
-                expectMsgClass(duration("3 seconds"), Terminated.class);
-            }
-        };
+        testKit.expectMsgClass(testKit.duration("3 seconds"), Terminated.class);
     }
 
     public static class TestException extends RuntimeException {