+ final var it = ids.iterator();
+ while (it.hasNext()) {
+ final var id = it.next();
+ final long bits = id.longValue();
+ if (purgedTransactions.contains(bits)) {
+ LOG.warn("{}: history {} tracks {} as purged", persistenceId(), getIdentifier(), id);
+ it.remove();
+ } else if (transactions.containsKey(new TransactionIdentifier(getIdentifier(), bits))) {
+ LOG.warn("{}: history {} tracks {} as open", persistenceId(), getIdentifier(), id);
+ it.remove();
+ }
+ }
+
+ if (ids.isEmpty()) {
+ LOG.debug("{}: history {} completing empty skip request", persistenceId(), getIdentifier());
+ return new SkipTransactionsResponse(first, now);
+ }
+
+ final var transactionIds = MutableUnsignedLongSet.of(ids.stream().mapToLong(UnsignedLong::longValue).toArray())
+ .immutableCopy();
+ LOG.debug("{}: history {} skipping transactions {}", persistenceId(), getIdentifier(), transactionIds.ranges());
+
+ tree.skipTransactions(getIdentifier(), transactionIds, () -> {
+ purgedTransactions.addAll(transactionIds);
+ envelope.sendSuccess(new TransactionPurgeResponse(first, request.getSequence()), readTime() - now);
+ });
+ return null;
+ }
+
+ final void destroy(final long sequence, final RequestEnvelope envelope, final long now) {
+ LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
+ tree.closeTransactionChain(getIdentifier(),
+ () -> envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now));
+ }
+
+ final void purge(final long sequence, final RequestEnvelope envelope, final long now) {
+ LOG.debug("{}: purging history {}", persistenceId(), getIdentifier());
+ tree.purgeTransactionChain(getIdentifier(),
+ () -> envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now));
+ }
+
+ final void retire() {
+ transactions.values().forEach(FrontendTransaction::retire);
+ tree.removeTransactionChain(getIdentifier());
+ }
+
+ private FrontendTransaction createTransaction(final TransactionRequest<?> request, final TransactionIdentifier id) {
+ if (request instanceof CommitLocalTransactionRequest) {
+ LOG.debug("{}: allocating new ready transaction {}", persistenceId(), id);
+ tree.getStats().incrementReadWriteTransactionCount();
+ return createReadyTransaction(id, ((CommitLocalTransactionRequest) request).getModification());
+ }
+ if (request instanceof AbstractReadTransactionRequest
+ && ((AbstractReadTransactionRequest<?>) request).isSnapshotOnly()) {
+ LOG.debug("{}: allocating new open snapshot {}", persistenceId(), id);
+ tree.getStats().incrementReadOnlyTransactionCount();
+ return createOpenSnapshot(id);
+ }
+
+ LOG.debug("{}: allocating new open transaction {}", persistenceId(), id);
+ tree.getStats().incrementReadWriteTransactionCount();
+ return createOpenTransaction(id);
+ }
+
+ abstract FrontendTransaction createOpenSnapshot(TransactionIdentifier id);
+
+ abstract FrontendTransaction createOpenTransaction(TransactionIdentifier id);
+
+ abstract FrontendTransaction createReadyTransaction(TransactionIdentifier id, DataTreeModification mod);
+
+ abstract ShardDataTreeCohort createFailedCohort(TransactionIdentifier id, DataTreeModification mod,
+ Exception failure);
+
+ abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod,
+ Optional<SortedSet<String>> participatingShardNames);
+
+ @Override
+ public final String toString() {
+ return MoreObjects.toStringHelper(this).omitNullValues()
+ .add("identifier", getIdentifier())
+ .add("persistenceId", persistenceId)
+ .add("transactions", transactions)
+ .toString();
+ }