Separate out RaftEntryMeta
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / AbstractDistributedDataStoreIntegrationTest.java
index 3d00a2592b0a842b194f8e43a27a83b5d9de4bc3..9f19ca045d6144639ecdcfb19da7301013036775 100644 (file)
@@ -16,7 +16,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 
@@ -24,6 +24,7 @@ import akka.actor.ActorSystem;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -40,14 +41,13 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runners.Parameterized.Parameter;
-import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
+import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
 import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
-import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMetadata;
 import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
@@ -63,14 +63,12 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
 import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException;
-import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
 import org.opendaylight.mdsal.dom.spi.store.DOMStore;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.common.Uint64;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
@@ -86,9 +84,8 @@ import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 public abstract class AbstractDistributedDataStoreIntegrationTest {
-
     @Parameter
-    public Class<? extends AbstractDataStore> testParameter;
+    public Class<? extends ClientBackedDataStore> testParameter;
 
     protected ActorSystem system;
 
@@ -102,8 +99,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     @Test
     public void testWriteTransactionWithSingleShard() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "transactionIntegrationTest", "test-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "transactionIntegrationTest", "test-1")) {
 
             testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH,
                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
@@ -118,8 +114,8 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     @Test
     public void testWriteTransactionWithMultipleShards() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "testWriteTransactionWithMultipleShards",
+            "cars-1", "people-1")) {
 
             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
@@ -159,8 +155,8 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     @Test
     public void testReadWriteTransactionWithSingleShard() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testReadWriteTransactionWithSingleShard", "test-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "testReadWriteTransactionWithSingleShard",
+            "test-1")) {
 
             // 1. Create a read-write Tx
             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
@@ -193,8 +189,8 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     @Test
     public void testReadWriteTransactionWithMultipleShards() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "testReadWriteTransactionWithMultipleShards",
+            "cars-1", "people-1")) {
 
             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
@@ -238,13 +234,13 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
 
     @Test
     public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
-        final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testSingleTransactionsWritesInQuickSuccession", "cars-1")) {
+        final var testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
+        try (var dataStore = testKit.setupDataStore(testParameter, "testSingleTransactionsWritesInQuickSuccession",
+            "cars-1")) {
 
-            final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
+            final var txChain = dataStore.createTransactionChain();
 
-            DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+            var writeTx = txChain.newWriteOnlyTransaction();
             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
             testKit.doCommit(writeTx.ready());
@@ -267,16 +263,16 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
                 .untilAsserted(() -> {
                     // verify frontend metadata has no holes in purged transactions causing overtime memory leak
                     final var localShard = dataStore.getActorUtils().findLocalShard("cars-1") .orElseThrow();
-                    FrontendShardDataTreeSnapshotMetadata frontendMetadata =
-                        (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils()
+                    final var frontendMetadata = (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils()
                             .executeOperation(localShard, new RequestFrontendMetadata());
 
                     final var clientMeta = frontendMetadata.getClients().get(0);
-                    if (dataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
-                        assertTellMetadata(clientMeta);
-                    } else {
-                        assertAskMetadata(clientMeta);
+                    final var iterator = clientMeta.getCurrentHistories().iterator();
+                    var metadata = iterator.next();
+                    while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+                        metadata = iterator.next();
                     }
+                    assertEquals("[[0..10]]", metadata.getPurgedTransactions().ranges().toString());
                 });
 
             final var body = txChain.newReadOnlyTransaction().read(CarsModel.CAR_LIST_PATH)
@@ -288,20 +284,6 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
         }
     }
 
-    private static void assertAskMetadata(final FrontendClientMetadata clientMeta) {
-        // ask based should track no metadata
-        assertEquals(List.of(), clientMeta.getCurrentHistories());
-    }
-
-    private static void assertTellMetadata(final FrontendClientMetadata clientMeta) {
-        final var iterator = clientMeta.getCurrentHistories().iterator();
-        var metadata = iterator.next();
-        while (iterator.hasNext() && metadata.getHistoryId() != 1) {
-            metadata = iterator.next();
-        }
-        assertEquals("[[0..10]]", metadata.getPurgedTransactions().ranges().toString());
-    }
-
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName)
             throws Exception {
@@ -318,8 +300,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
         datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1)
         .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).frontendRequestTimeoutInSeconds(2);
 
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(testParameter, testName, false, shardName)) {
-
+        try (var dataStore = testKit.setupDataStore(testParameter, testName, false, shardName)) {
             final Object result = dataStore.getActorUtils().executeOperation(
                 dataStore.getActorUtils().getShardManager(), new FindLocalShard(shardName, true));
             assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
@@ -395,8 +376,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     @Test
     public void testTransactionAbort() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "transactionAbortIntegrationTest", "test-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "transactionAbortIntegrationTest", "test-1")) {
 
             final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
@@ -418,8 +398,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     @SuppressWarnings("checkstyle:IllegalCatch")
     public void testTransactionChainWithSingleShard() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testTransactionChainWithSingleShard", "test-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "testTransactionChainWithSingleShard", "test-1")) {
 
             // 1. Create a Tx chain and write-only Tx
             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
@@ -495,8 +474,8 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     @Test
     public void testTransactionChainWithMultipleShards() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testTransactionChainWithMultipleShards", "cars-1", "people-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "testTransactionChainWithMultipleShards",
+            "cars-1", "people-1")) {
 
             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
 
@@ -551,16 +530,15 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     @Test
     public void testCreateChainedTransactionsInQuickSuccession() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testCreateChainedTransactionsInQuickSuccession", "cars-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "testCreateChainedTransactionsInQuickSuccession",
+            "cars-1")) {
 
             final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
                 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
                 MoreExecutors.directExecutor());
 
