package org.opendaylight.controller.cluster.databroker.actors.dds;
import com.google.common.base.Preconditions;
+import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.concepts.Identifiable;
* @author Robert Varga
*/
abstract class AbstractClientHistory extends LocalAbortable implements Identifiable<LocalHistoryIdentifier> {
- static enum State {
+ enum State {
IDLE,
TX_OPEN,
CLOSED,
}
private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class);
+ private static final AtomicLongFieldUpdater<AbstractClientHistory> NEXT_TX_UPDATER =
+ AtomicLongFieldUpdater.newUpdater(AbstractClientHistory.class, "nextTx");
private static final AtomicReferenceFieldUpdater<AbstractClientHistory, State> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
- private final Map<Long, LocalHistoryIdentifier> histories = new ConcurrentHashMap<>();
+ @GuardedBy("this")
+ private final Map<TransactionIdentifier, ClientTransaction> openTransactions = new HashMap<>();
+ @GuardedBy("this")
+ private final Map<TransactionIdentifier, AbstractTransactionCommitCohort> readyTransactions = new HashMap<>();
+
+ private final Map<Long, AbstractProxyHistory> histories = new ConcurrentHashMap<>();
private final DistributedDataStoreClientBehavior client;
private final LocalHistoryIdentifier identifier;
+ // Used via NEXT_TX_UPDATER
+ @SuppressWarnings("unused")
+ private volatile long nextTx = 0;
+
private volatile State state = State.IDLE;
AbstractClientHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
Preconditions.checkState(success, "Race condition detected, state changed from %s to %s", expected, state);
}
- private LocalHistoryIdentifier getHistoryForCookie(final Long cookie) {
- LocalHistoryIdentifier ret = histories.get(cookie);
- if (ret == null) {
- ret = new LocalHistoryIdentifier(identifier.getClientId(), identifier.getHistoryId(), cookie);
- final LocalHistoryIdentifier existing = histories.putIfAbsent(cookie, ret);
- if (existing != null) {
- ret = existing;
- }
- }
-
- return ret;
- }
-
@Override
public final LocalHistoryIdentifier getIdentifier() {
return identifier;
return client;
}
+ final long nextTx() {
+ return NEXT_TX_UPDATER.getAndIncrement(this);
+ }
+
@Override
final void localAbort(final Throwable cause) {
- LOG.debug("Force-closing history {}", getIdentifier(), cause);
- state = State.CLOSED;
+ final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED);
+ if (oldState != State.CLOSED) {
+ LOG.debug("Force-closing history {}", getIdentifier(), cause);
+
+ synchronized (this) {
+ for (ClientTransaction t : openTransactions.values()) {
+ t.localAbort(cause);
+ }
+ openTransactions.clear();
+ readyTransactions.clear();
+ }
+ }
+ }
+
+ private AbstractProxyHistory createHistoryProxy(final Long shard) {
+ return createHistoryProxy(new LocalHistoryIdentifier(identifier.getClientId(),
+ identifier.getHistoryId(), shard), client.resolver().getFutureBackendInfo(shard));
}
+ abstract AbstractProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
+ final Optional<ShardBackendInfo> backendInfo);
+
final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
- return AbstractProxyTransaction.create(client, getHistoryForCookie(shard),
- transactionId.getTransactionId(), client.resolver().getFutureBackendInfo(shard));
+ final AbstractProxyHistory history = histories.computeIfAbsent(shard, this::createHistoryProxy);
+ return history.createTransactionProxy(transactionId);
+ }
+
+ public final ClientTransaction createTransaction() {
+ Preconditions.checkState(state != State.CLOSED);
+
+ synchronized (this) {
+ final ClientTransaction ret = doCreateTransaction();
+ openTransactions.put(ret.getIdentifier(), ret);
+ return ret;
+ }
}
+ @GuardedBy("this")
+ abstract ClientTransaction doCreateTransaction();
+
/**
- * Callback invoked from {@link ClientTransaction} when a transaction has been sub
+ * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
*
- * @param transaction Transaction handle
+ * @param txId Transaction identifier
+ * @param cohort Transaction commit cohort
*/
- void onTransactionReady(final ClientTransaction transaction) {
- client.transactionComplete(transaction);
+ synchronized AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId,
+ final AbstractTransactionCommitCohort cohort) {
+ final ClientTransaction tx = openTransactions.remove(txId);
+ Preconditions.checkState(tx != null, "Failed to find open transaction for %s", txId);
+
+ final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort);
+ Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s",
+ cohort, txId, previous);
+
+ return cohort;
+ }
+
+ /**
+ * Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching
+ * backend.
+ *
+ * @param txId transaction identifier
+ */
+ synchronized void onTransactionAbort(final TransactionIdentifier txId) {
+ if (openTransactions.remove(txId) == null) {
+ LOG.warn("Could not find aborting transaction {}", txId);
+ }
+ }
+
+ /**
+ * Callback invoked from {@link AbstractTransactionCommitCohort} when a child transaction has been completed
+ * and all its state can be removed.
+ *
+ * @param txId transaction identifier
+ */
+ synchronized void onTransactionComplete(final TransactionIdentifier txId) {
+ if (readyTransactions.remove(txId) == null) {
+ LOG.warn("Could not find completed transaction {}", txId);
+ }
}
}