From e40586573686ebb1529fffb782621369bcab1496 Mon Sep 17 00:00:00 2001 From: tpantelis Date: Sun, 26 Oct 2014 17:15:19 -0400 Subject: [PATCH] Bug 2252: Terminate ShardWriteTransaction actor on ready Change-Id: I16f9dcf370aab1e23073163e0e055a40f1415a1a Signed-off-by: tpantelis --- .../datastore/ShardWriteTransaction.java | 41 ++++--- .../ShardTransactionFailureTest.java | 9 +- .../datastore/ShardTransactionTest.java | 116 ++++++++---------- 3 files changed, 82 insertions(+), 84 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java index 21c210daf2..b0eaf98d59 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java @@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; +import akka.actor.PoisonPill; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply; @@ -87,12 +88,12 @@ public class ShardWriteTransaction extends ShardTransaction { } } - private void writeData(DOMStoreWriteTransaction transaction, WriteData message, boolean returnSerialized) { + private void writeData(DOMStoreWriteTransaction transaction, WriteData message, + boolean returnSerialized) { + LOG.debug("writeData at path : {}", message.getPath()); + modification.addModification( new WriteModification(message.getPath(), message.getData(), getSchemaContext())); - if(LOG.isDebugEnabled()) { - LOG.debug("writeData at path : " + message.getPath().toString()); - } try { transaction.write(message.getPath(), message.getData()); WriteDataReply writeDataReply = new WriteDataReply(); @@ -103,12 +104,13 @@ public class ShardWriteTransaction extends ShardTransaction { } } - private void mergeData(DOMStoreWriteTransaction transaction, MergeData message, boolean returnSerialized) { + private void mergeData(DOMStoreWriteTransaction transaction, MergeData message, + boolean returnSerialized) { + LOG.debug("mergeData at path : {}", message.getPath()); + modification.addModification( new MergeModification(message.getPath(), message.getData(), getSchemaContext())); - if(LOG.isDebugEnabled()) { - LOG.debug("mergeData at path : " + message.getPath().toString()); - } + try { transaction.merge(message.getPath(), message.getData()); MergeDataReply mergeDataReply = new MergeDataReply(); @@ -119,10 +121,10 @@ public class ShardWriteTransaction extends ShardTransaction { } } - private void deleteData(DOMStoreWriteTransaction transaction, DeleteData message, boolean returnSerialized) { - if(LOG.isDebugEnabled()) { - LOG.debug("deleteData at path : " + message.getPath().toString()); - } + private void deleteData(DOMStoreWriteTransaction transaction, DeleteData message, + boolean returnSerialized) { + LOG.debug("deleteData at path : {}", message.getPath()); + modification.addModification(new DeleteModification(message.getPath())); try { transaction.delete(message.getPath()); @@ -134,12 +136,19 @@ public class ShardWriteTransaction extends ShardTransaction { } } - private void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message, boolean returnSerialized) { + private void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message, + boolean returnSerialized) { + String transactionID = getTransactionID(); + + LOG.debug("readyTransaction : {}", transactionID); + DOMStoreThreePhaseCommitCohort cohort = transaction.ready(); - getShardActor().forward(new ForwardedReadyTransaction( - getTransactionID(), cohort, modification, returnSerialized), - getContext()); + getShardActor().forward(new ForwardedReadyTransaction(transactionID, cohort, modification, + returnSerialized), getContext()); + + // The shard will handle the commit from here so we're no longer needed - self-destruct. + getSelf().tell(PoisonPill.getInstance(), getSelf()); } // These classes are in here for test purposes only diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java index 17731de5cd..6375e3c7fb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java @@ -12,6 +12,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.Props; +import akka.pattern.AskTimeoutException; import akka.testkit.TestActorRef; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -155,7 +156,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { Await.result(future, Duration.create(3, TimeUnit.SECONDS)); } - @Test(expected = IllegalStateException.class) + @Test(expected = AskTimeoutException.class) public void testNegativeWriteWithTransactionReady() throws Exception { @@ -187,7 +188,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { Await.result(future, Duration.create(3, TimeUnit.SECONDS)); } - @Test(expected = IllegalStateException.class) + @Test(expected = AskTimeoutException.class) public void testNegativeReadWriteWithTransactionReady() throws Exception { @@ -224,7 +225,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { .serialize(Builders.containerBuilder().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).build()); } - @Test(expected = IllegalStateException.class) + @Test(expected = AskTimeoutException.class) public void testNegativeMergeTransactionReady() throws Exception { @@ -256,7 +257,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { } - @Test(expected = IllegalStateException.class) + @Test(expected = AskTimeoutException.class) public void testNegativeDeleteDataWhenTransactionReady() throws Exception { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index 711f3d7a72..793df8e0ca 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -88,9 +88,9 @@ public class ShardTransactionTest extends AbstractActorTest { testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW")); } - private void testOnReceiveReadData(final ActorRef subject) { + private void testOnReceiveReadData(final ActorRef transaction) { //serialized read - subject.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), + transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), getRef()); ShardTransactionMessages.ReadDataReply replySerialized = @@ -101,7 +101,7 @@ public class ShardTransactionTest extends AbstractActorTest { .getNormalizedNode()); // unserialized read - subject.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef()); + transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef()); ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class); @@ -126,9 +126,9 @@ public class ShardTransactionTest extends AbstractActorTest { props, "testReadDataWhenDataNotFoundRW")); } - private void testOnReceiveReadDataWhenDataNotFound(final ActorRef subject) { + private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) { // serialized read - subject.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef()); + transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef()); ShardTransactionMessages.ReadDataReply replySerialized = expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS); @@ -137,7 +137,7 @@ public class ShardTransactionTest extends AbstractActorTest { testSchemaContext, TestModel.TEST_PATH, replySerialized).getNormalizedNode() == null); // unserialized read - subject.tell(new ReadData(TestModel.TEST_PATH),getRef()); + transaction.tell(new ReadData(TestModel.TEST_PATH),getRef()); ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class); @@ -160,8 +160,8 @@ public class ShardTransactionTest extends AbstractActorTest { testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW")); } - private void testOnReceiveDataExistsPositive(final ActorRef subject) { - subject.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(), + private void testOnReceiveDataExistsPositive(final ActorRef transaction) { + transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(), getRef()); ShardTransactionMessages.DataExistsReply replySerialized = @@ -170,7 +170,7 @@ public class ShardTransactionTest extends AbstractActorTest { assertTrue(DataExistsReply.fromSerializable(replySerialized).exists()); // unserialized read - subject.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef()); + transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef()); DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class); @@ -193,8 +193,8 @@ public class ShardTransactionTest extends AbstractActorTest { testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW")); } - private void testOnReceiveDataExistsNegative(final ActorRef subject) { - subject.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef()); + private void testOnReceiveDataExistsNegative(final ActorRef transaction) { + transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef()); ShardTransactionMessages.DataExistsReply replySerialized = expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class); @@ -202,7 +202,7 @@ public class ShardTransactionTest extends AbstractActorTest { assertFalse(DataExistsReply.fromSerializable(replySerialized).exists()); // unserialized read - subject.tell(new DataExists(TestModel.TEST_PATH),getRef()); + transaction.tell(new DataExists(TestModel.TEST_PATH),getRef()); DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class); @@ -229,20 +229,18 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn"); - final ActorRef subject = - getSystem().actorOf(props, "testWriteData"); + final ActorRef transaction = getSystem().actorOf(props, "testWriteData"); - subject.tell(new WriteData(TestModel.TEST_PATH, + transaction.tell(new WriteData(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(), getRef()); - ShardTransactionMessages.WriteDataReply replySerialized = - expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class); + expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class); - assertModification(subject, WriteModification.class); + assertModification(transaction, WriteModification.class); //unserialized write - subject.tell(new WriteData(TestModel.TEST_PATH, + transaction.tell(new WriteData(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()), getRef()); @@ -257,20 +255,18 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn"); - final ActorRef subject = - getSystem().actorOf(props, "testMergeData"); + final ActorRef transaction = getSystem().actorOf(props, "testMergeData"); - subject.tell(new MergeData(TestModel.TEST_PATH, + transaction.tell(new MergeData(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(), getRef()); - ShardTransactionMessages.MergeDataReply replySerialized = - expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class); + expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class); - assertModification(subject, MergeModification.class); + assertModification(transaction, MergeModification.class); //unserialized merge - subject.tell(new MergeData(TestModel.TEST_PATH, + transaction.tell(new MergeData(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext), getRef()); @@ -284,18 +280,16 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn"); - final ActorRef subject = - getSystem().actorOf(props, "testDeleteData"); + final ActorRef transaction = getSystem().actorOf(props, "testDeleteData"); - subject.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef()); + transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef()); - ShardTransactionMessages.DeleteDataReply replySerialized = - expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class); + expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class); - assertModification(subject, DeleteModification.class); + assertModification(transaction, DeleteModification.class); //unserialized merge - subject.tell(new DeleteData(TestModel.TEST_PATH), getRef()); + transaction.tell(new DeleteData(TestModel.TEST_PATH), getRef()); expectMsgClass(duration("5 seconds"), DeleteDataReply.class); }}; @@ -308,12 +302,16 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn"); - final ActorRef subject = - getSystem().actorOf(props, "testReadyTransaction"); + final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction"); - subject.tell(new ReadyTransaction().toSerializable(), getRef()); + watch(transaction); - expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS); + transaction.tell(new ReadyTransaction().toSerializable(), getRef()); + + expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS, + Terminated.class); + expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS, + Terminated.class); }}; // test @@ -321,12 +319,16 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn"); - final ActorRef subject = - getSystem().actorOf(props, "testReadyTransaction2"); + final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2"); + + watch(transaction); - subject.tell(new ReadyTransaction(), getRef()); + transaction.tell(new ReadyTransaction(), getRef()); - expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class); + expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class, + Terminated.class); + expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class, + Terminated.class); }}; } @@ -338,14 +340,14 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn"); - final ActorRef subject = getSystem().actorOf(props, "testCloseTransaction"); + final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction"); - watch(subject); + watch(transaction); - subject.tell(new CloseTransaction().toSerializable(), getRef()); + transaction.tell(new CloseTransaction().toSerializable(), getRef()); expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS); - expectMsgClass(duration("3 seconds"), Terminated.class); + expectTerminated(duration("3 seconds"), transaction); }}; } @@ -354,9 +356,9 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn"); - final TestActorRef subject = TestActorRef.apply(props,getSystem()); + final TestActorRef transaction = TestActorRef.apply(props,getSystem()); - subject.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender()); + transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender()); } @Test @@ -369,26 +371,12 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn"); - final ActorRef subject = + final ActorRef transaction = getSystem().actorOf(props, "testShardTransactionInactivity"); - watch(subject); + watch(transaction); - // The shard Tx actor should receive a ReceiveTimeout message and self-destruct. - - final String termination = new ExpectMsg(duration("3 seconds"), "match hint") { - // do not put code outside this method, will run afterwards - @Override - protected String match(Object in) { - if (in instanceof Terminated) { - return "match"; - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - assertEquals("match", termination); + expectMsgClass(duration("3 seconds"), Terminated.class); }}; } } -- 2.36.6