Merge "BUG-650: remove executor abstraction"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
index 711f3d7a72a16b615224246e07e3adb750b7cff6..f5af93d584ce4ff62678f9f499cd629c96ccc80b 100644 (file)
@@ -1,12 +1,17 @@
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
@@ -15,6 +20,7 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
@@ -39,19 +45,10 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 
 public class ShardTransactionTest extends AbstractActorTest {
-    private static ListeningExecutorService storeExecutor =
-        MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
-
     private static final InMemoryDOMDataStore store =
-        new InMemoryDOMDataStore("OPER", storeExecutor, MoreExecutors.sameThreadExecutor());
+        new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
 
     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
 
@@ -70,7 +67,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
     private ActorRef createShard(){
         return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
-            Collections.EMPTY_MAP, datastoreContext, TestModel.createTestContext()));
+            Collections.<ShardIdentifier, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
     }
 
     @Test
@@ -78,19 +75,21 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO"));
 
             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
         }
 
-        private void testOnReceiveReadData(final ActorRef subject) {
+        private void testOnReceiveReadData(final ActorRef transaction) {
             //serialized read
-            subject.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
+            transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
                 getRef());
 
             ShardTransactionMessages.ReadDataReply replySerialized =
@@ -101,7 +100,7 @@ public class ShardTransactionTest extends AbstractActorTest {
                 .getNormalizedNode());
 
             // unserialized read
-            subject.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
+            transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
 
             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
 
@@ -114,21 +113,23 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
                     props, "testReadDataWhenDataNotFoundRO"));
 
             props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
                     props, "testReadDataWhenDataNotFoundRW"));
         }
 
-        private void testOnReceiveReadDataWhenDataNotFound(final ActorRef subject) {
+        private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
             // serialized read
-            subject.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
+            transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
 
             ShardTransactionMessages.ReadDataReply replySerialized =
                 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
@@ -137,7 +138,7 @@ public class ShardTransactionTest extends AbstractActorTest {
                 testSchemaContext, TestModel.TEST_PATH, replySerialized).getNormalizedNode() == null);
 
             // unserialized read
-            subject.tell(new ReadData(TestModel.TEST_PATH),getRef());
+            transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
 
             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
 
@@ -150,18 +151,20 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO"));
 
             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
         }
 
-        private void testOnReceiveDataExistsPositive(final ActorRef subject) {
-            subject.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
+        private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
+            transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
                 getRef());
 
             ShardTransactionMessages.DataExistsReply replySerialized =
@@ -170,7 +173,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
 
             // unserialized read
-            subject.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
+            transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
 
             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
 
@@ -183,18 +186,20 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO"));
 
             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
         }
 
