* @author Robert Varga
*/
abstract class AbstractClientHistory extends LocalAbortable implements Identifiable<LocalHistoryIdentifier> {
- static enum State {
+ enum State {
IDLE,
TX_OPEN,
CLOSED,
private static final AtomicReferenceFieldUpdater<AbstractClientHistory, State> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
- private final Map<Long, LocalHistoryIdentifier> histories = new ConcurrentHashMap<>();
+ private final Map<Long, AbstractProxyHistory> histories = new ConcurrentHashMap<>();
private final DistributedDataStoreClientBehavior client;
private final LocalHistoryIdentifier identifier;
Preconditions.checkState(success, "Race condition detected, state changed from %s to %s", expected, state);
}
- private LocalHistoryIdentifier getHistoryForCookie(final Long cookie) {
- LocalHistoryIdentifier ret = histories.get(cookie);
- if (ret == null) {
- ret = new LocalHistoryIdentifier(identifier.getClientId(), identifier.getHistoryId(), cookie);
- final LocalHistoryIdentifier existing = histories.putIfAbsent(cookie, ret);
- if (existing != null) {
- ret = existing;
- }
- }
-
- return ret;
- }
-
@Override
public final LocalHistoryIdentifier getIdentifier() {
return identifier;
state = State.CLOSED;
}
+ private AbstractProxyHistory createHistoryProxy(final Long shard) {
+ final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(identifier.getClientId(),
+ identifier.getHistoryId(), shard);
+ return AbstractProxyHistory.create(client, client.resolver().getFutureBackendInfo(shard), historyId);
+ }
+
final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
- return AbstractProxyTransaction.create(client, getHistoryForCookie(shard),
- transactionId.getTransactionId(), client.resolver().getFutureBackendInfo(shard));
+ final AbstractProxyHistory history = histories.computeIfAbsent(shard, this::createHistoryProxy);
+ return history.createTransactionProxy(transactionId);
}
/**
- * Callback invoked from {@link ClientTransaction} when a transaction has been sub
+ * Callback invoked from {@link ClientTransaction} when a transaction has been submitted.
*
* @param transaction Transaction handle
*/