X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreIntegrationTest.java;h=2c70977bfc183a8da60f55b5aa8c220e6bb573a6;hb=a6af137c30470b86d4bc624d4c48cb686495a182;hp=0b97772e94580127874e8989cb898e13aec5aadb;hpb=20a32e6459fd1e27e7669bf1ebc7742b96787b94;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index 0b97772e94..2c70977bfc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -21,22 +21,21 @@ import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.AddressFromURIString; import akka.cluster.Cluster; -import akka.testkit.JavaTestKit; -import com.google.common.base.Optional; +import akka.testkit.javadsl.TestKit; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; -import java.io.IOException; import java.math.BigInteger; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -59,7 +58,6 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; -import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener; import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; @@ -68,21 +66,19 @@ import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException; -import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; -import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; -import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; -import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; -import org.opendaylight.controller.sal.core.spi.data.DOMStore; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.common.api.ReadFailedException; +import org.opendaylight.mdsal.common.api.TransactionChainClosedException; +import org.opendaylight.mdsal.common.api.TransactionChainListener; +import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; +import org.opendaylight.mdsal.dom.api.DOMTransactionChain; +import org.opendaylight.mdsal.dom.spi.store.DOMStore; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; @@ -114,7 +110,7 @@ public class DistributedDataStoreIntegrationTest { .shardHeartbeatIntervalInMillis(100); @Before - public void setUp() throws IOException { + public void setUp() { InMemorySnapshotStore.clear(); InMemoryJournal.clear(); system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); @@ -123,8 +119,8 @@ public class DistributedDataStoreIntegrationTest { } @After - public void tearDown() throws IOException { - JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE); + public void tearDown() { + TestKit.shutdownActorSystem(system, Boolean.TRUE); system = null; } @@ -215,7 +211,7 @@ public class DistributedDataStoreIntegrationTest { readWriteTx.write(nodePath, nodeToWrite); // 3. Read the data from Tx - final Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS); + final Boolean exists = readWriteTx.exists(nodePath).get(5, TimeUnit.SECONDS); assertEquals("exists", true, exists); Optional> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS); @@ -271,7 +267,7 @@ public class DistributedDataStoreIntegrationTest { final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); readWriteTx.write(personPath, person); - final Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS); + final Boolean exists = readWriteTx.exists(carPath).get(5, TimeUnit.SECONDS); assertEquals("exists", true, exists); Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); @@ -448,9 +444,8 @@ public class DistributedDataStoreIntegrationTest { assertNotNull("newReadWriteTransaction returned null", readWriteTx); // Do some reads on the Tx on a separate thread. - final AtomicReference> txExistsFuture = - new AtomicReference<>(); - final AtomicReference>, ReadFailedException>> + final AtomicReference> txExistsFuture = new AtomicReference<>(); + final AtomicReference>>> txReadFuture = new AtomicReference<>(); final AtomicReference caughtEx = new AtomicReference<>(); final CountDownLatch txReadsDone = new CountDownLatch(1); @@ -485,8 +480,8 @@ public class DistributedDataStoreIntegrationTest { blockRecoveryLatch.countDown(); // Wait for the reads to complete and verify. - assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS)); - assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent()); + assertEquals("exists", true, txExistsFuture.get().get(5, TimeUnit.SECONDS)); + assertEquals("read", true, txReadFuture.get().get(5, TimeUnit.SECONDS).isPresent()); readWriteTx.close(); } @@ -591,7 +586,7 @@ public class DistributedDataStoreIntegrationTest { assertNotNull("newReadWriteTransaction returned null", readWriteTx); // Do a read on the Tx on a separate thread. - final AtomicReference>, ReadFailedException>> + final AtomicReference>>> txReadFuture = new AtomicReference<>(); final AtomicReference caughtEx = new AtomicReference<>(); final CountDownLatch txReadDone = new CountDownLatch(1); @@ -624,9 +619,10 @@ public class DistributedDataStoreIntegrationTest { // initialized, the Tx should // have timed out and throw an appropriate exception cause. try { - txReadFuture.get().checkedGet(5, TimeUnit.SECONDS); - fail("Expected NotInitializedException"); - } catch (final ReadFailedException e) { + txReadFuture.get().get(5, TimeUnit.SECONDS); + } catch (ExecutionException e) { + assertTrue("Expected ReadFailedException cause: " + e.getCause(), + e.getCause() instanceof ReadFailedException); final Throwable root = Throwables.getRootCause(e); Throwables.throwIfUnchecked(root); throw new RuntimeException(root); @@ -933,25 +929,25 @@ public class DistributedDataStoreIntegrationTest { final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); DOMTransactionChain txChain = broker.createTransactionChain(listener); - final List> futures = new ArrayList<>(); + final List> futures = new ArrayList<>(); - final DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer()); writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - futures.add(writeTx.submit()); + futures.add(writeTx.commit()); int numCars = 100; for (int i = 0; i < numCars; i++) { - final DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); + final DOMDataTreeReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i), CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000))); - futures.add(rwTx.submit()); + futures.add(rwTx.commit()); } - for (final CheckedFuture f : futures) { - f.checkedGet(); + for (final ListenableFuture f : futures) { + f.get(5, TimeUnit.SECONDS); } final Optional> optional = txChain.newReadOnlyTransaction() @@ -1048,10 +1044,10 @@ public class DistributedDataStoreIntegrationTest { final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); // Create read-only tx's and issue a read. - CheckedFuture>, ReadFailedException> readFuture1 = txChain + FluentFuture>> readFuture1 = txChain .newReadOnlyTransaction().read(TestModel.TEST_PATH); - CheckedFuture>, ReadFailedException> readFuture2 = txChain + FluentFuture>> readFuture2 = txChain .newReadOnlyTransaction().read(TestModel.TEST_PATH); // Create another write tx and issue the write. @@ -1061,8 +1057,8 @@ public class DistributedDataStoreIntegrationTest { // Ensure the reads succeed. - assertEquals("isPresent", true, readFuture1.checkedGet(5, TimeUnit.SECONDS).isPresent()); - assertEquals("isPresent", true, readFuture2.checkedGet(5, TimeUnit.SECONDS).isPresent()); + assertEquals("isPresent", true, readFuture1.get(5, TimeUnit.SECONDS).isPresent()); + assertEquals("isPresent", true, readFuture2.get(5, TimeUnit.SECONDS).isPresent()); // Ensure the writes succeed. DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready(); @@ -1071,7 +1067,7 @@ public class DistributedDataStoreIntegrationTest { doCommit(cohort2); assertEquals("isPresent", true, txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH) - .checkedGet(5, TimeUnit.SECONDS).isPresent()); + .get(5, TimeUnit.SECONDS).isPresent()); } } }; @@ -1092,7 +1088,7 @@ public class DistributedDataStoreIntegrationTest { final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); final DOMTransactionChain txChain = broker.createTransactionChain(listener); - final DOMDataReadWriteTransaction writeTx = txChain.newReadWriteTransaction(); + final DOMDataTreeReadWriteTransaction writeTx = txChain.newReadWriteTransaction(); writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); @@ -1104,9 +1100,9 @@ public class DistributedDataStoreIntegrationTest { writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); try { - writeTx.submit().checkedGet(5, TimeUnit.SECONDS); + writeTx.commit().get(5, TimeUnit.SECONDS); fail("Expected TransactionCommitFailedException"); - } catch (final TransactionCommitFailedException e) { + } catch (final ExecutionException e) { // Expected } @@ -1135,7 +1131,7 @@ public class DistributedDataStoreIntegrationTest { final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); final DOMTransactionChain txChain = broker.createTransactionChain(listener); - final DOMDataReadWriteTransaction writeTx = txChain.newReadWriteTransaction(); + final DOMDataTreeWriteTransaction writeTx = txChain.newReadWriteTransaction(); writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); @@ -1150,9 +1146,9 @@ public class DistributedDataStoreIntegrationTest { // succeeds b/c deep validation is not // done for put for performance reasons. try { - writeTx.submit().checkedGet(5, TimeUnit.SECONDS); + writeTx.commit().get(5, TimeUnit.SECONDS); fail("Expected TransactionCommitFailedException"); - } catch (final TransactionCommitFailedException e) { + } catch (final ExecutionException e) { // Expected } @@ -1166,59 +1162,6 @@ public class DistributedDataStoreIntegrationTest { }; } - @Test - public void testChangeListenerRegistration() throws Exception { - new IntegrationTestKit(getSystem(), datastoreContextBuilder) { - { - try (AbstractDataStore dataStore = setupAbstractDataStore( - testParameter, "testChangeListenerRegistration", "test-1")) { - - testWriteTransaction(dataStore, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - - final MockDataChangeListener listener = new MockDataChangeListener(1); - - final ListenerRegistration listenerReg = dataStore - .registerChangeListener(TestModel.TEST_PATH, listener, DataChangeScope.SUBTREE); - - assertNotNull("registerChangeListener returned null", listenerReg); - - IntegrationTestKit.verifyShardState(dataStore, "test-1", - state -> assertEquals("getDataChangeListenerActors", 1, - state.getDataChangeListenerActors().size())); - - // Wait for the initial notification - listener.waitForChangeEvents(TestModel.TEST_PATH); - listener.reset(2); - - // Write 2 updates. - testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); - - YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) - .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(); - testWriteTransaction(dataStore, listPath, - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); - - // Wait for the 2 updates. - listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath); - listenerReg.close(); - - IntegrationTestKit.verifyShardState(dataStore, "test-1", - state -> assertEquals("getDataChangeListenerActors", 0, - state.getDataChangeListenerActors().size())); - - testWriteTransaction(dataStore, - YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) - .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(), - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2)); - - listener.expectNoMoreChanges("Received unexpected change after close"); - } - } - }; - } - @Test public void testDataTreeChangeListenerRegistration() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) {