+ LOG.debug("Created proxyId {} for history {} shard {}", proxyId, identifier, shard);
+
+ final ProxyHistory ret = createHistoryProxy(proxyId, connection);
+
+ // Request creation of the history, if it is not the single history
+ if (ret.getIdentifier().getHistoryId() != 0) {
+ connection.sendRequest(new CreateLocalHistoryRequest(ret.getIdentifier(), connection.localActor()),
+ this::createHistoryCallback);
+ }
+ return ret;
+ }
+
+ abstract ProxyHistory createHistoryProxy(LocalHistoryIdentifier historyId,
+ AbstractClientConnection<ShardBackendInfo> connection);
+
+ private void createHistoryCallback(final Response<?, ?> response) {
+ LOG.debug("Create history response {}", response);
+ }
+
+ private @NonNull ProxyHistory ensureHistoryProxy(final TransactionIdentifier transactionId, final Long shard) {
+ while (true) {
+ try {
+ // Short-lived lock to ensure exclusion of createHistoryProxy and the lookup phase in startReconnect,
+ // see comments in startReconnect() for details.
+ final long stamp = lock.readLock();
+ try {
+ return histories.computeIfAbsent(shard, this::createHistoryProxy);
+ } finally {
+ lock.unlockRead(stamp);
+ }
+ } catch (InversibleLockException e) {
+ LOG.trace("Waiting for transaction {} shard {} connection to resolve", transactionId, shard);
+ e.awaitResolution();
+ LOG.trace("Retrying transaction {} shard {} connection", transactionId, shard);
+ }
+ }
+ }
+
+ final @NonNull AbstractProxyTransaction createSnapshotProxy(final TransactionIdentifier transactionId,
+ final Long shard) {
+ return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, true);
+ }
+
+ final @NonNull AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId,
+ final Long shard) {
+ return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, false);
+ }
+
+ private void checkNotClosed() {
+ if (state == State.CLOSED) {
+ throw new DOMTransactionChainClosedException(String.format("Local history %s is closed", identifier));
+ }
+ }
+
+ /**
+ * Allocate a new {@link ClientTransaction}.
+ *
+ * @return A new {@link ClientTransaction}
+ * @throws DOMTransactionChainClosedException if this history is closed
+ * @throws IllegalStateException if a previous dependent transaction has not been closed
+ */
+ public @NonNull ClientTransaction createTransaction() {
+ checkNotClosed();
+
+ synchronized (this) {
+ final ClientTransaction ret = doCreateTransaction();
+ openTransactions.put(ret.getIdentifier(), ret);
+ return ret;
+ }