X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FConcurrentDOMDataBrokerTest.java;h=7f481019ae853591b5d2d20434ec0ad0f91baa77;hp=0b166f5ac83d11a225d1cd3fb026932f867f42fc;hb=d594cf3be29ab746695eb5a3b0d220be89b57566;hpb=ba29dacfce1ac56bbd26574520b67bca545d6c5d diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java index 0b166f5ac8..7f481019ae 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java @@ -8,12 +8,17 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; @@ -22,8 +27,12 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.Uninterruptibles; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -35,12 +44,24 @@ import org.junit.Test; import org.mockito.InOrder; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.sal.core.spi.data.DOMStore; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; /** * Unit tests for DOMConcurrentDataCommitCoordinator. @@ -61,7 +82,7 @@ public class ConcurrentDOMDataBrokerTest { doReturn("tx").when(transaction).getIdentifier(); DOMStore store = new InMemoryDOMDataStore("OPER", - MoreExecutors.sameThreadExecutor()); + MoreExecutors.newDirectExecutorService()); coordinator = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL, store), futureExecutor); } @@ -167,7 +188,7 @@ public class ConcurrentDOMDataBrokerTest { assertFailure(future, null, mockCohort1, mockCohort2, mockCohort3); } - private void assertFailure(final CheckedFuture future, + private static void assertFailure(final CheckedFuture future, final Exception expCause, final DOMStoreThreePhaseCommitCohort... mockCohorts) throws Exception { try { @@ -175,7 +196,7 @@ public class ConcurrentDOMDataBrokerTest { fail("Expected TransactionCommitFailedException"); } catch (TransactionCommitFailedException e) { if(expCause != null) { - assertSame("Expected cause", expCause, e.getCause()); + assertSame("Expected cause", expCause.getClass(), e.getCause().getClass()); } InOrder inOrder = inOrder((Object[])mockCohorts); @@ -202,6 +223,21 @@ public class ConcurrentDOMDataBrokerTest { assertFailure(future, cause, mockCohort1, mockCohort2); } + @Test + public void testSubmitWithCanCommitDataStoreUnavailableException() throws Exception { + doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort(); + NoShardLeaderException rootCause = new NoShardLeaderException("mock"); + DataStoreUnavailableException cause = new DataStoreUnavailableException(rootCause.getMessage(), rootCause); + doReturn(Futures.immediateFailedFuture(rootCause)).when(mockCohort2).canCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort(); + + CheckedFuture future = coordinator.submit( + transaction, Arrays.asList(mockCohort1, mockCohort2)); + + assertFailure(future, cause, mockCohort1, mockCohort2); + } + @Test public void testSubmitWithPreCommitException() throws Exception { doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit(); @@ -266,4 +302,265 @@ public class ConcurrentDOMDataBrokerTest { assertFailure(future, cause, mockCohort1, mockCohort2); } + + @Test + public void testCreateReadWriteTransaction(){ + DOMStore domStore = mock(DOMStore.class); + try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of( + LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore), + futureExecutor)) { + dataBroker.newReadWriteTransaction(); + + verify(domStore, never()).newReadWriteTransaction(); + } + } + + @Test + public void testCreateWriteOnlyTransaction(){ + DOMStore domStore = mock(DOMStore.class); + try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of( + LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore), + futureExecutor)) { + dataBroker.newWriteOnlyTransaction(); + + verify(domStore, never()).newWriteOnlyTransaction(); + } + } + + @Test + public void testCreateReadOnlyTransaction(){ + DOMStore domStore = mock(DOMStore.class); + try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of( + LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore), + futureExecutor)) { + dataBroker.newReadOnlyTransaction(); + + verify(domStore, never()).newReadOnlyTransaction(); + } + } + + @Test + public void testLazySubTransactionCreationForReadWriteTransactions(){ + DOMStore configDomStore = mock(DOMStore.class); + DOMStore operationalDomStore = mock(DOMStore.class); + DOMStoreReadWriteTransaction storeTxn = mock(DOMStoreReadWriteTransaction.class); + + doReturn(storeTxn).when(operationalDomStore).newReadWriteTransaction(); + doReturn(storeTxn).when(configDomStore).newReadWriteTransaction(); + + try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of( + LogicalDatastoreType.OPERATIONAL, operationalDomStore, LogicalDatastoreType.CONFIGURATION, + configDomStore), futureExecutor)) { + DOMDataReadWriteTransaction dataTxn = dataBroker.newReadWriteTransaction(); + + dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class)); + dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class)); + dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build()); + + verify(configDomStore, never()).newReadWriteTransaction(); + verify(operationalDomStore, times(1)).newReadWriteTransaction(); + + dataTxn.put(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class)); + + verify(configDomStore, times(1)).newReadWriteTransaction(); + verify(operationalDomStore, times(1)).newReadWriteTransaction(); + } + + } + + @Test + public void testLazySubTransactionCreationForWriteOnlyTransactions(){ + DOMStore configDomStore = mock(DOMStore.class); + DOMStore operationalDomStore = mock(DOMStore.class); + DOMStoreWriteTransaction storeTxn = mock(DOMStoreWriteTransaction.class); + + doReturn(storeTxn).when(operationalDomStore).newWriteOnlyTransaction(); + doReturn(storeTxn).when(configDomStore).newWriteOnlyTransaction(); + + try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of( + LogicalDatastoreType.OPERATIONAL, operationalDomStore, LogicalDatastoreType.CONFIGURATION, + configDomStore), futureExecutor)) { + DOMDataWriteTransaction dataTxn = dataBroker.newWriteOnlyTransaction(); + + dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class)); + dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class)); + + verify(configDomStore, never()).newWriteOnlyTransaction(); + verify(operationalDomStore, times(1)).newWriteOnlyTransaction(); + + dataTxn.put(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class)); + + verify(configDomStore, times(1)).newWriteOnlyTransaction(); + verify(operationalDomStore, times(1)).newWriteOnlyTransaction(); + } + } + + @Test + public void testLazySubTransactionCreationForReadOnlyTransactions(){ + DOMStore configDomStore = mock(DOMStore.class); + DOMStore operationalDomStore = mock(DOMStore.class); + DOMStoreReadTransaction storeTxn = mock(DOMStoreReadTransaction.class); + + doReturn(storeTxn).when(operationalDomStore).newReadOnlyTransaction(); + doReturn(storeTxn).when(configDomStore).newReadOnlyTransaction(); + + try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of( + LogicalDatastoreType.OPERATIONAL, operationalDomStore, LogicalDatastoreType.CONFIGURATION, + configDomStore), futureExecutor)) { + DOMDataReadOnlyTransaction dataTxn = dataBroker.newReadOnlyTransaction(); + + dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build()); + dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build()); + + verify(configDomStore, never()).newReadOnlyTransaction(); + verify(operationalDomStore, times(1)).newReadOnlyTransaction(); + + dataTxn.read(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.builder().build()); + + verify(configDomStore, times(1)).newReadOnlyTransaction(); + verify(operationalDomStore, times(1)).newReadOnlyTransaction(); + } + } + + @Test + public void testSubmitWithOnlyOneSubTransaction() throws InterruptedException { + DOMStore configDomStore = mock(DOMStore.class); + DOMStore operationalDomStore = mock(DOMStore.class); + DOMStoreReadWriteTransaction mockStoreReadWriteTransaction = mock(DOMStoreReadWriteTransaction.class); + DOMStoreThreePhaseCommitCohort mockCohort = mock(DOMStoreThreePhaseCommitCohort.class); + + doReturn(mockStoreReadWriteTransaction).when(operationalDomStore).newReadWriteTransaction(); + doReturn(mockCohort).when(mockStoreReadWriteTransaction).ready(); + doReturn(Futures.immediateFuture(false)).when(mockCohort).canCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohort).abort(); + + final CountDownLatch latch = new CountDownLatch(1); + final List commitCohorts = new ArrayList<>(); + + try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of( + LogicalDatastoreType.OPERATIONAL, operationalDomStore, LogicalDatastoreType.CONFIGURATION, + configDomStore), futureExecutor) { + @Override + public CheckedFuture submit(DOMDataWriteTransaction transaction, Collection cohorts) { + commitCohorts.addAll(cohorts); + latch.countDown(); + return super.submit(transaction, cohorts); + } + }) { + DOMDataReadWriteTransaction domDataReadWriteTransaction = dataBroker.newReadWriteTransaction(); + + domDataReadWriteTransaction.delete(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build()); + + domDataReadWriteTransaction.submit(); + + latch.await(10, TimeUnit.SECONDS); + + assertTrue(commitCohorts.size() == 1); + } + } + + @Test + public void testSubmitWithOnlyTwoSubTransactions() throws InterruptedException { + DOMStore configDomStore = mock(DOMStore.class); + DOMStore operationalDomStore = mock(DOMStore.class); + DOMStoreReadWriteTransaction operationalTransaction = mock(DOMStoreReadWriteTransaction.class); + DOMStoreReadWriteTransaction configTransaction = mock(DOMStoreReadWriteTransaction.class); + DOMStoreThreePhaseCommitCohort mockCohortOperational = mock(DOMStoreThreePhaseCommitCohort.class); + DOMStoreThreePhaseCommitCohort mockCohortConfig = mock(DOMStoreThreePhaseCommitCohort.class); + + doReturn(operationalTransaction).when(operationalDomStore).newReadWriteTransaction(); + doReturn(configTransaction).when(configDomStore).newReadWriteTransaction(); + + doReturn(mockCohortOperational).when(operationalTransaction).ready(); + doReturn(Futures.immediateFuture(false)).when(mockCohortOperational).canCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohortOperational).abort(); + + doReturn(mockCohortConfig).when(configTransaction).ready(); + doReturn(Futures.immediateFuture(false)).when(mockCohortConfig).canCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohortConfig).abort(); + + final CountDownLatch latch = new CountDownLatch(1); + final List commitCohorts = new ArrayList<>(); + + try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of( + LogicalDatastoreType.OPERATIONAL, operationalDomStore, LogicalDatastoreType.CONFIGURATION, + configDomStore), futureExecutor) { + @Override + public CheckedFuture submit(DOMDataWriteTransaction transaction, Collection cohorts) { + commitCohorts.addAll(cohorts); + latch.countDown(); + return super.submit(transaction, cohorts); + } + }) { + DOMDataReadWriteTransaction domDataReadWriteTransaction = dataBroker.newReadWriteTransaction(); + + domDataReadWriteTransaction.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class)); + domDataReadWriteTransaction.merge(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class)); + + domDataReadWriteTransaction.submit(); + + latch.await(10, TimeUnit.SECONDS); + + assertTrue(commitCohorts.size() == 2); + } + } + + @Test + public void testCreateTransactionChain(){ + DOMStore domStore = mock(DOMStore.class); + try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of( + LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore), + futureExecutor)) { + + dataBroker.createTransactionChain(mock(TransactionChainListener.class)); + + verify(domStore, times(2)).createTransactionChain(); + } + + } + + @Test + public void testCreateTransactionOnChain(){ + DOMStore domStore = mock(DOMStore.class); + try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of( + LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore), + futureExecutor)) { + + DOMStoreReadWriteTransaction operationalTransaction = mock(DOMStoreReadWriteTransaction.class); + DOMStoreTransactionChain mockChain = mock(DOMStoreTransactionChain.class); + + doReturn(mockChain).when(domStore).createTransactionChain(); + doReturn(operationalTransaction).when(mockChain).newWriteOnlyTransaction(); + + DOMTransactionChain transactionChain = dataBroker.createTransactionChain(mock(TransactionChainListener.class)); + + DOMDataWriteTransaction domDataWriteTransaction = transactionChain.newWriteOnlyTransaction(); + + verify(mockChain, never()).newWriteOnlyTransaction(); + + domDataWriteTransaction.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class)); + } + } + + @Test + public void testEmptyTransactionSubmitSucceeds() throws ExecutionException, InterruptedException { + DOMStore domStore = mock(DOMStore.class); + try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of( + LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore), + futureExecutor)) { + + CheckedFuture submit1 = dataBroker.newWriteOnlyTransaction().submit(); + + assertNotNull(submit1); + + submit1.get(); + + CheckedFuture submit2 = dataBroker.newReadWriteTransaction().submit(); + + assertNotNull(submit2); + + submit2.get(); + } + } + }