- await("Close transaction purge leak test.").atMost(5, TimeUnit.SECONDS)
- .pollInterval(500, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> {
- Optional<ActorRef> localShard =
- leaderDistributedDataStore.getActorUtils().findLocalShard("cars");
- FrontendShardDataTreeSnapshotMetadata frontendMetadata =
- (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
- .executeOperation(localShard.get(), new RequestFrontendMetadata());
-
- if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
- Iterator<FrontendHistoryMetadata> iterator =
- frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
- FrontendHistoryMetadata metadata = iterator.next();
- while (iterator.hasNext() && metadata.getHistoryId() != 1) {
- metadata = iterator.next();
- }
-
- Set<Range<UnsignedLong>> ranges = metadata.getPurgedTransactions().asRanges();
-
- assertEquals(0, metadata.getClosedTransactions().size());
- assertEquals(1, ranges.size());
- } else {
- // ask based should track no metadata
- assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
- }
- });
-
- final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
- .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
+ await("wait for purges to settle").atMost(5, TimeUnit.SECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow();
+ final var frontendMetadata =
+ (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
+ .executeOperation(localShard, new RequestFrontendMetadata());
+
+ assertClientMetadata(frontendMetadata.getClients().get(0), numCars * 2);
+ });