+ private boolean payloadReplicationComplete(final TransactionIdentifier txId) {
+ final CommitEntry current = pendingFinishCommits.peek();
+ if (current == null) {
+ LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
+ allMetadataCommittedTransaction(txId);
+ return false;
+ }
+
+ if (!current.cohort.getIdentifier().equals(txId)) {
+ LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext,
+ current.cohort.getIdentifier(), txId);
+ allMetadataCommittedTransaction(txId);
+ return false;
+ }
+
+ finishCommit(current.cohort);
+ return true;
+ }
+
+ private void allMetadataAbortedTransaction(final TransactionIdentifier txId) {
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.onTransactionAborted(txId);
+ }
+ }
+
+ private void allMetadataCommittedTransaction(final TransactionIdentifier txId) {
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.onTransactionCommitted(txId);
+ }
+ }
+
+ private void allMetadataPurgedTransaction(final TransactionIdentifier txId) {
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.onTransactionPurged(txId);
+ }
+ }
+
+ private void allMetadataCreatedLocalHistory(final LocalHistoryIdentifier historyId) {
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.onHistoryCreated(historyId);
+ }
+ }
+
+ private void allMetadataClosedLocalHistory(final LocalHistoryIdentifier historyId) {
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.onHistoryClosed(historyId);
+ }
+ }
+
+ private void allMetadataPurgedLocalHistory(final LocalHistoryIdentifier historyId) {
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.onHistoryPurged(historyId);
+ }
+ }
+
+ private void allMetadataSkipTransactions(final SkipTransactionsPayload payload) {
+ final var historyId = payload.getIdentifier();
+ final var txIds = payload.getTransactionIds();
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.onTransactionsSkipped(historyId, txIds);
+ }
+ }
+
+ /**
+ * Create a transaction chain for specified history. Unlike {@link #ensureTransactionChain(LocalHistoryIdentifier)},
+ * this method is used for re-establishing state when we are taking over
+ *
+ * @param historyId Local history identifier
+ * @param closed True if the chain should be created in closed state (i.e. pending purge)
+ * @return Transaction chain handle
+ */
+ final ShardDataTreeTransactionChain recreateTransactionChain(final LocalHistoryIdentifier historyId,
+ final boolean closed) {
+ final ShardDataTreeTransactionChain ret = new ShardDataTreeTransactionChain(historyId, this);
+ final ShardDataTreeTransactionChain existing = transactionChains.putIfAbsent(historyId, ret);
+ checkState(existing == null, "Attempted to recreate chain %s, but %s already exists", historyId, existing);
+ return ret;
+ }
+
+ final ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId,
+ final @Nullable Runnable callback) {
+ ShardDataTreeTransactionChain chain = transactionChains.get(historyId);
+ if (chain == null) {
+ chain = new ShardDataTreeTransactionChain(historyId, this);
+ transactionChains.put(historyId, chain);
+ replicatePayload(historyId, CreateLocalHistoryPayload.create(
+ historyId, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
+ } else if (callback != null) {
+ callback.run();
+ }
+
+ return chain;
+ }
+
+ final ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) {
+ shard.getShardMBean().incrementReadOnlyTransactionCount();
+
+ if (txId.getHistoryId().getHistoryId() == 0) {
+ return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot());
+ }
+
+ return ensureTransactionChain(txId.getHistoryId(), null).newReadOnlyTransaction(txId);
+ }
+
+ final ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
+ shard.getShardMBean().incrementReadWriteTransactionCount();
+
+ if (txId.getHistoryId().getHistoryId() == 0) {
+ return new ReadWriteShardDataTreeTransaction(ShardDataTree.this, txId, dataTree.takeSnapshot()
+ .newModification());
+ }
+
+ return ensureTransactionChain(txId.getHistoryId(), null).newReadWriteTransaction(txId);
+ }
+
+ @VisibleForTesting
+ final void notifyListeners(final DataTreeCandidate candidate) {
+ treeChangeListenerPublisher.publishChanges(candidate);
+ }
+
+ /**
+ * Immediately purge all state relevant to leader. This includes all transaction chains and any scheduled
+ * replication callbacks.
+ */
+ final void purgeLeaderState() {