+ @Test
+ public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
+ final String testName = "testWriteTransactionWithSingleShard";
+ initDatastoresWithCars(testName);
+
+ final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
+
+ DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+ writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+ followerTestKit.doCommit(writeTx.ready());
+
+ int numCars = 5;
+ for (int i = 0; i < numCars; i++) {
+ writeTx = txChain.newWriteOnlyTransaction();
+ writeTx.write(CarsModel.newCarPath("car" + i),
+ CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000)));
+
+ followerTestKit.doCommit(writeTx.ready());
+
+ DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction();
+ domStoreReadTransaction.read(CarsModel.BASE_PATH).get();
+
+ domStoreReadTransaction.close();
+ }
+
+ // wait to let the shard catch up with purged
+ await("Range set leak test").atMost(5, TimeUnit.SECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ Optional<ActorRef> localShard =
+ leaderDistributedDataStore.getActorUtils().findLocalShard("cars");
+ FrontendShardDataTreeSnapshotMetadata frontendMetadata =
+ (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
+ .executeOperation(localShard.get(), new RequestFrontendMetadata());
+
+ if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
+ Iterator<FrontendHistoryMetadata> iterator =
+ frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
+ FrontendHistoryMetadata metadata = iterator.next();
+ while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+ metadata = iterator.next();
+ }
+
+ assertEquals(0, metadata.getClosedTransactions().size());
+ assertEquals(Range.closedOpen(UnsignedLong.valueOf(0), UnsignedLong.valueOf(11)),
+ metadata.getPurgedTransactions().asRanges().iterator().next());
+ } else {
+ // ask based should track no metadata
+ assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
+ }
+ });
+
+ final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
+ .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
+ assertTrue("isPresent", optional.isPresent());
+ assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
+ }
+
+ @Test
+ @Ignore("Flushes out tell based leak needs to be handled separately")
+ public void testCloseTransactionMetadataLeak() throws Exception {
+ // Ask based frontend seems to have some issues with back to back close
+ Assume.assumeTrue(testParameter.isAssignableFrom(TestClientBackedDataStore.class));
+
+ final String testName = "testWriteTransactionWithSingleShard";
+ initDatastoresWithCars(testName);
+
+ final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
+
+ DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+ writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+ followerTestKit.doCommit(writeTx.ready());
+
+ int numCars = 5;
+ for (int i = 0; i < numCars; i++) {
+ writeTx = txChain.newWriteOnlyTransaction();
+ writeTx.close();
+
+ DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction();
+ domStoreReadTransaction.read(CarsModel.BASE_PATH).get();
+
+ domStoreReadTransaction.close();
+ }
+
+ writeTx = txChain.newWriteOnlyTransaction();
+ writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+ followerTestKit.doCommit(writeTx.ready());
+
+ // wait to let the shard catch up with purged
+ await("Close transaction purge leak test.").atMost(5, TimeUnit.SECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ Optional<ActorRef> localShard =
+ leaderDistributedDataStore.getActorUtils().findLocalShard("cars");
+ FrontendShardDataTreeSnapshotMetadata frontendMetadata =
+ (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
+ .executeOperation(localShard.get(), new RequestFrontendMetadata());
+
+ if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
+ Iterator<FrontendHistoryMetadata> iterator =
+ frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
+ FrontendHistoryMetadata metadata = iterator.next();
+ while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+ metadata = iterator.next();
+ }
+
+ Set<Range<UnsignedLong>> ranges = metadata.getPurgedTransactions().asRanges();
+
+ assertEquals(0, metadata.getClosedTransactions().size());
+ assertEquals(1, ranges.size());
+ } else {
+ // ask based should track no metadata
+ assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
+ }
+ });
+
+ final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
+ .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
+ assertTrue("isPresent", optional.isPresent());
+ assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
+ }
+