X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxyTest.java;h=62052f38ab89b6962dc31622332f85113492c924;hb=971b179000ef1cc56699de35061cf6f97d4cf36f;hp=89cf7e7728f8bbaec5a28b4c389869ecb43f0391;hpb=af255d4824ce1290d6e6b4c669a5d9b0c5960f34;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 89cf7e7728..62052f38ab 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -3,37 +3,67 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.Props; import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import junit.framework.Assert; +import org.junit.After; +import org.junit.Before; import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; +import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; +import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; import org.opendaylight.controller.cluster.datastore.messages.MergeData; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.WriteData; +import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.datastore.utils.MockActorContext; +import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import scala.concurrent.duration.FiniteDuration; import java.util.List; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import static junit.framework.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class TransactionProxyTest extends AbstractActorTest { + private final Configuration configuration = new MockConfiguration(); + private final ActorContext testContext = - new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockConfiguration()); + new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), configuration ); + + private final ListeningExecutorService transactionExecutor = + MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + + @Before + public void setUp(){ + ShardStrategyFactory.setConfiguration(configuration); + } - private ExecutorService transactionExecutor = - Executors.newSingleThreadExecutor(); + @After + public void tearDown() { + transactionExecutor.shutdownNow(); + } @Test public void testRead() throws Exception { @@ -41,6 +71,7 @@ public class TransactionProxyTest extends AbstractActorTest { final ActorRef actorRef = getSystem().actorOf(props); final MockActorContext actorContext = new MockActorContext(this.getSystem()); + actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); actorContext.setExecuteRemoteOperationResponse("message"); @@ -50,6 +81,10 @@ public class TransactionProxyTest extends AbstractActorTest { TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + actorContext.setExecuteRemoteOperationResponse( + new ReadDataReply(TestModel.createTestContext(), null) + .toSerializable()); + ListenableFuture>> read = transactionProxy.read(TestModel.TEST_PATH); @@ -68,14 +103,75 @@ public class TransactionProxyTest extends AbstractActorTest { } @Test - public void testReadWhenANullIsReturned() throws Exception { + public void testExists() throws Exception { final Props props = Props.create(DoNothingActor.class); final ActorRef actorRef = getSystem().actorOf(props); final MockActorContext actorContext = new MockActorContext(this.getSystem()); + actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); actorContext.setExecuteRemoteOperationResponse("message"); + + TransactionProxy transactionProxy = + new TransactionProxy(actorContext, + TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + + + actorContext.setExecuteRemoteOperationResponse(new DataExistsReply(false).toSerializable()); + + CheckedFuture exists = + transactionProxy.exists(TestModel.TEST_PATH); + + Assert.assertFalse(exists.checkedGet()); + + actorContext.setExecuteRemoteOperationResponse(new DataExistsReply(true).toSerializable()); + + exists = transactionProxy.exists(TestModel.TEST_PATH); + + Assert.assertTrue(exists.checkedGet()); + + actorContext.setExecuteRemoteOperationResponse("bad message"); + + exists = transactionProxy.exists(TestModel.TEST_PATH); + + try { + exists.checkedGet(); + fail(); + } catch(ReadFailedException e){ + } + + } + + @Test(expected = ReadFailedException.class) + public void testReadWhenAnInvalidMessageIsSentInReply() throws Exception { + final Props props = Props.create(DoNothingActor.class); + final ActorRef actorRef = getSystem().actorOf(props); + + final MockActorContext actorContext = new MockActorContext(this.getSystem()); + actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); + actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); + actorContext.setExecuteRemoteOperationResponse("message"); + + TransactionProxy transactionProxy = + new TransactionProxy(actorContext, + TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + + + + CheckedFuture>, ReadFailedException> + read = transactionProxy.read(TestModel.TEST_PATH); + + read.checkedGet(); + } + + @Test + public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Exception { + final ActorContext actorContext = mock(ActorContext.class); + + when(actorContext.executeShardOperation(anyString(), any(), any( + FiniteDuration.class))).thenThrow(new PrimaryNotFoundException("test")); + TransactionProxy transactionProxy = new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); @@ -84,26 +180,60 @@ public class TransactionProxyTest extends AbstractActorTest { ListenableFuture>> read = transactionProxy.read(TestModel.TEST_PATH); - Optional> normalizedNodeOptional = read.get(); + Assert.assertFalse(read.get().isPresent()); - Assert.assertFalse(normalizedNodeOptional.isPresent()); + } - actorContext.setExecuteRemoteOperationResponse(new ReadDataReply( - TestModel.createTestContext(), null).toSerializable()); - read = transactionProxy.read(TestModel.TEST_PATH); + @Test + public void testReadWhenATimeoutExceptionIsThrown() throws Exception { + final ActorContext actorContext = mock(ActorContext.class); - normalizedNodeOptional = read.get(); + when(actorContext.executeShardOperation(anyString(), any(), any( + FiniteDuration.class))).thenThrow(new TimeoutException("test", new Exception("reason"))); + + TransactionProxy transactionProxy = + new TransactionProxy(actorContext, + TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + + + ListenableFuture>> read = + transactionProxy.read(TestModel.TEST_PATH); + + Assert.assertFalse(read.get().isPresent()); - Assert.assertFalse(normalizedNodeOptional.isPresent()); } + @Test + public void testReadWhenAAnyOtherExceptionIsThrown() throws Exception { + final ActorContext actorContext = mock(ActorContext.class); + + when(actorContext.executeShardOperation(anyString(), any(), any( + FiniteDuration.class))).thenThrow(new NullPointerException()); + + TransactionProxy transactionProxy = + new TransactionProxy(actorContext, + TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + + + try { + ListenableFuture>> read = + transactionProxy.read(TestModel.TEST_PATH); + fail("A null pointer exception was expected"); + } catch(NullPointerException e){ + + } + } + + + @Test public void testWrite() throws Exception { final Props props = Props.create(MessageCollectorActor.class); final ActorRef actorRef = getSystem().actorOf(props); final MockActorContext actorContext = new MockActorContext(this.getSystem()); + actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); actorContext.setExecuteRemoteOperationResponse("message"); @@ -129,12 +259,17 @@ public class TransactionProxyTest extends AbstractActorTest { Assert.assertEquals(WriteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass()); } + private Object createPrimaryFound(ActorRef actorRef) { + return new PrimaryFound(actorRef.path().toString()).toSerializable(); + } + @Test public void testMerge() throws Exception { final Props props = Props.create(MessageCollectorActor.class); final ActorRef actorRef = getSystem().actorOf(props); final MockActorContext actorContext = new MockActorContext(this.getSystem()); + actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); actorContext.setExecuteRemoteOperationResponse("message"); @@ -166,6 +301,7 @@ public class TransactionProxyTest extends AbstractActorTest { final ActorRef actorRef = getSystem().actorOf(props); final MockActorContext actorContext = new MockActorContext(this.getSystem()); + actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); actorContext.setExecuteRemoteOperationResponse("message"); @@ -196,14 +332,17 @@ public class TransactionProxyTest extends AbstractActorTest { final ActorRef doNothingActorRef = getSystem().actorOf(props); final MockActorContext actorContext = new MockActorContext(this.getSystem()); + actorContext.setExecuteLocalOperationResponse(createPrimaryFound(doNothingActorRef)); actorContext.setExecuteShardOperationResponse(createTransactionReply(doNothingActorRef)); - actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path())); + actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path()).toSerializable()); TransactionProxy transactionProxy = new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + transactionProxy.read(TestModel.TEST_PATH); + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy); @@ -235,6 +374,7 @@ public class TransactionProxyTest extends AbstractActorTest { final ActorRef actorRef = getSystem().actorOf(props); final MockActorContext actorContext = new MockActorContext(this.getSystem()); + actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); actorContext.setExecuteRemoteOperationResponse("message"); @@ -242,6 +382,8 @@ public class TransactionProxyTest extends AbstractActorTest { new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + transactionProxy.read(TestModel.TEST_PATH); + transactionProxy.close(); Object messages = testContext @@ -256,7 +398,7 @@ public class TransactionProxyTest extends AbstractActorTest { Assert.assertEquals(1, listMessages.size()); - Assert.assertTrue(listMessages.get(0) instanceof CloseTransaction); + Assert.assertTrue(listMessages.get(0).getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)); } private CreateTransactionReply createTransactionReply(ActorRef actorRef){