Kill Dynamic Actors when we're done with them 60/8360/4
authorMoiz Raja <moraja@cisco.com>
Thu, 26 Jun 2014 01:44:26 +0000 (18:44 -0700)
committerMoiz Raja <moraja@cisco.com>
Thu, 3 Jul 2014 21:08:27 +0000 (14:08 -0700)
Kill ShardTransaction on close and on ThreePhaseCommitCohort#commit
Kill ThreePhaseCommitCohort on commit

Change-Id: Ie86b66cf3841baa514d82509fbc5b817eb7c6740
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java

index f747afa7867ac8f7f6434987045315b98bc9cca5..a2da063e55d465ffd4c9bb256e9a257475d0e107 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.event.Logging;
@@ -36,6 +37,7 @@ import org.opendaylight.controller.cluster.datastore.modification.MutableComposi
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
@@ -67,6 +69,11 @@ public class ShardTransaction extends UntypedActor {
 
     private final ActorRef shardActor;
 
+    // FIXME : see below
+    // If transactionChain is not null then this transaction is part of a
+    // transactionChain. Not really clear as to what that buys us
+    private final DOMStoreTransactionChain transactionChain;
+
     private final DOMStoreReadWriteTransaction transaction;
 
     private final MutableCompositeModification modification =
@@ -77,11 +84,18 @@ public class ShardTransaction extends UntypedActor {
 
     public ShardTransaction(DOMStoreReadWriteTransaction transaction,
         ActorRef shardActor) {
+        this(null, transaction, shardActor);
+    }
+
+    public ShardTransaction(DOMStoreTransactionChain transactionChain, DOMStoreReadWriteTransaction transaction,
+        ActorRef shardActor) {
+        this.transactionChain = transactionChain;
         this.transaction = transaction;
         this.shardActor = shardActor;
     }
 
 
+
     public static Props props(final DOMStoreReadWriteTransaction transaction,
         final ActorRef shardActor) {
         return Props.create(new Creator<ShardTransaction>() {
@@ -93,6 +107,18 @@ public class ShardTransaction extends UntypedActor {
         });
     }
 
+    public static Props props(final DOMStoreTransactionChain transactionChain, final DOMStoreReadWriteTransaction transaction,
+        final ActorRef shardActor) {
+        return Props.create(new Creator<ShardTransaction>() {
+
+            @Override
+            public ShardTransaction create() throws Exception {
+                return new ShardTransaction(transactionChain, transaction, shardActor);
+            }
+        });
+    }
+
+
     @Override
     public void onReceive(Object message) throws Exception {
         log.debug("Received message {}", message);
@@ -131,7 +157,7 @@ public class ShardTransaction extends UntypedActor {
                     if (optional.isPresent()) {
                         sender.tell(new ReadDataReply(optional.get()), self);
                     } else {
-                        //TODO : Need to decide what to do here
+                        sender.tell(new ReadDataReply(null), self);
                     }
                 } catch (InterruptedException | ExecutionException e) {
                     log.error(e,
@@ -176,6 +202,7 @@ public class ShardTransaction extends UntypedActor {
     private void closeTransaction(CloseTransaction message) {
         transaction.close();
         getSender().tell(new CloseTransactionReply(), getSelf());
+        getSelf().tell(PoisonPill.getInstance(), getSelf());
     }
 
 
index 79aaa86b28baaa71f161dceca0d56f59528d94a1..6c14f1d8d78560f202344b237703e284d144bd54 100644 (file)
@@ -34,7 +34,7 @@ public class ShardTransactionChain extends UntypedActor{
   public void onReceive(Object message) throws Exception {
     if(message instanceof CreateTransaction){
       DOMStoreReadWriteTransaction transaction = chain.newReadWriteTransaction();
-      ActorRef transactionActor = getContext().actorOf(ShardTransaction.props(transaction, getContext().parent()));
+      ActorRef transactionActor = getContext().actorOf(ShardTransaction.props(chain, transaction, getContext().parent()));
       getSender().tell(new CreateTransactionReply(transactionActor.path()), getSelf());
     } else if (message instanceof CloseTransactionChain){
       chain.close();
index 00d4ab5782e3e7f0a365bdd9ace5be917b423dc1..e6adfbee660669afd0889995c5e4ec0c755810e9 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.event.Logging;
@@ -94,6 +95,8 @@ public class ThreePhaseCommitCohort extends UntypedActor {
         shardActor.forward(new ForwardedCommitTransaction(cohort, modification),
             getContext());
 
+        getContext().parent().tell(PoisonPill.getInstance(), getSelf());
+
     }
 
     private void preCommit(PreCommitTransaction message) {
index 837ffc1b51dd8c5b2a80a3cf2f71a1076406648d..91e903f9e8993bfb68b83e0d28514124b163b258 100644 (file)
@@ -44,6 +44,7 @@ public class TransactionChainProxy implements DOMStoreTransactionChain{
 
     @Override
     public void close() {
+        // FIXME : The problem here is don't know which shard the transaction chain is to be created on ???
         throw new UnsupportedOperationException("close - not sure what to do here?");
     }
 }
index 811b851697910cf19d94a5079b1054b8635e8190..c12276134e99c0bff0e9f3dc6c3d61550c39e4ad 100644 (file)
@@ -59,17 +59,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
     private static final AtomicLong counter = new AtomicLong();
 
-    private final TransactionType readOnly;
+    private final TransactionType transactionType;
     private final ActorContext actorContext;
     private final Map<String, ActorSelection> remoteTransactionPaths = new HashMap<>();
     private final String identifier;
 
     public TransactionProxy(
         ActorContext actorContext,
-        TransactionType readOnly) {
+        TransactionType transactionType) {
 
         this.identifier = "transaction-" + counter.getAndIncrement();
-        this.readOnly = readOnly;
+        this.transactionType = transactionType;
         this.actorContext = actorContext;
 
         Object response = actorContext.executeShardOperation(Shard.DEFAULT_NAME, new CreateTransaction(), ActorContext.ASK_DURATION);
index 8c3ec82a54eb7c50d195675cffc122b6ed13d546..74c858e4a6b329ccb6536ddd7f4a1d42bec2885e 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
+import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
 import junit.framework.Assert;
 import org.junit.Test;
@@ -30,11 +31,14 @@ import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 public class BasicIntegrationTest extends AbstractActorTest {
 
     @Test
-    public void integrationTest() {
+    public void integrationTest() throws Exception{
         // This test will
         // - create a Shard
         // - initiate a transaction
@@ -57,7 +61,7 @@ public class BasicIntegrationTest extends AbstractActorTest {
                     shard.tell(new CreateTransactionChain(), getRef());
 
                     final ActorSelection transactionChain =
-                        new ExpectMsg<ActorSelection>("match hint") {
+                        new ExpectMsg<ActorSelection>("CreateTransactionChainReply") {
                             protected ActorSelection match(Object in) {
                                 if (in instanceof CreateTransactionChainReply) {
                                     ActorPath transactionChainPath =
@@ -76,7 +80,7 @@ public class BasicIntegrationTest extends AbstractActorTest {
                     transactionChain.tell(new CreateTransaction(), getRef());
 
                     final ActorSelection transaction =
-                        new ExpectMsg<ActorSelection>("match hint") {
+                        new ExpectMsg<ActorSelection>("CreateTransactionReply") {
                             protected ActorSelection match(Object in) {
                                 if (in instanceof CreateTransactionReply) {
                                     ActorPath transactionPath =
@@ -92,11 +96,14 @@ public class BasicIntegrationTest extends AbstractActorTest {
 
                     Assert.assertNotNull(transaction);
 
+                    // Add a watch on the transaction actor so that we are notified when it dies
+                    final ActorRef transactionActorRef = watchActor(transaction);
+
                     transaction.tell(new WriteData(TestModel.TEST_PATH,
                         ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
                         getRef());
 
-                    Boolean writeDone = new ExpectMsg<Boolean>("match hint") {
+                    Boolean writeDone = new ExpectMsg<Boolean>("WriteDataReply") {
                         protected Boolean match(Object in) {
                             if (in instanceof WriteDataReply) {
                                 return true;
@@ -111,7 +118,7 @@ public class BasicIntegrationTest extends AbstractActorTest {
                     transaction.tell(new ReadyTransaction(), getRef());
 
                     final ActorSelection cohort =
-                        new ExpectMsg<ActorSelection>("match hint") {
+                        new ExpectMsg<ActorSelection>("ReadyTransactionReply") {
                             protected ActorSelection match(Object in) {
                                 if (in instanceof ReadyTransactionReply) {
                                     ActorPath cohortPath =
@@ -127,10 +134,13 @@ public class BasicIntegrationTest extends AbstractActorTest {
 
                     Assert.assertNotNull(cohort);
 
+                    // Add a watch on the transaction actor so that we are notified when it dies
+                    final ActorRef cohorActorRef = watchActor(cohort);
+
                     cohort.tell(new PreCommitTransaction(), getRef());
 
                     Boolean preCommitDone =
-                        new ExpectMsg<Boolean>("match hint") {
+                        new ExpectMsg<Boolean>("PreCommitTransactionReply") {
                             protected Boolean match(Object in) {
                                 if (in instanceof PreCommitTransactionReply) {
                                     return true;
@@ -144,8 +154,35 @@ public class BasicIntegrationTest extends AbstractActorTest {
 
                     cohort.tell(new CommitTransaction(), getRef());
 
+                    final Boolean terminatedCohort =
+                        new ExpectMsg<Boolean>("Terminated Cohort") {
+                            protected Boolean match(Object in) {
+                                if (in instanceof Terminated) {
+                                    return cohorActorRef.equals(((Terminated) in).actor());
+                                } else {
+                                    throw noMatch();
+                                }
+                            }
+                        }.get(); // this extracts the received message
+
+                    Assert.assertTrue(terminatedCohort);
+
+
+                    final Boolean terminatedTransaction =
+                        new ExpectMsg<Boolean>("Terminated Transaction") {
+                            protected Boolean match(Object in) {
+                                if (in instanceof Terminated) {
+                                    return transactionActorRef.equals(((Terminated) in).actor());
+                                } else {
+                                    throw noMatch();
+                                }
+                            }
+                        }.get(); // this extracts the received message
+
+                    Assert.assertTrue(terminatedTransaction);
+
                     final Boolean commitDone =
-                        new ExpectMsg<Boolean>("match hint") {
+                        new ExpectMsg<Boolean>("CommitTransactionReply") {
                             protected Boolean match(Object in) {
                                 if (in instanceof CommitTransactionReply) {
                                     return true;
@@ -161,7 +198,25 @@ public class BasicIntegrationTest extends AbstractActorTest {
 
 
             };
-        }};
+        }
+
+            private ActorRef watchActor(ActorSelection actor) {
+                Future<ActorRef> future = actor
+                    .resolveOne(FiniteDuration.apply(100, "milliseconds"));
+
+                try {
+                    ActorRef actorRef = Await.result(future,
+                        FiniteDuration.apply(100, "milliseconds"));
+
+                    watch(actorRef);
+
+                    return actorRef;
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+
+            }
+        };
 
 
     }
index 9116f24c92971b3f0491b6de52d07eff01d84645..e4d8e1b23a4acf354e7fc68ab9849c0e46d8549e 100644 (file)
@@ -2,6 +2,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
+import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -32,246 +33,328 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class ShardTransactionTest extends AbstractActorTest {
-  private static ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
+    private static ListeningExecutorService storeExecutor =
+        MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
+
+    private static final InMemoryDOMDataStore store =
+        new InMemoryDOMDataStore("OPER", storeExecutor);
+
+    static {
+        store.onGlobalContextUpdated(TestModel.createTestContext());
+    }
+
+    @Test
+    public void testOnReceiveReadData() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shard = getSystem().actorOf(Shard.props("config"));
+            final Props props =
+                ShardTransaction.props(store.newReadWriteTransaction(), shard);
+            final ActorRef subject = getSystem().actorOf(props, "testReadData");
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    subject.tell(
+                        new ReadData(InstanceIdentifier.builder().build()),
+                        getRef());
+
+                    final String out = new ExpectMsg<String>("match hint") {
+                        // do not put code outside this method, will run afterwards
+                        protected String match(Object in) {
+                            if (in instanceof ReadDataReply) {
+                                if (((ReadDataReply) in).getNormalizedNode()
+                                    != null) {
+                                    return "match";
+                                }
+                                return null;
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get(); // this extracts the received message
+
+                    assertEquals("match", out);
+
+                    expectNoMsg();
+                }
+
+
+            };
+        }};
+    }
+
+    @Test
+    public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shard = getSystem().actorOf(Shard.props("config"));
+            final Props props =
+                ShardTransaction.props(store.newReadWriteTransaction(), shard);
+            final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound");
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    subject.tell(
+                        new ReadData(TestModel.TEST_PATH),
+                        getRef());
+
+                    final String out = new ExpectMsg<String>("match hint") {
+                        // do not put code outside this method, will run afterwards
+                        protected String match(Object in) {
+                            if (in instanceof ReadDataReply) {
+                                if (((ReadDataReply) in).getNormalizedNode()
+                                    == null) {
+                                    return "match";
+                                }
+                                return null;
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get(); // this extracts the received message
+
+                    assertEquals("match", out);
+
+                    expectNoMsg();
+                }
+
+
+            };
+        }};
+    }
+
+    private void assertModification(final ActorRef subject,
+        final Class<? extends Modification> modificationType) {
+        new JavaTestKit(getSystem()) {{
+            new Within(duration("1 seconds")) {
+                protected void run() {
+                    subject
+                        .tell(new ShardTransaction.GetCompositedModification(),
+                            getRef());
+
+                    final CompositeModification compositeModification =
+                        new ExpectMsg<CompositeModification>("match hint") {
+                            // do not put code outside this method, will run afterwards
+                            protected CompositeModification match(Object in) {
+                                if (in instanceof ShardTransaction.GetCompositeModificationReply) {
+                                    return ((ShardTransaction.GetCompositeModificationReply) in)
+                                        .getModification();
+                                } else {
+                                    throw noMatch();
+                                }
+                            }
+                        }.get(); // this extracts the received message
+
+                    assertTrue(
+                        compositeModification.getModifications().size() == 1);
+                    assertEquals(modificationType,
+                        compositeModification.getModifications().get(0)
+                            .getClass());
+
+                }
+            };
+        }};
+    }
+
+    @Test
+    public void testOnReceiveWriteData() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shard = getSystem().actorOf(Shard.props("config"));
+            final Props props =
+                ShardTransaction.props(store.newReadWriteTransaction(), shard);
+            final ActorRef subject =
+                getSystem().actorOf(props, "testWriteData");
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    subject.tell(new WriteData(TestModel.TEST_PATH,
+                        ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
+                        getRef());
+
+                    final String out = new ExpectMsg<String>("match hint") {
+                        // do not put code outside this method, will run afterwards
+                        protected String match(Object in) {
+                            if (in instanceof WriteDataReply) {
+                                return "match";
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get(); // this extracts the received message
+
+                    assertEquals("match", out);
+
+                    assertModification(subject, WriteModification.class);
+                    expectNoMsg();
+                }
+
+
+            };
+        }};
+    }
 
-  private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor);
+    @Test
+    public void testOnReceiveMergeData() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shard = getSystem().actorOf(Shard.props("config"));
+            final Props props =
+                ShardTransaction.props(store.newReadWriteTransaction(), shard);
+            final ActorRef subject =
+                getSystem().actorOf(props, "testMergeData");
 
-  static {
-    store.onGlobalContextUpdated(TestModel.createTestContext());
-  }
+            new Within(duration("1 seconds")) {
+                protected void run() {
 
-  @Test
-  public void testOnReceiveReadData() throws Exception {
-    new JavaTestKit(getSystem()) {{
-      final ActorRef shard = getSystem().actorOf(Shard.props("config"));
-      final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard);
-      final ActorRef subject = getSystem().actorOf(props, "testReadData");
+                    subject.tell(new MergeData(TestModel.TEST_PATH,
+                        ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
+                        getRef());
 
-      new Within(duration("1 seconds")) {
-        protected void run() {
+                    final String out = new ExpectMsg<String>("match hint") {
+                        // do not put code outside this method, will run afterwards
+                        protected String match(Object in) {
+                            if (in instanceof MergeDataReply) {
+                                return "match";
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get(); // this extracts the received message
 
-          subject.tell(new ReadData(InstanceIdentifier.builder().build()), getRef());
+                    assertEquals("match", out);
 
-          final String out = new ExpectMsg<String>("match hint") {
-            // do not put code outside this method, will run afterwards
-            protected String match(Object in) {
-              if (in instanceof ReadDataReply) {
-                if (((ReadDataReply) in).getNormalizedNode() != null) {
-                  return "match";
+                    assertModification(subject, MergeModification.class);
+
+                    expectNoMsg();
                 }
-                return null;
-              } else {
-                throw noMatch();
-              }
-            }
-          }.get(); // this extracts the received message
-
-          assertEquals("match", out);
-
-          expectNoMsg();
-        }
-
-
-      };
-    }};
-  }
-
-  private void assertModification(final ActorRef subject, final Class<? extends Modification> modificationType){
-    new JavaTestKit(getSystem()) {{
-      new Within(duration("1 seconds")) {
-        protected void run() {
-          subject.tell(new ShardTransaction.GetCompositedModification(), getRef());
-
-          final CompositeModification compositeModification = new ExpectMsg<CompositeModification>("match hint") {
-            // do not put code outside this method, will run afterwards
-            protected CompositeModification match(Object in) {
-              if (in instanceof ShardTransaction.GetCompositeModificationReply) {
-                return ((ShardTransaction.GetCompositeModificationReply) in).getModification();
-              } else {
-                throw noMatch();
-              }
-            }
-          }.get(); // this extracts the received message
-
-          assertTrue(compositeModification.getModifications().size() == 1);
-          assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
-
-        }
-      };
-    }};
-  }
-
-  @Test
-  public void testOnReceiveWriteData() throws Exception {
-    new JavaTestKit(getSystem()) {{
-      final ActorRef shard = getSystem().actorOf(Shard.props("config"));
-      final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard);
-      final ActorRef subject = getSystem().actorOf(props, "testWriteData");
-
-      new Within(duration("1 seconds")) {
-        protected void run() {
-
-          subject.tell(new WriteData(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)), getRef());
-
-          final String out = new ExpectMsg<String>("match hint") {
-            // do not put code outside this method, will run afterwards
-            protected String match(Object in) {
-              if (in instanceof WriteDataReply) {
-                return "match";
-              } else {
-                throw noMatch();
-              }
-            }
-          }.get(); // this extracts the received message
-
-          assertEquals("match", out);
-
-          assertModification(subject, WriteModification.class);
-          expectNoMsg();
-        }
-
-
-      };
-    }};
-  }
-
-  @Test
-  public void testOnReceiveMergeData() throws Exception {
-    new JavaTestKit(getSystem()) {{
-      final ActorRef shard = getSystem().actorOf(Shard.props("config"));
-      final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard);
-      final ActorRef subject = getSystem().actorOf(props, "testMergeData");
-
-      new Within(duration("1 seconds")) {
-        protected void run() {
-
-          subject.tell(new MergeData(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)), getRef());
-
-          final String out = new ExpectMsg<String>("match hint") {
-            // do not put code outside this method, will run afterwards
-            protected String match(Object in) {
-              if (in instanceof MergeDataReply) {
-                return "match";
-              } else {
-                throw noMatch();
-              }
-            }
-          }.get(); // this extracts the received message
-
-          assertEquals("match", out);
-
-          assertModification(subject, MergeModification.class);
-
-          expectNoMsg();
-        }
-
-
-      };
-    }};
-  }
-
-  @Test
-  public void testOnReceiveDeleteData() throws Exception {
-    new JavaTestKit(getSystem()) {{
-      final ActorRef shard = getSystem().actorOf(Shard.props("config"));
-      final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard);
-      final ActorRef subject = getSystem().actorOf(props, "testDeleteData");
 
-      new Within(duration("1 seconds")) {
-        protected void run() {
 
-          subject.tell(new DeleteData(TestModel.TEST_PATH), getRef());
+            };
+        }};
+    }
+
+    @Test
+    public void testOnReceiveDeleteData() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shard = getSystem().actorOf(Shard.props("config"));
+            final Props props =
+                ShardTransaction.props(store.newReadWriteTransaction(), shard);
+            final ActorRef subject =
+                getSystem().actorOf(props, "testDeleteData");
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    subject.tell(new DeleteData(TestModel.TEST_PATH), getRef());
+
+                    final String out = new ExpectMsg<String>("match hint") {
+                        // do not put code outside this method, will run afterwards
+                        protected String match(Object in) {
+                            if (in instanceof DeleteDataReply) {
+                                return "match";
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get(); // this extracts the received message
 
-          final String out = new ExpectMsg<String>("match hint") {
-            // do not put code outside this method, will run afterwards
-            protected String match(Object in) {
-              if (in instanceof DeleteDataReply) {
-                return "match";
-              } else {
-                throw noMatch();
-              }
-            }
-          }.get(); // this extracts the received message
+                    assertEquals("match", out);
+
+                    assertModification(subject, DeleteModification.class);
+                    expectNoMsg();
+                }
 
-          assertEquals("match", out);
 
-          assertModification(subject, DeleteModification.class);
-          expectNoMsg();
-        }
+            };
+        }};
+    }
 
 
-      };
-    }};
-  }
+    @Test
+    public void testOnReceiveReadyTransaction() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shard = getSystem().actorOf(Shard.props("config"));
+            final Props props =
+                ShardTransaction.props(store.newReadWriteTransaction(), shard);
+            final ActorRef subject =
+                getSystem().actorOf(props, "testReadyTransaction");
 
+            new Within(duration("1 seconds")) {
+                protected void run() {
 
-  @Test
-  public void testOnReceiveReadyTransaction() throws Exception {
-    new JavaTestKit(getSystem()) {{
-      final ActorRef shard = getSystem().actorOf(Shard.props("config"));
-      final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard);
-      final ActorRef subject = getSystem().actorOf(props, "testReadyTransaction");
+                    subject.tell(new ReadyTransaction(), getRef());
 
-      new Within(duration("1 seconds")) {
-        protected void run() {
+                    final String out = new ExpectMsg<String>("match hint") {
+                        // do not put code outside this method, will run afterwards
+                        protected String match(Object in) {
+                            if (in instanceof ReadyTransactionReply) {
+                                return "match";
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get(); // this extracts the received message
 
-          subject.tell(new ReadyTransaction(), getRef());
+                    assertEquals("match", out);
 
-          final String out = new ExpectMsg<String>("match hint") {
-            // do not put code outside this method, will run afterwards
-            protected String match(Object in) {
-              if (in instanceof ReadyTransactionReply) {
-                return "match";
-              } else {
-                throw noMatch();
-              }
-            }
-          }.get(); // this extracts the received message
+                    expectNoMsg();
+                }
 
-          assertEquals("match", out);
 
-          expectNoMsg();
-        }
+            };
+        }};
 
+    }
 
-      };
-    }};
+    @Test
+    public void testOnReceiveCloseTransaction() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shard = getSystem().actorOf(Shard.props("config"));
+            final Props props =
+                ShardTransaction.props(store.newReadWriteTransaction(), shard);
+            final ActorRef subject =
+                getSystem().actorOf(props, "testCloseTransaction");
 
-  }
+            watch(subject);
 
-  @Test
-  public void testOnReceiveCloseTransaction() throws Exception {
-    new JavaTestKit(getSystem()) {{
-      final ActorRef shard = getSystem().actorOf(Shard.props("config"));
-      final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard);
-      final ActorRef subject = getSystem().actorOf(props, "testCloseTransaction");
+            new Within(duration("2 seconds")) {
+                protected void run() {
 
-      new Within(duration("1 seconds")) {
-        protected void run() {
+                    subject.tell(new CloseTransaction(), getRef());
 
-          subject.tell(new CloseTransaction(), getRef());
+                    final String out = new ExpectMsg<String>("match hint") {
+                        // do not put code outside this method, will run afterwards
+                        protected String match(Object in) {
+                            if (in instanceof CloseTransactionReply) {
+                                return "match";
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get(); // this extracts the received message
 
-          final String out = new ExpectMsg<String>("match hint") {
-            // do not put code outside this method, will run afterwards
-            protected String match(Object in) {
-              if (in instanceof CloseTransactionReply) {
-                return "match";
-              } else {
-                throw noMatch();
-              }
-            }
-          }.get(); // this extracts the received message
+                    assertEquals("match", out);
 
-          assertEquals("match", out);
+                    final String termination = new ExpectMsg<String>("match hint") {
+                        // do not put code outside this method, will run afterwards
+                        protected String match(Object in) {
+                            if (in instanceof Terminated) {
+                                return "match";
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get(); // this extracts the received message
 
-          expectNoMsg();
-        }
+
+                    expectNoMsg();
+                }
 
 
-      };
-    }};
+            };
+        }};
 
-  }
+    }
 
 
-}
\ No newline at end of file
+}