Merge "Bug 2265: Modified NormalizedNodeOutputStreamWriter to implement yangtools...
authorMoiz Raja <moraja@cisco.com>
Wed, 29 Oct 2014 09:00:27 +0000 (09:00 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 29 Oct 2014 09:00:27 +0000 (09:00 +0000)
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActorWithMetering.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MeteringBehavior.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java

index 5497f93c4371a2c044bdaf05266e40656b4ff50e..04d9a43c2d40c16c3e0058f2be5bf5256df70498 100644 (file)
@@ -12,13 +12,26 @@ package org.opendaylight.controller.cluster.common.actor;
  */
 public abstract class AbstractUntypedActorWithMetering extends AbstractUntypedActor {
 
+    //this is used in the metric name. Some transient actors do not have defined names
+    private String actorNameOverride;
+
     public AbstractUntypedActorWithMetering() {
         if (isMetricsCaptureEnabled())
             getContext().become(new MeteringBehavior(this));
     }
 
+    public AbstractUntypedActorWithMetering(String actorNameOverride){
+        this.actorNameOverride = actorNameOverride;
+        if (isMetricsCaptureEnabled())
+            getContext().become(new MeteringBehavior(this));
+    }
+
     private boolean isMetricsCaptureEnabled(){
         CommonConfig config = new CommonConfig(getContext().system().settings().config());
         return config.isMetricCaptureEnabled();
     }
+
+    public String getActorNameOverride() {
+        return actorNameOverride;
+    }
 }
index d67d413d0963fff464503391002a3e6d3af15bee..9ff185a61e27d61c335223d8b1150fa3738a41c6 100644 (file)
@@ -32,19 +32,35 @@ public class MeteringBehavior implements Procedure<Object> {
     private final MetricRegistry METRICREGISTRY = MetricsReporter.getInstance().getMetricsRegistry();
     private final String MSG_PROCESSING_RATE = "msg-rate";
 
-    private String actorName;
+    private String actorQualifiedName;
     private Timer msgProcessingTimer;
 
     /**
      *
      * @param actor whose behaviour needs to be metered
      */
-    public MeteringBehavior(UntypedActor actor){
+    public MeteringBehavior(AbstractUntypedActorWithMetering actor){
         Preconditions.checkArgument(actor != null, "actor must not be null");
+        this.meteredActor = actor;
 
+        String actorName = actor.getActorNameOverride() != null ? actor.getActorNameOverride()
+                                                                : actor.getSelf().path().name();
+        init(actorName);
+    }
+
+    public MeteringBehavior(UntypedActor actor){
+        Preconditions.checkArgument(actor != null, "actor must not be null");
         this.meteredActor = actor;
-        actorName = meteredActor.getSelf().path().toStringWithoutAddress();
-        final String msgProcessingTime = MetricRegistry.name(actorName, MSG_PROCESSING_RATE);
+
+        String actorName = actor.getSelf().path().name();
+        init(actorName);
+    }
+
+    private void init(String actorName){
+        actorQualifiedName = new StringBuilder(meteredActor.getSelf().path().parent().toStringWithoutAddress()).
+                append("/").append(actorName).toString();
+
+        final String msgProcessingTime = MetricRegistry.name(actorQualifiedName, MSG_PROCESSING_RATE);
         msgProcessingTimer = METRICREGISTRY.timer(msgProcessingTime);
     }
 
@@ -69,7 +85,7 @@ public class MeteringBehavior implements Procedure<Object> {
         final String messageType = message.getClass().getSimpleName();
 
         final String msgProcessingTimeByMsgType =
-                MetricRegistry.name(actorName, MSG_PROCESSING_RATE, messageType);
+                MetricRegistry.name(actorQualifiedName, MSG_PROCESSING_RATE, messageType);
 
         final Timer msgProcessingTimerByMsgType = METRICREGISTRY.timer(msgProcessingTimeByMsgType);
 
index 5289ad33bfb2b2bc3a5008c0211ffccc3b077721..32de47f451d9ff53f301339975357440e651bea5 100644 (file)
@@ -15,7 +15,7 @@ import akka.actor.ReceiveTimeout;
 import akka.japi.Creator;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
@@ -55,7 +55,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction}
  * </p>
  */
-public abstract class ShardTransaction extends AbstractUntypedActor {
+public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
 
     private final ActorRef shardActor;
     private final SchemaContext schemaContext;
@@ -65,6 +65,7 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
 
     protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
             ShardStats shardStats, String transactionID) {
+        super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
         this.shardActor = shardActor;
         this.schemaContext = schemaContext;
         this.shardStats = shardStats;
index 21c210daf252fc4633b12882bb18dfea99779aa5..b0eaf98d59c9ccd2d1eda1f6d782736db238290e 100644 (file)
@@ -11,6 +11,7 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
@@ -87,12 +88,12 @@ public class ShardWriteTransaction extends ShardTransaction {
         }
     }
 
-    private void writeData(DOMStoreWriteTransaction transaction, WriteData message, boolean returnSerialized) {
+    private void writeData(DOMStoreWriteTransaction transaction, WriteData message,
+            boolean returnSerialized) {
+        LOG.debug("writeData at path : {}", message.getPath());
+
         modification.addModification(
                 new WriteModification(message.getPath(), message.getData(), getSchemaContext()));
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("writeData at path : " + message.getPath().toString());
-        }
         try {
             transaction.write(message.getPath(), message.getData());
             WriteDataReply writeDataReply = new WriteDataReply();
@@ -103,12 +104,13 @@ public class ShardWriteTransaction extends ShardTransaction {
         }
     }
 
-    private void mergeData(DOMStoreWriteTransaction transaction, MergeData message, boolean returnSerialized) {
+    private void mergeData(DOMStoreWriteTransaction transaction, MergeData message,
+            boolean returnSerialized) {
+        LOG.debug("mergeData at path : {}", message.getPath());
+
         modification.addModification(
                 new MergeModification(message.getPath(), message.getData(), getSchemaContext()));
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("mergeData at path : " + message.getPath().toString());
-        }
+
         try {
             transaction.merge(message.getPath(), message.getData());
             MergeDataReply mergeDataReply = new MergeDataReply();
@@ -119,10 +121,10 @@ public class ShardWriteTransaction extends ShardTransaction {
         }
     }
 
-    private void deleteData(DOMStoreWriteTransaction transaction, DeleteData message, boolean returnSerialized) {
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("deleteData at path : " + message.getPath().toString());
-        }
+    private void deleteData(DOMStoreWriteTransaction transaction, DeleteData message,
+            boolean returnSerialized) {
+        LOG.debug("deleteData at path : {}", message.getPath());
+
         modification.addModification(new DeleteModification(message.getPath()));
         try {
             transaction.delete(message.getPath());
@@ -134,12 +136,19 @@ public class ShardWriteTransaction extends ShardTransaction {
         }
     }
 
-    private void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message, boolean returnSerialized) {
+    private void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message,
+            boolean returnSerialized) {
+        String transactionID = getTransactionID();
+
+        LOG.debug("readyTransaction : {}", transactionID);
+
         DOMStoreThreePhaseCommitCohort cohort =  transaction.ready();
 
-        getShardActor().forward(new ForwardedReadyTransaction(
-            getTransactionID(), cohort, modification, returnSerialized),
-                getContext());
+        getShardActor().forward(new ForwardedReadyTransaction(transactionID, cohort, modification,
+                returnSerialized), getContext());
+
+        // The shard will handle the commit from here so we're no longer needed - self-destruct.
+        getSelf().tell(PoisonPill.getInstance(), getSelf());
     }
 
     // These classes are in here for test purposes only
index 17731de5cd48918a1d03d04ec6dece68b9321e5a..6375e3c7fba51fa02c077ac49d01def5eed5d22a 100644 (file)
@@ -12,6 +12,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
+import akka.pattern.AskTimeoutException;
 import akka.testkit.TestActorRef;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -155,7 +156,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
     }
 
-    @Test(expected = IllegalStateException.class)
+    @Test(expected = AskTimeoutException.class)
     public void testNegativeWriteWithTransactionReady() throws Exception {
 
 
@@ -187,7 +188,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
     }
 
-    @Test(expected = IllegalStateException.class)
+    @Test(expected = AskTimeoutException.class)
     public void testNegativeReadWriteWithTransactionReady() throws Exception {
 
 
@@ -224,7 +225,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
             .serialize(Builders.containerBuilder().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).build());
     }
 
-    @Test(expected = IllegalStateException.class)
+    @Test(expected = AskTimeoutException.class)
     public void testNegativeMergeTransactionReady() throws Exception {
 
 
@@ -256,7 +257,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
     }
 
 
-    @Test(expected = IllegalStateException.class)
+    @Test(expected = AskTimeoutException.class)
     public void testNegativeDeleteDataWhenTransactionReady() throws Exception {
 
 
index 711f3d7a72a16b615224246e07e3adb750b7cff6..793df8e0ca9776f7d21c07fcbaf7788be8f562bd 100644 (file)
@@ -88,9 +88,9 @@ public class ShardTransactionTest extends AbstractActorTest {
             testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
         }
 
-        private void testOnReceiveReadData(final ActorRef subject) {
+        private void testOnReceiveReadData(final ActorRef transaction) {
             //serialized read
-            subject.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
+            transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
                 getRef());
 
             ShardTransactionMessages.ReadDataReply replySerialized =
@@ -101,7 +101,7 @@ public class ShardTransactionTest extends AbstractActorTest {
                 .getNormalizedNode());
 
             // unserialized read
-            subject.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
+            transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
 
             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
 
@@ -126,9 +126,9 @@ public class ShardTransactionTest extends AbstractActorTest {
                     props, "testReadDataWhenDataNotFoundRW"));
         }
 
-        private void testOnReceiveReadDataWhenDataNotFound(final ActorRef subject) {
+        private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
             // serialized read
-            subject.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
+            transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
 
             ShardTransactionMessages.ReadDataReply replySerialized =
                 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
@@ -137,7 +137,7 @@ public class ShardTransactionTest extends AbstractActorTest {
                 testSchemaContext, TestModel.TEST_PATH, replySerialized).getNormalizedNode() == null);
 
             // unserialized read
-            subject.tell(new ReadData(TestModel.TEST_PATH),getRef());
+            transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
 
             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
 
@@ -160,8 +160,8 @@ public class ShardTransactionTest extends AbstractActorTest {
             testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
         }
 
-        private void testOnReceiveDataExistsPositive(final ActorRef subject) {
-            subject.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
+        private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
+            transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
                 getRef());
 
             ShardTransactionMessages.DataExistsReply replySerialized =
@@ -170,7 +170,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
 
             // unserialized read
-            subject.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
+            transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
 
             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
 
@@ -193,8 +193,8 @@ public class ShardTransactionTest extends AbstractActorTest {
             testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
         }
 
-        private void testOnReceiveDataExistsNegative(final ActorRef subject) {
-            subject.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
+        private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
+            transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
 
             ShardTransactionMessages.DataExistsReply replySerialized =
                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
@@ -202,7 +202,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
 
             // unserialized read
-            subject.tell(new DataExists(TestModel.TEST_PATH),getRef());
+            transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
 
             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
 
@@ -229,20 +229,18 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn");
-            final ActorRef subject =
-                getSystem().actorOf(props, "testWriteData");
+            final ActorRef transaction = getSystem().actorOf(props, "testWriteData");
 
-            subject.tell(new WriteData(TestModel.TEST_PATH,
+            transaction.tell(new WriteData(TestModel.TEST_PATH,
                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
                 getRef());
 
-            ShardTransactionMessages.WriteDataReply replySerialized =
-                expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
+            expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
 
-            assertModification(subject, WriteModification.class);
+            assertModification(transaction, WriteModification.class);
 
             //unserialized write
-            subject.tell(new WriteData(TestModel.TEST_PATH,
+            transaction.tell(new WriteData(TestModel.TEST_PATH,
                 ImmutableNodes.containerNode(TestModel.TEST_QNAME),
                 TestModel.createTestContext()),
                 getRef());
@@ -257,20 +255,18 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn");
-            final ActorRef subject =
-                getSystem().actorOf(props, "testMergeData");
+            final ActorRef transaction = getSystem().actorOf(props, "testMergeData");
 
-            subject.tell(new MergeData(TestModel.TEST_PATH,
+            transaction.tell(new MergeData(TestModel.TEST_PATH,
                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(),
                 getRef());
 
-            ShardTransactionMessages.MergeDataReply replySerialized =
-                expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
+            expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
 
-            assertModification(subject, MergeModification.class);
+            assertModification(transaction, MergeModification.class);
 
             //unserialized merge
-            subject.tell(new MergeData(TestModel.TEST_PATH,
+            transaction.tell(new MergeData(TestModel.TEST_PATH,
                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext),
                 getRef());
 
@@ -284,18 +280,16 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn");
-            final ActorRef subject =
-                getSystem().actorOf(props, "testDeleteData");
+            final ActorRef transaction = getSystem().actorOf(props, "testDeleteData");
 
-            subject.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
+            transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
 
-            ShardTransactionMessages.DeleteDataReply replySerialized =
-                expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
+            expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
 
-            assertModification(subject, DeleteModification.class);
+            assertModification(transaction, DeleteModification.class);
 
             //unserialized merge
-            subject.tell(new DeleteData(TestModel.TEST_PATH), getRef());
+            transaction.tell(new DeleteData(TestModel.TEST_PATH), getRef());
 
             expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
         }};
@@ -308,12 +302,16 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn");
-            final ActorRef subject =
-                getSystem().actorOf(props, "testReadyTransaction");
+            final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction");
 
-            subject.tell(new ReadyTransaction().toSerializable(), getRef());
+            watch(transaction);
 
-            expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
+            transaction.tell(new ReadyTransaction().toSerializable(), getRef());
+
+            expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
+                    Terminated.class);
+            expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
+                    Terminated.class);
         }};
 
         // test
@@ -321,12 +319,16 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn");
-            final ActorRef subject =
-                getSystem().actorOf(props, "testReadyTransaction2");
+            final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2");
+
+            watch(transaction);
 
-            subject.tell(new ReadyTransaction(), getRef());
+            transaction.tell(new ReadyTransaction(), getRef());
 
-            expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
+            expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
+                    Terminated.class);
+            expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
+                    Terminated.class);
         }};
 
     }
@@ -338,14 +340,14 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn");
-            final ActorRef subject = getSystem().actorOf(props, "testCloseTransaction");
+            final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction");
 
-            watch(subject);
+            watch(transaction);
 
-            subject.tell(new CloseTransaction().toSerializable(), getRef());
+            transaction.tell(new CloseTransaction().toSerializable(), getRef());
 
             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
-            expectMsgClass(duration("3 seconds"), Terminated.class);
+            expectTerminated(duration("3 seconds"), transaction);
         }};
     }
 
@@ -354,9 +356,9 @@ public class ShardTransactionTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn");
-        final TestActorRef subject = TestActorRef.apply(props,getSystem());
+        final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
 
-        subject.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
+        transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
     }
 
     @Test
@@ -369,26 +371,12 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn");
-            final ActorRef subject =
+            final ActorRef transaction =
                 getSystem().actorOf(props, "testShardTransactionInactivity");
 
-            watch(subject);
+            watch(transaction);
 
-            // The shard Tx actor should receive a ReceiveTimeout message and self-destruct.
-
-            final String termination = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
-                // do not put code outside this method, will run afterwards
-                @Override
-                protected String match(Object in) {
-                    if (in instanceof Terminated) {
-                        return "match";
-                    } else {
-                        throw noMatch();
-                    }
-                }
-            }.get(); // this extracts the received message
-
-            assertEquals("match", termination);
+            expectMsgClass(duration("3 seconds"), Terminated.class);
         }};
     }
 }