From: Ed Warnicke Date: Fri, 4 Jul 2014 20:22:06 +0000 (+0000) Subject: Merge "Kill Dynamic Actors when we're done with them" X-Git-Tag: release/helium~534 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=9d166a0ee8e022ae9ec30fb251584d57f313c077;hp=d5c00a94aa86ca1ef0a42c096f2b8d41941ace85 Merge "Kill Dynamic Actors when we're done with them" --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java index f747afa786..a2da063e55 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; +import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.UntypedActor; import akka.event.Logging; @@ -36,6 +37,7 @@ import org.opendaylight.controller.cluster.datastore.modification.MutableComposi import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -67,6 +69,11 @@ public class ShardTransaction extends UntypedActor { private final ActorRef shardActor; + // FIXME : see below + // If transactionChain is not null then this transaction is part of a + // transactionChain. Not really clear as to what that buys us + private final DOMStoreTransactionChain transactionChain; + private final DOMStoreReadWriteTransaction transaction; private final MutableCompositeModification modification = @@ -77,11 +84,18 @@ public class ShardTransaction extends UntypedActor { public ShardTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor) { + this(null, transaction, shardActor); + } + + public ShardTransaction(DOMStoreTransactionChain transactionChain, DOMStoreReadWriteTransaction transaction, + ActorRef shardActor) { + this.transactionChain = transactionChain; this.transaction = transaction; this.shardActor = shardActor; } + public static Props props(final DOMStoreReadWriteTransaction transaction, final ActorRef shardActor) { return Props.create(new Creator() { @@ -93,6 +107,18 @@ public class ShardTransaction extends UntypedActor { }); } + public static Props props(final DOMStoreTransactionChain transactionChain, final DOMStoreReadWriteTransaction transaction, + final ActorRef shardActor) { + return Props.create(new Creator() { + + @Override + public ShardTransaction create() throws Exception { + return new ShardTransaction(transactionChain, transaction, shardActor); + } + }); + } + + @Override public void onReceive(Object message) throws Exception { log.debug("Received message {}", message); @@ -131,7 +157,7 @@ public class ShardTransaction extends UntypedActor { if (optional.isPresent()) { sender.tell(new ReadDataReply(optional.get()), self); } else { - //TODO : Need to decide what to do here + sender.tell(new ReadDataReply(null), self); } } catch (InterruptedException | ExecutionException e) { log.error(e, @@ -176,6 +202,7 @@ public class ShardTransaction extends UntypedActor { private void closeTransaction(CloseTransaction message) { transaction.close(); getSender().tell(new CloseTransactionReply(), getSelf()); + getSelf().tell(PoisonPill.getInstance(), getSelf()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java index 79aaa86b28..6c14f1d8d7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java @@ -34,7 +34,7 @@ public class ShardTransactionChain extends UntypedActor{ public void onReceive(Object message) throws Exception { if(message instanceof CreateTransaction){ DOMStoreReadWriteTransaction transaction = chain.newReadWriteTransaction(); - ActorRef transactionActor = getContext().actorOf(ShardTransaction.props(transaction, getContext().parent())); + ActorRef transactionActor = getContext().actorOf(ShardTransaction.props(chain, transaction, getContext().parent())); getSender().tell(new CreateTransactionReply(transactionActor.path()), getSelf()); } else if (message instanceof CloseTransactionChain){ chain.close(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java index 00d4ab5782..e6adfbee66 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; +import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.UntypedActor; import akka.event.Logging; @@ -94,6 +95,8 @@ public class ThreePhaseCommitCohort extends UntypedActor { shardActor.forward(new ForwardedCommitTransaction(cohort, modification), getContext()); + getContext().parent().tell(PoisonPill.getInstance(), getSelf()); + } private void preCommit(PreCommitTransaction message) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index 837ffc1b51..91e903f9e8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -44,6 +44,7 @@ public class TransactionChainProxy implements DOMStoreTransactionChain{ @Override public void close() { + // FIXME : The problem here is don't know which shard the transaction chain is to be created on ??? throw new UnsupportedOperationException("close - not sure what to do here?"); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 811b851697..c12276134e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -59,17 +59,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private static final AtomicLong counter = new AtomicLong(); - private final TransactionType readOnly; + private final TransactionType transactionType; private final ActorContext actorContext; private final Map remoteTransactionPaths = new HashMap<>(); private final String identifier; public TransactionProxy( ActorContext actorContext, - TransactionType readOnly) { + TransactionType transactionType) { this.identifier = "transaction-" + counter.getAndIncrement(); - this.readOnly = readOnly; + this.transactionType = transactionType; this.actorContext = actorContext; Object response = actorContext.executeShardOperation(Shard.DEFAULT_NAME, new CreateTransaction(), ActorContext.ASK_DURATION); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java index 8c3ec82a54..74c858e4a6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java @@ -12,6 +12,7 @@ import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Props; +import akka.actor.Terminated; import akka.testkit.JavaTestKit; import junit.framework.Assert; import org.junit.Test; @@ -30,11 +31,14 @@ import org.opendaylight.controller.cluster.datastore.messages.WriteData; import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; public class BasicIntegrationTest extends AbstractActorTest { @Test - public void integrationTest() { + public void integrationTest() throws Exception{ // This test will // - create a Shard // - initiate a transaction @@ -57,7 +61,7 @@ public class BasicIntegrationTest extends AbstractActorTest { shard.tell(new CreateTransactionChain(), getRef()); final ActorSelection transactionChain = - new ExpectMsg("match hint") { + new ExpectMsg("CreateTransactionChainReply") { protected ActorSelection match(Object in) { if (in instanceof CreateTransactionChainReply) { ActorPath transactionChainPath = @@ -76,7 +80,7 @@ public class BasicIntegrationTest extends AbstractActorTest { transactionChain.tell(new CreateTransaction(), getRef()); final ActorSelection transaction = - new ExpectMsg("match hint") { + new ExpectMsg("CreateTransactionReply") { protected ActorSelection match(Object in) { if (in instanceof CreateTransactionReply) { ActorPath transactionPath = @@ -92,11 +96,14 @@ public class BasicIntegrationTest extends AbstractActorTest { Assert.assertNotNull(transaction); + // Add a watch on the transaction actor so that we are notified when it dies + final ActorRef transactionActorRef = watchActor(transaction); + transaction.tell(new WriteData(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)), getRef()); - Boolean writeDone = new ExpectMsg("match hint") { + Boolean writeDone = new ExpectMsg("WriteDataReply") { protected Boolean match(Object in) { if (in instanceof WriteDataReply) { return true; @@ -111,7 +118,7 @@ public class BasicIntegrationTest extends AbstractActorTest { transaction.tell(new ReadyTransaction(), getRef()); final ActorSelection cohort = - new ExpectMsg("match hint") { + new ExpectMsg("ReadyTransactionReply") { protected ActorSelection match(Object in) { if (in instanceof ReadyTransactionReply) { ActorPath cohortPath = @@ -127,10 +134,13 @@ public class BasicIntegrationTest extends AbstractActorTest { Assert.assertNotNull(cohort); + // Add a watch on the transaction actor so that we are notified when it dies + final ActorRef cohorActorRef = watchActor(cohort); + cohort.tell(new PreCommitTransaction(), getRef()); Boolean preCommitDone = - new ExpectMsg("match hint") { + new ExpectMsg("PreCommitTransactionReply") { protected Boolean match(Object in) { if (in instanceof PreCommitTransactionReply) { return true; @@ -144,8 +154,35 @@ public class BasicIntegrationTest extends AbstractActorTest { cohort.tell(new CommitTransaction(), getRef()); + final Boolean terminatedCohort = + new ExpectMsg("Terminated Cohort") { + protected Boolean match(Object in) { + if (in instanceof Terminated) { + return cohorActorRef.equals(((Terminated) in).actor()); + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + Assert.assertTrue(terminatedCohort); + + + final Boolean terminatedTransaction = + new ExpectMsg("Terminated Transaction") { + protected Boolean match(Object in) { + if (in instanceof Terminated) { + return transactionActorRef.equals(((Terminated) in).actor()); + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + Assert.assertTrue(terminatedTransaction); + final Boolean commitDone = - new ExpectMsg("match hint") { + new ExpectMsg("CommitTransactionReply") { protected Boolean match(Object in) { if (in instanceof CommitTransactionReply) { return true; @@ -161,7 +198,25 @@ public class BasicIntegrationTest extends AbstractActorTest { }; - }}; + } + + private ActorRef watchActor(ActorSelection actor) { + Future future = actor + .resolveOne(FiniteDuration.apply(100, "milliseconds")); + + try { + ActorRef actorRef = Await.result(future, + FiniteDuration.apply(100, "milliseconds")); + + watch(actorRef); + + return actorRef; + } catch (Exception e) { + throw new RuntimeException(e); + } + + } + }; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index 9116f24c92..e4d8e1b23a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -2,6 +2,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.Props; +import akka.actor.Terminated; import akka.testkit.JavaTestKit; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -32,246 +33,328 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; public class ShardTransactionTest extends AbstractActorTest { - private static ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor()); + private static ListeningExecutorService storeExecutor = + MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor()); + + private static final InMemoryDOMDataStore store = + new InMemoryDOMDataStore("OPER", storeExecutor); + + static { + store.onGlobalContextUpdated(TestModel.createTestContext()); + } + + @Test + public void testOnReceiveReadData() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shard = getSystem().actorOf(Shard.props("config")); + final Props props = + ShardTransaction.props(store.newReadWriteTransaction(), shard); + final ActorRef subject = getSystem().actorOf(props, "testReadData"); + + new Within(duration("1 seconds")) { + protected void run() { + + subject.tell( + new ReadData(InstanceIdentifier.builder().build()), + getRef()); + + final String out = new ExpectMsg("match hint") { + // do not put code outside this method, will run afterwards + protected String match(Object in) { + if (in instanceof ReadDataReply) { + if (((ReadDataReply) in).getNormalizedNode() + != null) { + return "match"; + } + return null; + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + assertEquals("match", out); + + expectNoMsg(); + } + + + }; + }}; + } + + @Test + public void testOnReceiveReadDataWhenDataNotFound() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shard = getSystem().actorOf(Shard.props("config")); + final Props props = + ShardTransaction.props(store.newReadWriteTransaction(), shard); + final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound"); + + new Within(duration("1 seconds")) { + protected void run() { + + subject.tell( + new ReadData(TestModel.TEST_PATH), + getRef()); + + final String out = new ExpectMsg("match hint") { + // do not put code outside this method, will run afterwards + protected String match(Object in) { + if (in instanceof ReadDataReply) { + if (((ReadDataReply) in).getNormalizedNode() + == null) { + return "match"; + } + return null; + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + assertEquals("match", out); + + expectNoMsg(); + } + + + }; + }}; + } + + private void assertModification(final ActorRef subject, + final Class modificationType) { + new JavaTestKit(getSystem()) {{ + new Within(duration("1 seconds")) { + protected void run() { + subject + .tell(new ShardTransaction.GetCompositedModification(), + getRef()); + + final CompositeModification compositeModification = + new ExpectMsg("match hint") { + // do not put code outside this method, will run afterwards + protected CompositeModification match(Object in) { + if (in instanceof ShardTransaction.GetCompositeModificationReply) { + return ((ShardTransaction.GetCompositeModificationReply) in) + .getModification(); + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + assertTrue( + compositeModification.getModifications().size() == 1); + assertEquals(modificationType, + compositeModification.getModifications().get(0) + .getClass()); + + } + }; + }}; + } + + @Test + public void testOnReceiveWriteData() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shard = getSystem().actorOf(Shard.props("config")); + final Props props = + ShardTransaction.props(store.newReadWriteTransaction(), shard); + final ActorRef subject = + getSystem().actorOf(props, "testWriteData"); + + new Within(duration("1 seconds")) { + protected void run() { + + subject.tell(new WriteData(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)), + getRef()); + + final String out = new ExpectMsg("match hint") { + // do not put code outside this method, will run afterwards + protected String match(Object in) { + if (in instanceof WriteDataReply) { + return "match"; + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + assertEquals("match", out); + + assertModification(subject, WriteModification.class); + expectNoMsg(); + } + + + }; + }}; + } - private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor); + @Test + public void testOnReceiveMergeData() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shard = getSystem().actorOf(Shard.props("config")); + final Props props = + ShardTransaction.props(store.newReadWriteTransaction(), shard); + final ActorRef subject = + getSystem().actorOf(props, "testMergeData"); - static { - store.onGlobalContextUpdated(TestModel.createTestContext()); - } + new Within(duration("1 seconds")) { + protected void run() { - @Test - public void testOnReceiveReadData() throws Exception { - new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props("config")); - final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard); - final ActorRef subject = getSystem().actorOf(props, "testReadData"); + subject.tell(new MergeData(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)), + getRef()); - new Within(duration("1 seconds")) { - protected void run() { + final String out = new ExpectMsg("match hint") { + // do not put code outside this method, will run afterwards + protected String match(Object in) { + if (in instanceof MergeDataReply) { + return "match"; + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message - subject.tell(new ReadData(InstanceIdentifier.builder().build()), getRef()); + assertEquals("match", out); - final String out = new ExpectMsg("match hint") { - // do not put code outside this method, will run afterwards - protected String match(Object in) { - if (in instanceof ReadDataReply) { - if (((ReadDataReply) in).getNormalizedNode() != null) { - return "match"; + assertModification(subject, MergeModification.class); + + expectNoMsg(); } - return null; - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - assertEquals("match", out); - - expectNoMsg(); - } - - - }; - }}; - } - - private void assertModification(final ActorRef subject, final Class modificationType){ - new JavaTestKit(getSystem()) {{ - new Within(duration("1 seconds")) { - protected void run() { - subject.tell(new ShardTransaction.GetCompositedModification(), getRef()); - - final CompositeModification compositeModification = new ExpectMsg("match hint") { - // do not put code outside this method, will run afterwards - protected CompositeModification match(Object in) { - if (in instanceof ShardTransaction.GetCompositeModificationReply) { - return ((ShardTransaction.GetCompositeModificationReply) in).getModification(); - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - assertTrue(compositeModification.getModifications().size() == 1); - assertEquals(modificationType, compositeModification.getModifications().get(0).getClass()); - - } - }; - }}; - } - - @Test - public void testOnReceiveWriteData() throws Exception { - new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props("config")); - final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard); - final ActorRef subject = getSystem().actorOf(props, "testWriteData"); - - new Within(duration("1 seconds")) { - protected void run() { - - subject.tell(new WriteData(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)), getRef()); - - final String out = new ExpectMsg("match hint") { - // do not put code outside this method, will run afterwards - protected String match(Object in) { - if (in instanceof WriteDataReply) { - return "match"; - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - assertEquals("match", out); - - assertModification(subject, WriteModification.class); - expectNoMsg(); - } - - - }; - }}; - } - - @Test - public void testOnReceiveMergeData() throws Exception { - new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props("config")); - final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard); - final ActorRef subject = getSystem().actorOf(props, "testMergeData"); - - new Within(duration("1 seconds")) { - protected void run() { - - subject.tell(new MergeData(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)), getRef()); - - final String out = new ExpectMsg("match hint") { - // do not put code outside this method, will run afterwards - protected String match(Object in) { - if (in instanceof MergeDataReply) { - return "match"; - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - assertEquals("match", out); - - assertModification(subject, MergeModification.class); - - expectNoMsg(); - } - - - }; - }}; - } - - @Test - public void testOnReceiveDeleteData() throws Exception { - new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props("config")); - final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard); - final ActorRef subject = getSystem().actorOf(props, "testDeleteData"); - new Within(duration("1 seconds")) { - protected void run() { - subject.tell(new DeleteData(TestModel.TEST_PATH), getRef()); + }; + }}; + } + + @Test + public void testOnReceiveDeleteData() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shard = getSystem().actorOf(Shard.props("config")); + final Props props = + ShardTransaction.props(store.newReadWriteTransaction(), shard); + final ActorRef subject = + getSystem().actorOf(props, "testDeleteData"); + + new Within(duration("1 seconds")) { + protected void run() { + + subject.tell(new DeleteData(TestModel.TEST_PATH), getRef()); + + final String out = new ExpectMsg("match hint") { + // do not put code outside this method, will run afterwards + protected String match(Object in) { + if (in instanceof DeleteDataReply) { + return "match"; + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message - final String out = new ExpectMsg("match hint") { - // do not put code outside this method, will run afterwards - protected String match(Object in) { - if (in instanceof DeleteDataReply) { - return "match"; - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message + assertEquals("match", out); + + assertModification(subject, DeleteModification.class); + expectNoMsg(); + } - assertEquals("match", out); - assertModification(subject, DeleteModification.class); - expectNoMsg(); - } + }; + }}; + } - }; - }}; - } + @Test + public void testOnReceiveReadyTransaction() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shard = getSystem().actorOf(Shard.props("config")); + final Props props = + ShardTransaction.props(store.newReadWriteTransaction(), shard); + final ActorRef subject = + getSystem().actorOf(props, "testReadyTransaction"); + new Within(duration("1 seconds")) { + protected void run() { - @Test - public void testOnReceiveReadyTransaction() throws Exception { - new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props("config")); - final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard); - final ActorRef subject = getSystem().actorOf(props, "testReadyTransaction"); + subject.tell(new ReadyTransaction(), getRef()); - new Within(duration("1 seconds")) { - protected void run() { + final String out = new ExpectMsg("match hint") { + // do not put code outside this method, will run afterwards + protected String match(Object in) { + if (in instanceof ReadyTransactionReply) { + return "match"; + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message - subject.tell(new ReadyTransaction(), getRef()); + assertEquals("match", out); - final String out = new ExpectMsg("match hint") { - // do not put code outside this method, will run afterwards - protected String match(Object in) { - if (in instanceof ReadyTransactionReply) { - return "match"; - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message + expectNoMsg(); + } - assertEquals("match", out); - expectNoMsg(); - } + }; + }}; + } - }; - }}; + @Test + public void testOnReceiveCloseTransaction() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shard = getSystem().actorOf(Shard.props("config")); + final Props props = + ShardTransaction.props(store.newReadWriteTransaction(), shard); + final ActorRef subject = + getSystem().actorOf(props, "testCloseTransaction"); - } + watch(subject); - @Test - public void testOnReceiveCloseTransaction() throws Exception { - new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props("config")); - final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard); - final ActorRef subject = getSystem().actorOf(props, "testCloseTransaction"); + new Within(duration("2 seconds")) { + protected void run() { - new Within(duration("1 seconds")) { - protected void run() { + subject.tell(new CloseTransaction(), getRef()); - subject.tell(new CloseTransaction(), getRef()); + final String out = new ExpectMsg("match hint") { + // do not put code outside this method, will run afterwards + protected String match(Object in) { + if (in instanceof CloseTransactionReply) { + return "match"; + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message - final String out = new ExpectMsg("match hint") { - // do not put code outside this method, will run afterwards - protected String match(Object in) { - if (in instanceof CloseTransactionReply) { - return "match"; - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message + assertEquals("match", out); - assertEquals("match", out); + final String termination = new ExpectMsg("match hint") { + // do not put code outside this method, will run afterwards + protected String match(Object in) { + if (in instanceof Terminated) { + return "match"; + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message - expectNoMsg(); - } + + expectNoMsg(); + } - }; - }}; + }; + }}; - } + } -} \ No newline at end of file +}