Revert "Leader should always apply modifications as local"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / AbstractDistributedDataStoreIntegrationTest.java
index 62986b2ebfe86f57a378d973e1be36e01eb27c29..a6b4d912cf565d5497f1c4f60fab711faecbd69c 100644 (file)
@@ -12,38 +12,46 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.junit.runners.Parameterized.Parameter;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 
+import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
+import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
-import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import org.junit.Ignore;
 import org.junit.Test;
+import org.junit.runners.Parameterized.Parameter;
 import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
+import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
@@ -65,6 +73,7 @@ import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.Uint64;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
@@ -129,7 +138,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
 
             writeTx = dataStore.newWriteOnlyTransaction();
 
-            final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+            final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
             final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
             writeTx.write(carPath, car);
 
@@ -213,7 +222,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
 
             readWriteTx = dataStore.newReadWriteTransaction();
 
-            final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+            final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
             final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
             readWriteTx.write(carPath, car);
 
@@ -244,6 +253,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     }
 
     @Test
+    @Ignore("Flushes a closed tx leak in single node, needs to be handled separately")
     public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
@@ -256,15 +266,40 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
             testKit.doCommit(writeTx.ready());
 
-            writeTx = txChain.newWriteOnlyTransaction();
-
             int numCars = 5;
             for (int i = 0; i < numCars; i++) {
+                writeTx = txChain.newWriteOnlyTransaction();
                 writeTx.write(CarsModel.newCarPath("car" + i),
-                    CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
+                    CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000)));
+
+                testKit.doCommit(writeTx.ready());
+
+                DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction();
+                domStoreReadTransaction.read(CarsModel.BASE_PATH).get();
+
+                domStoreReadTransaction.close();
             }
 
-            testKit.doCommit(writeTx.ready());
+            // verify frontend metadata has no holes in purged transactions causing overtime memory leak
+            Optional<ActorRef> localShard = dataStore.getActorUtils().findLocalShard("cars-1");
+            FrontendShardDataTreeSnapshotMetadata frontendMetadata =
+                    (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils()
+                            .executeOperation(localShard.get(), new RequestFrontendMetadata());
+
+            if (dataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
+                Iterator<FrontendHistoryMetadata> iterator =
+                        frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
+                FrontendHistoryMetadata metadata = iterator.next();
+                while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+                    metadata = iterator.next();
+                }
+                Set<Range<UnsignedLong>> ranges = metadata.getPurgedTransactions().asRanges();
+
+                assertEquals(1, ranges.size());
+            } else {
+                // ask based should track no metadata
+                assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
+            }
 
             final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
                     .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
@@ -341,7 +376,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
                 } catch (final ExecutionException e) {
                     final String msg = "Unexpected exception: "
                             + Throwables.getStackTraceAsString(e.getCause());
-                    if (DistributedDataStore.class.equals(testParameter)) {
+                    if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
                         assertTrue(Throwables.getRootCause(e) instanceof NoShardLeaderException);
                     } else {
                         assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
@@ -498,7 +533,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
 
             final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
 
-            final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+            final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
             final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
             readWriteTx.write(carPath, car);
 
@@ -568,7 +603,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
                 final DOMDataTreeReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
 
                 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
-                    CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
+                    CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000)));
 
                 futures.add(rwTx.commit());
             }
@@ -828,13 +863,13 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
         final String name = "transactionIntegrationTest";
 
         final ContainerNode carsNode = CarsModel.newCarsNode(
-            CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
-                CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
+            CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", Uint64.valueOf(20000)),
+                CarsModel.newCarEntry("sportage", Uint64.valueOf(30000))));
 
         DataTree dataTree = new InMemoryDataTreeFactory().create(
             DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full());
         AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
-        NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
+        NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.empty());
 
         final Snapshot carsSnapshot = Snapshot.create(
             new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
@@ -846,7 +881,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
         final NormalizedNode<?, ?> peopleNode = PeopleModel.create();
         AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
 
-        root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
+        root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.empty());
 
         final Snapshot peopleSnapshot = Snapshot.create(
             new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),