+ final void skipTransaction(final TransactionIdentifier txId) {
+ lock.lock();
+ try {
+ if (successor != null) {
+ successor.skipTransaction(txId);
+ return;
+ }
+
+ var local = skippedTransactions;
+ if (local == null) {
+ skippedTransactions = local = new ArrayList<>();
+ }
+ local.add(txId);
+ LOG.debug("Recorded skipped transaction {}", txId);
+ skipIfNeeded(local);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Holding("lock")
+ private void skipIfNeeded(final List<TransactionIdentifier> current) {
+ if (current.size() >= PURGE_SKIPPED_TXID_THRESHOLD) {
+ skippedTransactions = null;
+ doSkipTransactions(current);
+ }
+ }
+
+ private void skipTransactions(final List<TransactionIdentifier> toSkip) {
+ lock.lock();
+ try {
+ if (successor != null) {
+ successor.skipTransactions(toSkip);
+ return;
+ }
+
+ var local = skippedTransactions;
+ if (local != null) {
+ local.addAll(toSkip);
+ } else {
+ skippedTransactions = local = toSkip;
+ }
+ skipIfNeeded(local);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void skipTransactions() {
+ var local = skippedTransactions;
+ if (local != null) {
+ lock.lock();
+ try {
+ local = skippedTransactions;
+ if (local != null && successor == null) {
+ skippedTransactions = null;
+ doSkipTransactions(local);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ @Holding("lock")
+ private void doSkipTransactions(final List<TransactionIdentifier> toSkip) {
+ final var txIds = toSkip.stream()
+ .mapToLong(TransactionIdentifier::getTransactionId)
+ .distinct()
+ .sorted()
+ .mapToObj(UnsignedLong::fromLongBits)
+ .collect(ImmutableList.toImmutableList());
+
+ LOG.debug("Proxy {} skipping transactions {}", this, txIds);
+ connection.enqueueRequest(new SkipTransactionsRequest(new TransactionIdentifier(identifier,
+ txIds.get(0).longValue()), 0, localActor(),txIds.subList(1, txIds.size())), resp -> {
+ LOG.debug("Proxy {} confirmed transaction skip", this);
+ }, connection.currentTime());
+ }
+