import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.junit.runners.Parameterized.Parameter;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
+import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
+import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import org.junit.Ignore;
import org.junit.Test;
+import org.junit.runners.Parameterized.Parameter;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
+import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
}
@Test
+ @Ignore("Flushes a closed tx leak in single node, needs to be handled separately")
public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
testKit.doCommit(writeTx.ready());
- writeTx = txChain.newWriteOnlyTransaction();
-
int numCars = 5;
for (int i = 0; i < numCars; i++) {
+ writeTx = txChain.newWriteOnlyTransaction();
writeTx.write(CarsModel.newCarPath("car" + i),
CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
+
+ testKit.doCommit(writeTx.ready());
+
+ DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction();
+ domStoreReadTransaction.read(CarsModel.BASE_PATH).get();
+
+ domStoreReadTransaction.close();
}
- testKit.doCommit(writeTx.ready());
+ // verify frontend metadata has no holes in purged transactions causing overtime memory leak
+ Optional<ActorRef> localShard = dataStore.getActorUtils().findLocalShard("cars-1");
+ FrontendShardDataTreeSnapshotMetadata frontendMetadata =
+ (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils()
+ .executeOperation(localShard.get(), new RequestFrontendMetadata());
+
+ if (dataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
+ Iterator<FrontendHistoryMetadata> iterator =
+ frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
+ FrontendHistoryMetadata metadata = iterator.next();
+ while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+ metadata = iterator.next();
+ }
+ Set<Range<UnsignedLong>> ranges = metadata.getPurgedTransactions().asRanges();
+
+ assertEquals(1, ranges.size());
+ } else {
+ // ask based should track no metadata
+ assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
+ }
final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
} catch (final ExecutionException e) {
final String msg = "Unexpected exception: "
+ Throwables.getStackTraceAsString(e.getCause());
- if (DistributedDataStore.class.equals(testParameter)) {
+ if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
assertTrue(Throwables.getRootCause(e) instanceof NoShardLeaderException);
} else {
assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);