-        private void testOnReceiveDataExistsNegative(final ActorRef subject) {
-            subject.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
+        private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
+            transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
 
             ShardTransactionMessages.DataExistsReply replySerialized =
                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
@@ -202,7 +207,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
 
             // unserialized read
-            subject.tell(new DataExists(TestModel.TEST_PATH),getRef());
+            transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
 
             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
 
@@ -228,21 +233,20 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
-            final ActorRef subject =
-                getSystem().actorOf(props, "testWriteData");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
+            final ActorRef transaction = getSystem().actorOf(props, "testWriteData");
 
-            subject.tell(new WriteData(TestModel.TEST_PATH,
+            transaction.tell(new WriteData(TestModel.TEST_PATH,
                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
                 getRef());
 
-            ShardTransactionMessages.WriteDataReply replySerialized =
-                expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
+            expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
 
-            assertModification(subject, WriteModification.class);
+            assertModification(transaction, WriteModification.class);
 
             //unserialized write
-            subject.tell(new WriteData(TestModel.TEST_PATH,
+            transaction.tell(new WriteData(TestModel.TEST_PATH,
                 ImmutableNodes.containerNode(TestModel.TEST_QNAME),
                 TestModel.createTestContext()),
                 getRef());
@@ -256,21 +260,20 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
-            final ActorRef subject =
-                getSystem().actorOf(props, "testMergeData");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
+            final ActorRef transaction = getSystem().actorOf(props, "testMergeData");
 
-            subject.tell(new MergeData(TestModel.TEST_PATH,
+            transaction.tell(new MergeData(TestModel.TEST_PATH,
                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(),
                 getRef());
 
-            ShardTransactionMessages.MergeDataReply replySerialized =
-                expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
+            expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
 
-            assertModification(subject, MergeModification.class);
+            assertModification(transaction, MergeModification.class);
 
             //unserialized merge
-            subject.tell(new MergeData(TestModel.TEST_PATH,
+            transaction.tell(new MergeData(TestModel.TEST_PATH,
                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext),
                 getRef());
 
@@ -283,19 +286,18 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
-            final ActorRef subject =
-                getSystem().actorOf(props, "testDeleteData");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
+            final ActorRef transaction = getSystem().actorOf(props, "testDeleteData");
 
-            subject.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
+            transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
 
-            ShardTransactionMessages.DeleteDataReply replySerialized =
-                expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
+            expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
 
-            assertModification(subject, DeleteModification.class);
+            assertModification(transaction, DeleteModification.class);
 
             //unserialized merge
-            subject.tell(new DeleteData(TestModel.TEST_PATH), getRef());
+            transaction.tell(new DeleteData(TestModel.TEST_PATH), getRef());
 
             expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
         }};
@@ -307,45 +309,55 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
-            final ActorRef subject =
-                getSystem().actorOf(props, "testReadyTransaction");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
+            final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction");
 
-            subject.tell(new ReadyTransaction().toSerializable(), getRef());
+            watch(transaction);
 
-            expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
+            transaction.tell(new ReadyTransaction().toSerializable(), getRef());
+
+            expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
+                    Terminated.class);
+            expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
+                    Terminated.class);
         }};
 
         // test
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
-            final ActorRef subject =
-                getSystem().actorOf(props, "testReadyTransaction2");
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_VERSION);
+            final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2");
+
+            watch(transaction);
 
-            subject.tell(new ReadyTransaction(), getRef());
+            transaction.tell(new ReadyTransaction(), getRef());
 
-            expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
+            expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
+                    Terminated.class);
+            expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
+                    Terminated.class);
         }};
 
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testOnReceiveCloseTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
-            final ActorRef subject = getSystem().actorOf(props, "testCloseTransaction");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
+            final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction");
 
-            watch(subject);
+            watch(transaction);
 
-            subject.tell(new CloseTransaction().toSerializable(), getRef());
+            transaction.tell(new CloseTransaction().toSerializable(), getRef());
 
             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
-            expectMsgClass(duration("3 seconds"), Terminated.class);
+            expectTerminated(duration("3 seconds"), transaction);
         }};
     }
 
@@ -353,10 +365,11 @@ public class ShardTransactionTest extends AbstractActorTest {
     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
-        final TestActorRef subject = TestActorRef.apply(props,getSystem());
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_VERSION);
+        final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
 
-        subject.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
+        transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
     }
 
     @Test
@@ -368,27 +381,14 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
-            final ActorRef subject =
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
+            final ActorRef transaction =
                 getSystem().actorOf(props, "testShardTransactionInactivity");
 
-            watch(subject);
-
-            // The shard Tx actor should receive a ReceiveTimeout message and self-destruct.
+            watch(transaction);
 
-            final String termination = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
-                // do not put code outside this method, will run afterwards
-                @Override
-                protected String match(Object in) {
-                    if (in instanceof Terminated) {
-                        return "match";
-                    } else {
-                        throw noMatch();
-                    }
-                }
-            }.get(); // this extracts the received message
-
-            assertEquals("match", termination);
+            expectMsgClass(duration("3 seconds"), Terminated.class);
         }};
     }
 }