+ /**
+ * Create a new history proxy for a given shard.
+ *
+ * @param shard Shard cookie
+ * @throws InversibleLockException if the shard is being reconnected
+ */
+ @Holding("lock")
+ private ProxyHistory createHistoryProxy(final Long shard) {
+ final AbstractClientConnection<ShardBackendInfo> connection = client.getConnection(shard);
+ final LocalHistoryIdentifier proxyId = new LocalHistoryIdentifier(identifier.getClientId(),
+ identifier.getHistoryId(), shard);
+ LOG.debug("Created proxyId {} for history {} shard {}", proxyId, identifier, shard);
+
+ final ProxyHistory ret = createHistoryProxy(proxyId, connection);
+
+ // Request creation of the history, if it is not the single history
+ if (ret.getIdentifier().getHistoryId() != 0) {
+ connection.sendRequest(new CreateLocalHistoryRequest(ret.getIdentifier(), connection.localActor()),
+ this::createHistoryCallback);
+ }
+ return ret;
+ }
+
+ abstract ProxyHistory createHistoryProxy(LocalHistoryIdentifier historyId,
+ AbstractClientConnection<ShardBackendInfo> connection);
+
+ private void createHistoryCallback(final Response<?, ?> response) {
+ LOG.debug("Create history response {}", response);
+ }
+
+ private @NonNull ProxyHistory ensureHistoryProxy(final TransactionIdentifier transactionId, final Long shard) {
+ while (true) {
+ try {
+ // Short-lived lock to ensure exclusion of createHistoryProxy and the lookup phase in startReconnect,
+ // see comments in startReconnect() for details.
+ final long stamp = lock.readLock();
+ try {
+ return histories.computeIfAbsent(shard, this::createHistoryProxy);
+ } finally {
+ lock.unlockRead(stamp);
+ }
+ } catch (InversibleLockException e) {
+ LOG.trace("Waiting for transaction {} shard {} connection to resolve", transactionId, shard);
+ e.awaitResolution();
+ LOG.trace("Retrying transaction {} shard {} connection", transactionId, shard);
+ }
+ }