Simplify code using Java 8 features
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
index 0b97772e94580127874e8989cb898e13aec5aadb..2c70977bfc183a8da60f55b5aa8c220e6bb573a6 100644 (file)
@@ -21,22 +21,21 @@ import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.AddressFromURIString;
 import akka.cluster.Cluster;
-import akka.testkit.JavaTestKit;
-import com.google.common.base.Optional;
+import akka.testkit.javadsl.TestKit;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.ConfigFactory;
-import java.io.IOException;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -59,7 +58,6 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
-import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
@@ -68,21 +66,19 @@ import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
-import org.opendaylight.controller.sal.core.spi.data.DOMStore;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
+import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
+import org.opendaylight.mdsal.common.api.TransactionChainListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
+import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
+import org.opendaylight.mdsal.dom.spi.store.DOMStore;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
@@ -114,7 +110,7 @@ public class DistributedDataStoreIntegrationTest {
             .shardHeartbeatIntervalInMillis(100);
 
     @Before
-    public void setUp() throws IOException {
+    public void setUp() {
         InMemorySnapshotStore.clear();
         InMemoryJournal.clear();
         system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
@@ -123,8 +119,8 @@ public class DistributedDataStoreIntegrationTest {
     }
 
     @After
-    public void tearDown() throws IOException {
-        JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
+    public void tearDown() {
+        TestKit.shutdownActorSystem(system, Boolean.TRUE);
         system = null;
     }
 
@@ -215,7 +211,7 @@ public class DistributedDataStoreIntegrationTest {
                     readWriteTx.write(nodePath, nodeToWrite);
 
                     // 3. Read the data from Tx
-                    final Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
+                    final Boolean exists = readWriteTx.exists(nodePath).get(5, TimeUnit.SECONDS);
                     assertEquals("exists", true, exists);
 
                     Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
@@ -271,7 +267,7 @@ public class DistributedDataStoreIntegrationTest {
                     final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
                     readWriteTx.write(personPath, person);
 
-                    final Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS);
+                    final Boolean exists = readWriteTx.exists(carPath).get(5, TimeUnit.SECONDS);
                     assertEquals("exists", true, exists);
 
                     Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
@@ -448,9 +444,8 @@ public class DistributedDataStoreIntegrationTest {
                     assertNotNull("newReadWriteTransaction returned null", readWriteTx);
 
                     // Do some reads on the Tx on a separate thread.
-                    final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
-                            new AtomicReference<>();
-                    final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
+                    final AtomicReference<FluentFuture<Boolean>> txExistsFuture = new AtomicReference<>();
+                    final AtomicReference<FluentFuture<Optional<NormalizedNode<?, ?>>>>
                             txReadFuture = new AtomicReference<>();
                     final AtomicReference<Exception> caughtEx = new AtomicReference<>();
                     final CountDownLatch txReadsDone = new CountDownLatch(1);
@@ -485,8 +480,8 @@ public class DistributedDataStoreIntegrationTest {
                     blockRecoveryLatch.countDown();
 
                     // Wait for the reads to complete and verify.
-                    assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
-                    assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
+                    assertEquals("exists", true, txExistsFuture.get().get(5, TimeUnit.SECONDS));
+                    assertEquals("read", true, txReadFuture.get().get(5, TimeUnit.SECONDS).isPresent());
 
                     readWriteTx.close();
                 }
@@ -591,7 +586,7 @@ public class DistributedDataStoreIntegrationTest {
                     assertNotNull("newReadWriteTransaction returned null", readWriteTx);
 
                     // Do a read on the Tx on a separate thread.
-                    final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
+                    final AtomicReference<FluentFuture<Optional<NormalizedNode<?, ?>>>>
                             txReadFuture = new AtomicReference<>();
                     final AtomicReference<Exception> caughtEx = new AtomicReference<>();
                     final CountDownLatch txReadDone = new CountDownLatch(1);
@@ -624,9 +619,10 @@ public class DistributedDataStoreIntegrationTest {
                     // initialized, the Tx should
                     // have timed out and throw an appropriate exception cause.
                     try {
-                        txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
-                        fail("Expected NotInitializedException");
-                    } catch (final ReadFailedException e) {
+                        txReadFuture.get().get(5, TimeUnit.SECONDS);
+                    } catch (ExecutionException e) {
+                        assertTrue("Expected ReadFailedException cause: " + e.getCause(),
+                                e.getCause() instanceof ReadFailedException);
                         final Throwable root = Throwables.getRootCause(e);
                         Throwables.throwIfUnchecked(root);
                         throw new RuntimeException(root);
@@ -933,25 +929,25 @@ public class DistributedDataStoreIntegrationTest {
                     final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
                     DOMTransactionChain txChain = broker.createTransactionChain(listener);
 
-                    final List<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
+                    final List<ListenableFuture<?>> futures = new ArrayList<>();
 
-                    final DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+                    final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
                     writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
                     writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
-                    futures.add(writeTx.submit());
+                    futures.add(writeTx.commit());
 
                     int numCars = 100;
                     for (int i = 0; i < numCars; i++) {
-                        final DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
+                        final DOMDataTreeReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
 
                         rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
                                 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
 
-                        futures.add(rwTx.submit());
+                        futures.add(rwTx.commit());
                     }
 
-                    for (final CheckedFuture<Void, TransactionCommitFailedException> f : futures) {
-                        f.checkedGet();
+                    for (final ListenableFuture<?> f : futures) {
+                        f.get(5, TimeUnit.SECONDS);
                     }
 
                     final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
@@ -1048,10 +1044,10 @@ public class DistributedDataStoreIntegrationTest {
                     final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
 
                     // Create read-only tx's and issue a read.
-                    CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture1 = txChain
+                    FluentFuture<Optional<NormalizedNode<?, ?>>> readFuture1 = txChain
                             .newReadOnlyTransaction().read(TestModel.TEST_PATH);
 
-                    CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture2 = txChain
+                    FluentFuture<Optional<NormalizedNode<?, ?>>> readFuture2 = txChain
                             .newReadOnlyTransaction().read(TestModel.TEST_PATH);
 
                     // Create another write tx and issue the write.
@@ -1061,8 +1057,8 @@ public class DistributedDataStoreIntegrationTest {
 
                     // Ensure the reads succeed.
 
-                    assertEquals("isPresent", true, readFuture1.checkedGet(5, TimeUnit.SECONDS).isPresent());
-                    assertEquals("isPresent", true, readFuture2.checkedGet(5, TimeUnit.SECONDS).isPresent());
+                    assertEquals("isPresent", true, readFuture1.get(5, TimeUnit.SECONDS).isPresent());
+                    assertEquals("isPresent", true, readFuture2.get(5, TimeUnit.SECONDS).isPresent());
 
                     // Ensure the writes succeed.
                     DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready();
@@ -1071,7 +1067,7 @@ public class DistributedDataStoreIntegrationTest {
                     doCommit(cohort2);
 
                     assertEquals("isPresent", true, txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH)
-                            .checkedGet(5, TimeUnit.SECONDS).isPresent());
+                            .get(5, TimeUnit.SECONDS).isPresent());
                 }
             }
         };
@@ -1092,7 +1088,7 @@ public class DistributedDataStoreIntegrationTest {
                     final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
                     final DOMTransactionChain txChain = broker.createTransactionChain(listener);
 
-                    final DOMDataReadWriteTransaction writeTx = txChain.newReadWriteTransaction();
+                    final DOMDataTreeReadWriteTransaction writeTx = txChain.newReadWriteTransaction();
 
                     writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
                             PeopleModel.emptyContainer());
@@ -1104,9 +1100,9 @@ public class DistributedDataStoreIntegrationTest {
                     writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
 
                     try {
-                        writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
+                        writeTx.commit().get(5, TimeUnit.SECONDS);
                         fail("Expected TransactionCommitFailedException");
-                    } catch (final TransactionCommitFailedException e) {
+                    } catch (final ExecutionException e) {
                         // Expected
                     }
 
@@ -1135,7 +1131,7 @@ public class DistributedDataStoreIntegrationTest {
                     final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
                     final DOMTransactionChain txChain = broker.createTransactionChain(listener);
 
-                    final DOMDataReadWriteTransaction writeTx = txChain.newReadWriteTransaction();
+                    final DOMDataTreeWriteTransaction writeTx = txChain.newReadWriteTransaction();
 
                     writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
                             PeopleModel.emptyContainer());
@@ -1150,9 +1146,9 @@ public class DistributedDataStoreIntegrationTest {
                     // succeeds b/c deep validation is not
                     // done for put for performance reasons.
                     try {
-                        writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
+                        writeTx.commit().get(5, TimeUnit.SECONDS);
                         fail("Expected TransactionCommitFailedException");
-                    } catch (final TransactionCommitFailedException e) {
+                    } catch (final ExecutionException e) {
                         // Expected
                     }
 
@@ -1166,59 +1162,6 @@ public class DistributedDataStoreIntegrationTest {
         };
     }
 
-    @Test
-    public void testChangeListenerRegistration() throws Exception {
-        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
-            {
-                try (AbstractDataStore dataStore = setupAbstractDataStore(
-                        testParameter, "testChangeListenerRegistration", "test-1")) {
-
-                    testWriteTransaction(dataStore, TestModel.TEST_PATH,
-                            ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
-                    final MockDataChangeListener listener = new MockDataChangeListener(1);
-
-                    final ListenerRegistration<MockDataChangeListener> listenerReg = dataStore
-                            .registerChangeListener(TestModel.TEST_PATH, listener, DataChangeScope.SUBTREE);
-
-                    assertNotNull("registerChangeListener returned null", listenerReg);
-
-                    IntegrationTestKit.verifyShardState(dataStore, "test-1",
-                        state -> assertEquals("getDataChangeListenerActors", 1,
-                                state.getDataChangeListenerActors().size()));
-
-                    // Wait for the initial notification
-                    listener.waitForChangeEvents(TestModel.TEST_PATH);
-                    listener.reset(2);
-
-                    // Write 2 updates.
-                    testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
-                            ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
-
-                    YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
-                            .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
-                    testWriteTransaction(dataStore, listPath,
-                            ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
-
-                    // Wait for the 2 updates.
-                    listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
-                    listenerReg.close();
-
-                    IntegrationTestKit.verifyShardState(dataStore, "test-1",
-                        state -> assertEquals("getDataChangeListenerActors", 0,
-                                state.getDataChangeListenerActors().size()));
-
-                    testWriteTransaction(dataStore,
-                            YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
-                                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
-                            ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
-
-                    listener.expectNoMoreChanges("Received unexpected change after close");
-                }
-            }
-        };
-    }
-
     @Test
     public void testDataTreeChangeListenerRegistration() throws Exception {
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {