ReadData and ReadDataReply protobuff messages
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
index db33e862fe6ca80c75f72b393628789ccd6f52ca..1e9fc90a6e1e9831ab9ecb86f5692dbacf7aec6f 100644 (file)
@@ -2,55 +2,267 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
-import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
 import junit.framework.Assert;
 import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
+import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
-public class TransactionProxyTest extends AbstractActorTest {
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
+public class TransactionProxyTest extends AbstractActorTest {
 
+    private ExecutorService transactionExecutor =
+        Executors.newSingleThreadExecutor();
 
     @Test
     public void testRead() throws Exception {
+        final Props props = Props.create(DoNothingActor.class);
+        final ActorRef actorRef = getSystem().actorOf(props);
+
+        final MockActorContext actorContext = new MockActorContext(this.getSystem());
+        actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
+        actorContext.setExecuteRemoteOperationResponse("message");
+
+
+        TransactionProxy transactionProxy =
+            new TransactionProxy(actorContext,
+                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+
 
-        new JavaTestKit(getSystem()) {{
-            final Props props = Props.create(DoNothingActor.class);
-            final ActorRef actorRef = getSystem().actorOf(props);
+        ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
+            transactionProxy.read(TestModel.TEST_PATH);
 
-            final MockActorContext actorContext = new MockActorContext(this.getSystem());
-            actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
-            actorContext.setExecuteRemoteOperationResponse("message");
+        Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
 
-            TransactionProxy transactionProxy =
-                new TransactionProxy(actorContext,
-                    TransactionProxy.TransactionType.READ_ONLY);
+        Assert.assertFalse(normalizedNodeOptional.isPresent());
 
+        actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
+            TestModel.createTestContext(),ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable());
 
-            ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
-                transactionProxy.read(TestModel.TEST_PATH);
+        read = transactionProxy.read(TestModel.TEST_PATH);
+
+        normalizedNodeOptional = read.get();
+
+        Assert.assertTrue(normalizedNodeOptional.isPresent());
+    }
+
+    @Test
+    public void testReadWhenANullIsReturned() throws Exception {
+        final Props props = Props.create(DoNothingActor.class);
+        final ActorRef actorRef = getSystem().actorOf(props);
 
-            Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
+        final MockActorContext actorContext = new MockActorContext(this.getSystem());
+        actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
+        actorContext.setExecuteRemoteOperationResponse("message");
 
-            Assert.assertFalse(normalizedNodeOptional.isPresent());
+        TransactionProxy transactionProxy =
+            new TransactionProxy(actorContext,
+                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
 
-            actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
-                ImmutableNodes.containerNode(TestModel.TEST_QNAME)));
 
-            read = transactionProxy.read(TestModel.TEST_PATH);
+        ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
+            transactionProxy.read(TestModel.TEST_PATH);
+
+        Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
+
+        Assert.assertFalse(normalizedNodeOptional.isPresent());
+
+        actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
+           TestModel.createTestContext(), null).toSerializable());
+
+        read = transactionProxy.read(TestModel.TEST_PATH);
+
+        normalizedNodeOptional = read.get();
+
+        Assert.assertFalse(normalizedNodeOptional.isPresent());
+    }
+
+    @Test
+    public void testWrite() throws Exception {
+        final Props props = Props.create(MessageCollectorActor.class);
+        final ActorRef actorRef = getSystem().actorOf(props);
+
+        final MockActorContext actorContext = new MockActorContext(this.getSystem());
+        actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
+        actorContext.setExecuteRemoteOperationResponse("message");
 
-            normalizedNodeOptional = read.get();
+        TransactionProxy transactionProxy =
+            new TransactionProxy(actorContext,
+                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+
+        transactionProxy.write(TestModel.TEST_PATH,
+            ImmutableNodes.containerNode(TestModel.NAME_QNAME));
+
+        ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)));
+        Object messages = testContext
+            .executeLocalOperation(actorRef, "messages",
+                ActorContext.ASK_DURATION);
+
+        Assert.assertNotNull(messages);
+
+        Assert.assertTrue(messages instanceof List);
+
+        List<Object> listMessages = (List<Object>) messages;
+
+        Assert.assertEquals(1, listMessages.size());
+
+        Assert.assertEquals(WriteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
+    }
+
+    @Test
+    public void testMerge() throws Exception {
+        final Props props = Props.create(MessageCollectorActor.class);
+        final ActorRef actorRef = getSystem().actorOf(props);
+
+        final MockActorContext actorContext = new MockActorContext(this.getSystem());
+        actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
+        actorContext.setExecuteRemoteOperationResponse("message");
+
+        TransactionProxy transactionProxy =
+            new TransactionProxy(actorContext,
+                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+
+        transactionProxy.merge(TestModel.TEST_PATH,
+            ImmutableNodes.containerNode(TestModel.NAME_QNAME));
+
+        ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)));
+        Object messages = testContext
+            .executeLocalOperation(actorRef, "messages",
+                ActorContext.ASK_DURATION);
+
+        Assert.assertNotNull(messages);
+
+        Assert.assertTrue(messages instanceof List);
+
+        List<Object> listMessages = (List<Object>) messages;
+
+        Assert.assertEquals(1, listMessages.size());
+
+        Assert.assertEquals(MergeData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
+    }
 
