X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxyTest.java;h=f654e3aced738ef0c85a6400babe901d9c98d3ea;hp=db33e862fe6ca80c75f72b393628789ccd6f52ca;hb=83140d53722ad77dd804f7b4d761a673110b83b3;hpb=3df36bc424589cee4cbbc74e1e75b883d32046ad diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index db33e862fe..f654e3aced 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -2,55 +2,293 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.Props; -import akka.testkit.JavaTestKit; import com.google.common.base.Optional; import com.google.common.util.concurrent.ListenableFuture; import junit.framework.Assert; +import org.junit.Before; import org.junit.Test; -import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; +import org.opendaylight.controller.cluster.datastore.messages.DeleteData; +import org.opendaylight.controller.cluster.datastore.messages.MergeData; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.WriteData; +import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; +import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.datastore.utils.MockActorContext; +import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; +import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + public class TransactionProxyTest extends AbstractActorTest { + private final Configuration configuration = new MockConfiguration(); + + private final ActorContext testContext = + new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), configuration ); + + private ExecutorService transactionExecutor = + Executors.newSingleThreadExecutor(); + @Before + public void setUp(){ + ShardStrategyFactory.setConfiguration(configuration); + } @Test public void testRead() throws Exception { + final Props props = Props.create(DoNothingActor.class); + final ActorRef actorRef = getSystem().actorOf(props); + + final MockActorContext actorContext = new MockActorContext(this.getSystem()); + actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); + actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); + actorContext.setExecuteRemoteOperationResponse("message"); + + + TransactionProxy transactionProxy = + new TransactionProxy(actorContext, + TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + + + ListenableFuture>> read = + transactionProxy.read(TestModel.TEST_PATH); + + Optional> normalizedNodeOptional = read.get(); + + Assert.assertFalse(normalizedNodeOptional.isPresent()); + + actorContext.setExecuteRemoteOperationResponse(new ReadDataReply( + TestModel.createTestContext(),ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable()); + + read = transactionProxy.read(TestModel.TEST_PATH); + + normalizedNodeOptional = read.get(); + + Assert.assertTrue(normalizedNodeOptional.isPresent()); + } + + @Test + public void testReadWhenANullIsReturned() throws Exception { + final Props props = Props.create(DoNothingActor.class); + final ActorRef actorRef = getSystem().actorOf(props); + + final MockActorContext actorContext = new MockActorContext(this.getSystem()); + actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); + actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); + actorContext.setExecuteRemoteOperationResponse("message"); + + TransactionProxy transactionProxy = + new TransactionProxy(actorContext, + TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + + + ListenableFuture>> read = + transactionProxy.read(TestModel.TEST_PATH); - new JavaTestKit(getSystem()) {{ - final Props props = Props.create(DoNothingActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + Optional> normalizedNodeOptional = read.get(); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path())); - actorContext.setExecuteRemoteOperationResponse("message"); + Assert.assertFalse(normalizedNodeOptional.isPresent()); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY); + actorContext.setExecuteRemoteOperationResponse(new ReadDataReply( + TestModel.createTestContext(), null).toSerializable()); + read = transactionProxy.read(TestModel.TEST_PATH); - ListenableFuture>> read = - transactionProxy.read(TestModel.TEST_PATH); + normalizedNodeOptional = read.get(); - Optional> normalizedNodeOptional = read.get(); + Assert.assertFalse(normalizedNodeOptional.isPresent()); + } + + @Test + public void testWrite() throws Exception { + final Props props = Props.create(MessageCollectorActor.class); + final ActorRef actorRef = getSystem().actorOf(props); + + final MockActorContext actorContext = new MockActorContext(this.getSystem()); + actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); + actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); + actorContext.setExecuteRemoteOperationResponse("message"); + + TransactionProxy transactionProxy = + new TransactionProxy(actorContext, + TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + + transactionProxy.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.NAME_QNAME)); + + Object messages = testContext + .executeLocalOperation(actorRef, "messages", + ActorContext.ASK_DURATION); + + Assert.assertNotNull(messages); + + Assert.assertTrue(messages instanceof List); + + List listMessages = (List) messages; + + Assert.assertEquals(1, listMessages.size()); - Assert.assertFalse(normalizedNodeOptional.isPresent()); + Assert.assertEquals(WriteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass()); + } + + private Object createPrimaryFound(ActorRef actorRef) { + return new PrimaryFound(actorRef.path().toString()).toSerializable(); + } + + @Test + public void testMerge() throws Exception { + final Props props = Props.create(MessageCollectorActor.class); + final ActorRef actorRef = getSystem().actorOf(props); + + final MockActorContext actorContext = new MockActorContext(this.getSystem()); + actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); + actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); + actorContext.setExecuteRemoteOperationResponse("message"); - actorContext.setExecuteRemoteOperationResponse(new ReadDataReply( - ImmutableNodes.containerNode(TestModel.TEST_QNAME))); + TransactionProxy transactionProxy = + new TransactionProxy(actorContext, + TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); - read = transactionProxy.read(TestModel.TEST_PATH); + transactionProxy.merge(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.NAME_QNAME)); - normalizedNodeOptional = read.get(); + Object messages = testContext + .executeLocalOperation(actorRef, "messages", + ActorContext.ASK_DURATION); - Assert.assertTrue(normalizedNodeOptional.isPresent()); + Assert.assertNotNull(messages); + + Assert.assertTrue(messages instanceof List); + + List listMessages = (List) messages; + + Assert.assertEquals(1, listMessages.size()); + + Assert.assertEquals(MergeData.SERIALIZABLE_CLASS, listMessages.get(0).getClass()); + } + + @Test + public void testDelete() throws Exception { + final Props props = Props.create(MessageCollectorActor.class); + final ActorRef actorRef = getSystem().actorOf(props); + + final MockActorContext actorContext = new MockActorContext(this.getSystem()); + actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); + actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); + actorContext.setExecuteRemoteOperationResponse("message"); + + TransactionProxy transactionProxy = + new TransactionProxy(actorContext, + TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + + transactionProxy.delete(TestModel.TEST_PATH); + + Object messages = testContext + .executeLocalOperation(actorRef, "messages", + ActorContext.ASK_DURATION); + + Assert.assertNotNull(messages); + + Assert.assertTrue(messages instanceof List); + + List listMessages = (List) messages; + + Assert.assertEquals(1, listMessages.size()); + + Assert.assertEquals(DeleteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass()); + } + + @Test + public void testReady() throws Exception { + final Props props = Props.create(DoNothingActor.class); + final ActorRef doNothingActorRef = getSystem().actorOf(props); + + final MockActorContext actorContext = new MockActorContext(this.getSystem()); + actorContext.setExecuteLocalOperationResponse(createPrimaryFound(doNothingActorRef)); + actorContext.setExecuteShardOperationResponse(createTransactionReply(doNothingActorRef)); + actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path()).toSerializable()); + + TransactionProxy transactionProxy = + new TransactionProxy(actorContext, + TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + + + transactionProxy.read(TestModel.TEST_PATH); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + Assert.assertTrue("No cohort paths returned", proxy.getCohortPaths().size() > 0); + + } + + @Test + public void testGetIdentifier(){ + final Props props = Props.create(DoNothingActor.class); + final ActorRef doNothingActorRef = getSystem().actorOf(props); + + final MockActorContext actorContext = new MockActorContext(this.getSystem()); + actorContext.setExecuteShardOperationResponse( createTransactionReply(doNothingActorRef) ); + + TransactionProxy transactionProxy = + new TransactionProxy(actorContext, + TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + + Assert.assertNotNull(transactionProxy.getIdentifier()); + } + + @Test + public void testClose(){ + final Props props = Props.create(MessageCollectorActor.class); + final ActorRef actorRef = getSystem().actorOf(props); + + final MockActorContext actorContext = new MockActorContext(this.getSystem()); + actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); + actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); + actorContext.setExecuteRemoteOperationResponse("message"); + + TransactionProxy transactionProxy = + new TransactionProxy(actorContext, + TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + + transactionProxy.read(TestModel.TEST_PATH); + + transactionProxy.close(); + + Object messages = testContext + .executeLocalOperation(actorRef, "messages", + ActorContext.ASK_DURATION); + + Assert.assertNotNull(messages); + + Assert.assertTrue(messages instanceof List); + + List listMessages = (List) messages; + + Assert.assertEquals(1, listMessages.size()); + + Assert.assertTrue(listMessages.get(0).getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)); + } - }}; + private CreateTransactionReply createTransactionReply(ActorRef actorRef){ + return CreateTransactionReply.newBuilder() + .setTransactionActorPath(actorRef.path().toString()) + .setTransactionId("txn-1") + .build(); } }