BUG-5280: move transactions keeping to history
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractClientHistory.java
index b164157982691cdda8095168b7b6432859b2bb0e..ce2c164b562ade50271cf0f71835803cdb6e3591 100644 (file)
@@ -8,9 +8,13 @@
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import com.google.common.base.Preconditions;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.concepts.Identifiable;
@@ -31,13 +35,24 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class);
+    private static final AtomicLongFieldUpdater<AbstractClientHistory> NEXT_TX_UPDATER =
+            AtomicLongFieldUpdater.newUpdater(AbstractClientHistory.class, "nextTx");
     private static final AtomicReferenceFieldUpdater<AbstractClientHistory, State> STATE_UPDATER =
             AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
 
+    @GuardedBy("this")
+    private final Map<TransactionIdentifier, ClientTransaction> openTransactions = new HashMap<>();
+    @GuardedBy("this")
+    private final Map<TransactionIdentifier, AbstractTransactionCommitCohort> readyTransactions = new HashMap<>();
+
     private final Map<Long, AbstractProxyHistory> histories = new ConcurrentHashMap<>();
     private final DistributedDataStoreClientBehavior client;
     private final LocalHistoryIdentifier identifier;
 
+    // Used via NEXT_TX_UPDATER
+    @SuppressWarnings("unused")
+    private volatile long nextTx = 0;
+
     private volatile State state = State.IDLE;
 
     AbstractClientHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
@@ -64,29 +79,91 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
         return client;
     }
 
+    final long nextTx() {
+        return NEXT_TX_UPDATER.getAndIncrement(this);
+    }
+
     @Override
     final void localAbort(final Throwable cause) {
-        LOG.debug("Force-closing history {}", getIdentifier(), cause);
-        state = State.CLOSED;
+        final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED);
+        if (oldState != State.CLOSED) {
+            LOG.debug("Force-closing history {}", getIdentifier(), cause);
+
+            synchronized (this) {
+                for (ClientTransaction t : openTransactions.values()) {
+                    t.localAbort(cause);
+                }
+                openTransactions.clear();
+                readyTransactions.clear();
+            }
+        }
     }
 
     private AbstractProxyHistory createHistoryProxy(final Long shard) {
-        final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(identifier.getClientId(),
-            identifier.getHistoryId(), shard);
-        return AbstractProxyHistory.create(client, client.resolver().getFutureBackendInfo(shard), historyId);
+        return createHistoryProxy(new LocalHistoryIdentifier(identifier.getClientId(),
+            identifier.getHistoryId(), shard), client.resolver().getFutureBackendInfo(shard));
     }
 
+    abstract AbstractProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
+            final Optional<ShardBackendInfo> backendInfo);
+
     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
         final AbstractProxyHistory history = histories.computeIfAbsent(shard, this::createHistoryProxy);
         return history.createTransactionProxy(transactionId);
     }
 
+    public final ClientTransaction createTransaction() {
+        Preconditions.checkState(state != State.CLOSED);
+
+        synchronized (this) {
+            final ClientTransaction ret = doCreateTransaction();
+            openTransactions.put(ret.getIdentifier(), ret);
+            return ret;
+        }
+    }
+
+    @GuardedBy("this")
+    abstract ClientTransaction doCreateTransaction();
+
+    /**
+     * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
+     *
+     * @param txId Transaction identifier
+     * @param cohort Transaction commit cohort
+     */
+    synchronized AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId,
+            final AbstractTransactionCommitCohort cohort) {
+        final ClientTransaction tx = openTransactions.remove(txId);
+        Preconditions.checkState(tx != null, "Failed to find open transaction for %s", txId);
+
+        final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort);
+        Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s",
+                cohort, txId, previous);
+
+        return cohort;
+    }
+
+    /**
+     * Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching
+     * backend.
+     *
+     * @param txId transaction identifier
+     */
+    synchronized void onTransactionAbort(final TransactionIdentifier txId) {
+        if (openTransactions.remove(txId) == null) {
+            LOG.warn("Could not find aborting transaction {}", txId);
+        }
+    }
+
     /**
-     * Callback invoked from {@link ClientTransaction} when a transaction has been submitted.
+     * Callback invoked from {@link AbstractTransactionCommitCohort} when a child transaction has been completed
+     * and all its state can be removed.
      *
-     * @param transaction Transaction handle
+     * @param txId transaction identifier
      */
-    void onTransactionReady(final ClientTransaction transaction) {
-        client.transactionComplete(transaction);
+    synchronized void onTransactionComplete(final TransactionIdentifier txId) {
+        if (readyTransactions.remove(txId) == null) {
+            LOG.warn("Could not find completed transaction {}", txId);
+        }
     }
 }