-            Assert.assertTrue(normalizedNodeOptional.isPresent());
+    @Test
+    public void testDelete() throws Exception {
+        final Props props = Props.create(MessageCollectorActor.class);
+        final ActorRef actorRef = getSystem().actorOf(props);
+
+        final MockActorContext actorContext = new MockActorContext(this.getSystem());
+        actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
+        actorContext.setExecuteRemoteOperationResponse("message");
+
+        TransactionProxy transactionProxy =
+            new TransactionProxy(actorContext,
+                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+
+        transactionProxy.delete(TestModel.TEST_PATH);
+
+        ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)));
+        Object messages = testContext
+            .executeLocalOperation(actorRef, "messages",
+                ActorContext.ASK_DURATION);
+
+        Assert.assertNotNull(messages);
+
+        Assert.assertTrue(messages instanceof List);
+
+        List<Object> listMessages = (List<Object>) messages;
+
+        Assert.assertEquals(1, listMessages.size());
+
+        Assert.assertEquals(DeleteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
+    }
+
+    @Test
+    public void testReady() throws Exception {
+        final Props props = Props.create(DoNothingActor.class);
+        final ActorRef doNothingActorRef = getSystem().actorOf(props);
+
+        final MockActorContext actorContext = new MockActorContext(this.getSystem());
+        actorContext.setExecuteShardOperationResponse(createTransactionReply(doNothingActorRef));
+        actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path()));
+
+        TransactionProxy transactionProxy =
+            new TransactionProxy(actorContext,
+                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+
+
+        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+        Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+        Assert.assertTrue("No cohort paths returned", proxy.getCohortPaths().size() > 0);
+
+    }
+
+    @Test
+    public void testGetIdentifier(){
+        final Props props = Props.create(DoNothingActor.class);
+        final ActorRef doNothingActorRef = getSystem().actorOf(props);
+
+        final MockActorContext actorContext = new MockActorContext(this.getSystem());
+        actorContext.setExecuteShardOperationResponse( createTransactionReply(doNothingActorRef) );
+
+        TransactionProxy transactionProxy =
+            new TransactionProxy(actorContext,
+                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+
+        Assert.assertNotNull(transactionProxy.getIdentifier());
+    }
+
+    @Test
+    public void testClose(){
+        final Props props = Props.create(MessageCollectorActor.class);
+        final ActorRef actorRef = getSystem().actorOf(props);
+
+        final MockActorContext actorContext = new MockActorContext(this.getSystem());
+        actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
+        actorContext.setExecuteRemoteOperationResponse("message");
+
+        TransactionProxy transactionProxy =
+            new TransactionProxy(actorContext,
+                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+
+        transactionProxy.close();
+
+        ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)));
+        Object messages = testContext
+            .executeLocalOperation(actorRef, "messages",
+                ActorContext.ASK_DURATION);
+
+        Assert.assertNotNull(messages);
+
+        Assert.assertTrue(messages instanceof List);
+
+        List<Object> listMessages = (List<Object>) messages;
+
+        Assert.assertEquals(1, listMessages.size());
+
+        Assert.assertTrue(listMessages.get(0) instanceof CloseTransaction);
+    }
 
-        }};
+    private CreateTransactionReply createTransactionReply(ActorRef actorRef){
+        return CreateTransactionReply.newBuilder()
+            .setTransactionActorPath(actorRef.path().toString())
+            .setTransactionId("txn-1")
+            .build();
     }
 }