-            final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
-            DOMTransactionChain txChain = broker.createTransactionChain(listener);
+            DOMTransactionChain txChain = broker.createTransactionChain();
 
             final List<ListenableFuture<?>> futures = new ArrayList<>();
 
@@ -597,8 +575,8 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     @Test
     public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "testCreateChainedTransactionAfterEmptyTxReadied",
+            "test-1")) {
 
             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
 
@@ -618,8 +596,8 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     @Test
     public void testCreateChainedTransactionWhenPreviousNotReady() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "testCreateChainedTransactionWhenPreviousNotReady",
+            "test-1")) {
 
             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
 
@@ -638,8 +616,8 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     @Test
     public void testCreateChainedTransactionAfterClose() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testCreateChainedTransactionAfterClose", "test-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "testCreateChainedTransactionAfterClose",
+            "test-1")) {
 
             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
             txChain.close();
@@ -653,8 +631,8 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     @Test
     public void testChainWithReadOnlyTxAfterPreviousReady() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "testChainWithReadOnlyTxAfterPreviousReady",
+            "test-1")) {
 
             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
 
@@ -696,16 +674,17 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     @Test
     public void testChainedTransactionFailureWithSingleShard() throws Exception {
         final var testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (var dataStore = testKit.setupAbstractDataStore(testParameter,
-                "testChainedTransactionFailureWithSingleShard", "cars-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "testChainedTransactionFailureWithSingleShard",
+            "cars-1")) {
 
             final var broker = new ConcurrentDOMDataBroker(
                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
                 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
                 MoreExecutors.directExecutor());
 
-            final var listener = Mockito.mock(DOMTransactionChainListener.class);
-            final var txChain = broker.createTransactionChain(listener);
+            final var listener = mock(FutureCallback.class);
+            final var txChain = broker.createTransactionChain();
+            txChain.addCallback(listener);
 
             final var writeTx = txChain.newReadWriteTransaction();
 
@@ -721,8 +700,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
 
             assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS));
 
-            verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
-                any(Throwable.class));
+            verify(listener, timeout(5000)).onFailure(any());
 
             txChain.close();
             broker.close();
@@ -732,16 +710,17 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     @Test
     public void testChainedTransactionFailureWithMultipleShards() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "testChainedTransactionFailureWithMultipleShards",
+            "cars-1", "people-1")) {
 
             final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
                 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
                 MoreExecutors.directExecutor());
 
-            final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
-            final DOMTransactionChain txChain = broker.createTransactionChain(listener);
+            final var listener = mock(FutureCallback.class);
+            final DOMTransactionChain txChain = broker.createTransactionChain();
+            txChain.addCallback(listener);
 
             final DOMDataTreeWriteTransaction writeTx = txChain.newReadWriteTransaction();
 
@@ -760,8 +739,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             // done for put for performance reasons.
             assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS));
 
-            verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
-                any(Throwable.class));
+            verify(listener, timeout(5000)).onFailure(any());
 
             txChain.close();
             broker.close();
@@ -771,16 +749,15 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     @Test
     public void testDataTreeChangeListenerRegistration() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, "testDataTreeChangeListenerRegistration", "test-1")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "testDataTreeChangeListenerRegistration",
+            "test-1")) {
 
             testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH,
                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
 
-            ListenerRegistration<MockDataTreeChangeListener> listenerReg = dataStore
-                    .registerTreeChangeListener(TestModel.TEST_PATH, listener);
+            final var listenerReg = dataStore.registerTreeChangeListener(TestModel.TEST_PATH, listener);
 
             assertNotNull("registerTreeChangeListener returned null", listenerReg);
 
@@ -854,8 +831,8 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             new DatastoreSnapshot.ShardSnapshot("cars", carsSnapshot),
             new DatastoreSnapshot.ShardSnapshot("people", peopleSnapshot)));
 
-        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
-            testParameter, name, "module-shards-member1.conf", true, "cars", "people")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, name, "module-shards-member1.conf", true,
+            "cars", "people")) {
 
             final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
 
@@ -870,9 +847,8 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     // FIXME: re-enable this test
     public void testSnapshotOnRootOverwrite() throws Exception {
         final var testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder.snapshotOnRootOverwrite(true));
-        try (var dataStore = testKit.setupAbstractDataStore(
-                testParameter, "testRootOverwrite", "module-shards-default-cars-member1.conf",
-                true, "cars", "default")) {
+        try (var dataStore = testKit.setupDataStore(testParameter, "testRootOverwrite",
+            "module-shards-default-cars-member1.conf", true, "cars", "default")) {
 
             final var rootNode = Builders.containerBuilder()
                 .withNodeIdentifier(NodeIdentifier.create(SchemaContext.NAME))